Amazon S3-Bucket Source connector
This document makes the following assumptions:
- You already have access to an AWS subscription.
- You have an S3 bucket (e.g., `my-s3-kafka-connect-source`) containing data files you wish to ingest.
- You have access to an AWS (service) account which has read permissions on the bucket.
- You have the secret access key and key ID of the (service) account.
- You have access to Axual Self-Service or Terraform to configure the connector.
Note: The S3 bucket MUST exist before starting the connector. The connector validates the bucket's existence immediately upon startup: if the bucket specified in `aws.s3.bucket.name` does not exist, the connector task throws an error and fails immediately.
Tip: To speed up your deployment, we have prepared a Terraform boilerplate which you can use to spin up this connector quickly. You can find the boilerplate code here: Terraform Boilerplate Repository.
For further details, you can consult the Connector GitHub Repository.
Configuring a new Amazon S3-Bucket source connector
- Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
  The name of the stream will be `my_s3_connector_source`.
  The key/value types will be `String`/`String`.
- Follow the Configure and install a connector documentation to set up a new Connector-Application.
  Let's call it `my_s3_source`.
  The plugin name is `io.aiven.kafka.connect.s3.source.S3SourceConnector`.
  If the plugin isn't available, ask a platform operator to install it.
- Before starting the connector, ensure you have a file in your S3 bucket to read.
  For this example, create a file named `data.json` in the root of your bucket with the following content:

  ```
  {"id": 1, "message": "Hello from S3"}
  {"id": 2, "message": "This is a test"}
  ```
- Provide the following minimal configuration in order to connect to the configured Amazon S3 bucket.
  For advanced configuration, see the official connector documentation.

  | Setting | Example value | Notes |
  |---|---|---|
  | `aws.access.key.id` | `AYEM7RPD4TAXLHLPM333` | |
  | `aws.secret.access.key` | `Hc89Cnwp3MnNvmYdJRzlOPOe2WZFWtXt7FndjRCi` | |
  | `aws.s3.region` | `eu-central-1` | Use the bucket's real region. |
  | `aws.s3.bucket.name` | `my-s3-kafka-connect-source` | |
  | `topic` | `my_s3_connector_source` | Note: this connector uses the singular `topic` key, not `topics`. |
  | `file.name.template` | `.*\\.json` | Uses a regex to match any JSON file in the bucket. |
  | `key.converter` | `org.apache.kafka.connect.storage.StringConverter` | |
  | `value.converter` | `org.apache.kafka.connect.storage.StringConverter` | Together with `key.converter`, reads the file content as simple strings. |
- Authorize the `my_s3_source` source Connector-Application to produce to the `my_s3_connector_source` stream.
- You can now start the source Connector-Application.
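Put together, the minimal settings above correspond to a connector configuration like the following. The access key, secret, bucket name, and region are the example values from this guide; replace them with your own:

```json
{
  "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
  "name": "my_s3_source",
  "aws.access.key.id": "AYEM7RPD4TAXLHLPM333",
  "aws.secret.access.key": "Hc89Cnwp3MnNvmYdJRzlOPOe2WZFWtXt7FndjRCi",
  "aws.s3.region": "eu-central-1",
  "aws.s3.bucket.name": "my-s3-kafka-connect-source",
  "topic": "my_s3_connector_source",
  "file.name.template": ".*\\.json",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```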
S3 Object Key Name Format
The connector uses the following format for input files (blobs): `<prefix><filename>`.

- `<prefix>`: An optional prefix used for subdirectories in the bucket.
- `<filename>`: The actual file name, which is parsed according to a configurable template.
You can control how the connector discovers and processes these files using the following settings:
- `file.name.template`: Mandatory. If this is not set, no objects will be processed.
- `native.start.key`: Optional. Set this to an object key to begin processing objects from AFTER that key. In the event of a restart, the connector will resume from this key.
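The start-key semantics can be illustrated with a small sketch. This is not connector code, just a model of the behavior, assuming objects are processed in the lexicographic key order that S3 listings use:

```python
# Model of native.start.key semantics: objects are visited in lexicographic
# key order, and processing begins strictly AFTER the configured start key.
def keys_to_process(all_keys, native_start_key=None):
    ordered = sorted(all_keys)  # S3 lists object keys lexicographically
    if native_start_key is None:
        return ordered
    return [k for k in ordered if k > native_start_key]

keys = ["data/a.json", "data/b.json", "data/c.json"]
# No start key: every object is processed.
print(keys_to_process(keys))
# native.start.key = "data/a.json": processing resumes after that key.
print(keys_to_process(keys, "data/a.json"))
```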
Template Variables and Placeholders
From version 3.4.0, the behavior of `file.name.template` depends on the `distribution.type`:

- `object_hash`: You can set the template to `.*` to match all object keys, or use a regular expression to match specific keys.
- `partition`: You must use the placeholders described below to map parts of the filename to Kafka metadata.
The file name format supports placeholders with variable names of the form {{variable_name}}.
| Variable | Description |
|---|---|
| `{{topic}}` | Matches the Kafka topic name. |
| `{{partition}}` | Matches the Kafka partition number. |
| `{{start_offset}}` | Matches the offset of the first record in the file. |
| `{{timestamp}}` | Matches the timestamp of when the file was written. |
Pattern Match Examples
The table below illustrates how different templates match specific filenames, and where issues might arise.
| Pattern | Matches File | Resulting Values |
|---|---|---|
| `{{topic}}-{{partition}}-{{start_offset}}` | `my_topic-0-1234` | topic=`my_topic`, partition=`0`, start_offset=`1234` |
| `{{topic}}-{{partition}}-{{start_offset}}.txt` | `my_topic-0-1234.txt` | topic=`my_topic`, partition=`0`, start_offset=`1234`; the extension is matched literally |
| `topics/{{topic}}/{{partition}}-{{start_offset}}` | `topics/my_topic/0-1234` | topic=`my_topic`, partition=`0`, start_offset=`1234` |
| `{{topic}}-{{partition}}-{{start_offset}}` | `my-topic-1-1234` | topic=`my-topic`, partition=`1`, start_offset=`1234`; hyphens inside the topic name can make the match ambiguous |
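To make the placeholder mechanism concrete, here is a rough sketch (not the connector's actual implementation) of how a template with `{{...}}` placeholders such as `{{topic}}-{{partition}}-{{start_offset}}` can be compiled into a regular expression with named groups that pulls Kafka metadata out of a filename:

```python
import re

# Sketch: compile a placeholder template into a regex with one named
# capture group per {{variable_name}} placeholder.
PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")

def template_to_regex(template):
    pattern = ""
    pos = 0
    for m in PLACEHOLDER.finditer(template):
        pattern += re.escape(template[pos:m.start()])   # literal text between placeholders
        pattern += "(?P<%s>.+?)" % m.group(1)           # non-greedy named group
        pos = m.end()
    pattern += re.escape(template[pos:])
    return re.compile(pattern + "$")

regex = template_to_regex("{{topic}}-{{partition}}-{{start_offset}}")
match = regex.match("my_s3_connector_source-0-1234")
print(match.groupdict())
# → {'topic': 'my_s3_connector_source', 'partition': '0', 'start_offset': '1234'}
```

Note that with a hyphenated topic name the same template becomes ambiguous, since `-` is both the separator and part of the topic; this is exactly the kind of issue the table above points at.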
Advanced Configuration Examples
Reading Avro files with Regex Matching
This configuration demonstrates how to ingest Avro files from a specific directory prefix (`data/`).
Instead of mapping filename parts to Kafka metadata, it uses a regex (`.*\.avro`) to match all files with the `.avro` extension.
Key features of this configuration:
- Prefixing: uses `aws.s3.prefix` to restrict scanning to the `data/` folder.
- Regex matching: uses `file.name.template` with `.*\\.avro` to match any Avro file.
- Avro format: explicitly sets `input.format` to `avro` so the connector can parse the schema.
```json
{
  "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
  "name": "aiven-s3-source-test-avro",
  "aws.access.key.id": "...",
  "aws.secret.access.key": "...",
  "aws.s3.region": "us-east-1",
  "aws.s3.bucket.name": "my-organization-test-bucket",
  "aws.s3.prefix": "data/",
  "topic": "s3-output-topic-avro",
  "input.format": "avro",
  "file.name.template": ".*\\.avro",
  "file.compression.type": "none",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "errors.tolerance": "none",
  "errors.log.enable": "true"
}
```
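Axual Self-Service normally handles deployment, but if you are testing against a plain Kafka Connect cluster, a configuration like the one above is submitted to the Connect REST API with the connector name beside a nested `config` object. A minimal sketch that only builds the payload (no request is sent; the connector and bucket names are the example values from above):

```python
import json

# Abbreviated connector configuration, using the example values from this guide.
config = {
    "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
    "aws.s3.region": "us-east-1",
    "aws.s3.bucket.name": "my-organization-test-bucket",
    "aws.s3.prefix": "data/",
    "topic": "s3-output-topic-avro",
    "input.format": "avro",
    "file.name.template": ".*\\.avro",
    "tasks.max": "1",
}

# The Connect REST API (POST /connectors) expects {"name": ..., "config": {...}}.
payload = {"name": "aiven-s3-source-test-avro", "config": config}
print(json.dumps(payload, indent=2))
```

The serialized payload can then be POSTed to the Connect worker, e.g. to `http://localhost:8083/connectors` (8083 is the default Connect REST port).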
Cleanup
Once you are done, stop the Connector-Application and clean up the unused Axual resources.
Don’t forget to return to AWS and delete your service account and S3 bucket to prevent unwanted costs.
License
Amazon S3-Bucket Source-Connector is licensed under the Apache License, Version 2.0.
Source code
The source code for the Connect-Plugin can be found at github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka.