Amazon S3 Source Connector
| | |
|---|---|
| Type | Source |
| Class | io.aiven.kafka.connect.s3.source.S3SourceConnector |
| Target System | Amazon S3 |
| Maintainer | Aiven |
| License | Apache License 2.0 |
| Project | github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka |
| Download | GitHub Releases |
| This page documents version 3.4.2. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support. |
Description
The Amazon S3 Source Connector reads files from an Amazon S3 bucket and publishes their contents as records to Kafka topics.
It is maintained by Aiven as part of the open-source cloud-storage-connectors-for-apache-kafka project (github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka).
| To speed up your deployment, we have prepared a Terraform boilerplate which you can use to spin up this connector quickly. You can find the boilerplate code here: Terraform Boilerplate Repository. |
For further details, you can consult the Connector GitHub Repository.
Features
- Read files from Amazon S3 and publish records to Kafka topics
- Configurable file name template with regex or placeholder-based matching
- Supports multiple input formats: JSON, Avro, and more
- Configurable prefix to restrict scanning to a specific S3 directory
When to Use
- You need to ingest files stored in S3 into Kafka for stream processing.
- You want to replay historical data stored in S3 into Kafka topics.
When NOT to Use
- You need to write Kafka records into S3; use the Amazon S3 Sink Connector instead.
| The S3 bucket MUST exist before starting the connector. The connector validates the bucket’s existence immediately upon startup. If the bucket specified in aws.s3.bucket.name does not exist, the connector task will throw an error and fail immediately. |
Installation
The connector is available from the GitHub Releases.
- Navigate to the releases page and select the version matching your Kafka Connect installation.
- Download the JAR file.
For installation steps, see Installing Connector Plugins.
Configuration
For the complete configuration reference, see the official connector documentation.
| To configure a connector in Axual Self-Service, see Starting Connectors. |
S3 Object Key Name Format
The connector uses the format <prefix><filename> for input files.
- <prefix>: Optional prefix for subdirectories.
- <filename>: The actual file name, matched according to file.name.template.
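Because only object keys under the configured prefix are scanned, you can preview which objects are in scope before deploying. A minimal local sketch (the key names below are made up for illustration):

```python
# Object keys take the form <prefix><filename>; with aws.s3.prefix set to
# "data/", only keys under that prefix are considered by the connector.
prefix = "data/"
keys = ["data/events.json", "data/2024/more.json", "logs/app.log"]

# Keep only the keys the connector would scan.
in_scope = [k for k in keys if k.startswith(prefix)]
print(in_scope)  # only the two keys under data/
```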
From version 3.4.0, file.name.template behaviour depends on distribution.type:
- object_hash: Use .* to match all object keys, or a regex for specific keys.
- partition: Use placeholders to map parts of the filename to Kafka metadata.
Available placeholders:
| Variable | Description |
|---|---|
| {{topic}} | Matches the Kafka topic name portion of the file name. |
| {{partition}} | Matches the Kafka partition number. |
| {{start_offset}} | Matches the Kafka offset of the first record in the file. |
| {{timestamp}} | Matches the timestamp of when the Kafka record was processed. |
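To see how placeholder-based matching can recover Kafka metadata from a file name, here is a hypothetical sketch that turns a partition-style template into a regex. It assumes the placeholder names {{topic}}, {{partition}}, and {{start_offset}}; the grouping logic is illustrative only, not the connector's actual implementation:

```python
import re

# Illustrative mapping from placeholders to named capture groups.
PLACEHOLDERS = {
    "{{topic}}": r"(?P<topic>[a-zA-Z0-9_.-]+)",
    "{{partition}}": r"(?P<partition>\d+)",
    "{{start_offset}}": r"(?P<start_offset>\d+)",
}

def template_to_regex(template: str) -> "re.Pattern":
    # Escape literal characters, then swap each placeholder back in
    # as a capture group.
    pattern = re.escape(template)
    for placeholder, group in PLACEHOLDERS.items():
        pattern = pattern.replace(re.escape(placeholder), group)
    return re.compile(pattern + r"$")

regex = template_to_regex("{{topic}}-{{partition}}-{{start_offset}}")
match = regex.match("my_s3_connector_source-0-42")
print(match.group("topic"), match.group("partition"), match.group("start_offset"))
```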
Getting Started
Prerequisites
AWS account and S3 bucket
- You already have access to an AWS subscription.
- You have an S3 bucket (e.g. my-s3-kafka-connect-source) containing data files to ingest.
- You have an AWS (service) account with read permissions on the bucket.
- You have the secret access key and key ID of the service account.
Axual stream
The stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- Follow the Creating streams documentation to create one stream and deploy it onto an environment. The name of the stream will be my_s3_connector_source. The key/value types will be String/String.
- Follow the Configure and install a connector documentation to set up a new Connector-Application. Let’s call it my_s3_source. The plugin name is io.aiven.kafka.connect.s3.source.S3SourceConnector. If the plugin isn’t available, ask a platform operator to install it.
Step 2 — Configure the connector
- Before starting the connector, ensure you have a file in your S3 bucket to read. For this example, create a file named data.json in the root of your bucket with the following content:

  {"id": 1, "message": "Hello from S3"}
  {"id": 2, "message": "This is a test"}

- Provide the following minimal configuration:

  | Configuration key | Example value | Notes |
  |---|---|---|
  | aws.access.key.id | AYEM7RPD4TAXLHLPM333 | |
  | aws.secret.access.key | Hc89Cnwp3MnNvmYdJRzlOPOe2WZFWtXt7FndjRCi | |
  | aws.s3.region | eu-central-1 | Use the bucket’s real region. |
  | aws.s3.bucket.name | my-s3-kafka-connect-source | |
  | topic | my_s3_connector_source | Note: this connector uses the singular topic key, not topics. |
  | file.name.template | .*\.json | Uses regex to match any JSON file in the bucket. |
  | key.converter | org.apache.kafka.connect.storage.StringConverter | |
  | value.converter | org.apache.kafka.connect.storage.StringConverter | Reads the file content as simple strings/bytes. |

  For advanced options, see the official connector documentation.
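Before deploying, you can sanity-check the file.name.template regex against the object keys you expect to ingest. A quick local sketch using Python's re module (the key names are examples):

```python
import re

# The same regex used for file.name.template above. Note that "."
# matches any character, including "/", so files in subdirectories
# also match.
template = r".*\.json"
keys = ["data.json", "archive/2024/batch-01.json", "readme.txt"]

# Keep only the keys the template would match.
matching = [k for k in keys if re.fullmatch(template, k)]
print(matching)  # readme.txt is filtered out
```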
Step 3 — Authorize the application
Authorize the my_s3_source Connector-Application to produce to the my_s3_connector_source stream.
Step 4 — Verify
In Axual Self-Service, use stream-browse on my_s3_connector_source to confirm records from the S3 files are arriving.
Advanced Configuration Examples
Reading Avro files with Regex Matching
This configuration demonstrates how to ingest Avro files from a specific directory prefix (data/).
{
"connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
"name": "aiven-s3-source-test-avro",
"aws.access.key.id": "...",
"aws.secret.access.key": "...",
"aws.s3.region": "us-east-1",
"aws.s3.bucket.name": "my-organization-test-bucket",
"aws.s3.prefix": "data/",
"topic": "s3-output-topic-avro",
"input.format": "avro",
"file.name.template": ".*\\.avro",
"file.compression.type": "none",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"errors.tolerance": "none",
"errors.log.enable": "true"
}
Known limitations
-
file.name.templateis mandatory — if not set, no objects are processed. -
The S3 bucket must exist before the connector starts; a missing bucket causes immediate task failure.
-
The connector uses the singular
topicconfiguration key, nottopics.
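These limitations lend themselves to a quick pre-flight check. A hypothetical helper (not part of the connector) that flags the most common configuration mistakes before you submit the config:

```python
def check_config(config: dict) -> list:
    """Return a list of problems with a proposed S3 source connector config."""
    problems = []
    # The connector expects the singular "topic" key.
    if "topics" in config:
        problems.append('use the singular "topic" key, not "topics"')
    if not config.get("topic"):
        problems.append('"topic" must be set')
    # Without file.name.template, no objects are processed at all.
    if not config.get("file.name.template"):
        problems.append('"file.name.template" is mandatory')
    return problems

# A config using the wrong plural key trips all three checks:
print(check_config({"topics": "my_s3_connector_source"}))
```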
Examples
Minimal configuration
{
"name": "my-s3-source",
"config": {
"connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
"aws.access.key.id": "<your-access-key-id>",
"aws.secret.access.key": "<your-secret-access-key>",
"aws.s3.region": "eu-central-1",
"aws.s3.bucket.name": "my-s3-kafka-connect-source",
"topic": "my_s3_connector_source",
"file.name.template": ".*\\.json",
"input.format": "jsonl",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
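With input.format set to jsonl, each line of an S3 object becomes one Kafka record. A local sketch of that line-by-line framing, reusing the sample data.json from Getting Started (this mimics the framing only, not the connector's internals):

```python
import json

# Object body as stored in S3: one JSON document per line (JSON Lines).
body = '{"id": 1, "message": "Hello from S3"}\n{"id": 2, "message": "This is a test"}\n'

# One record per non-empty line.
records = [json.loads(line) for line in body.splitlines() if line.strip()]
print(len(records), records[0]["message"])
```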
License
The Amazon S3 Source Connector is licensed under the Apache License, Version 2.0.