Amazon S3-Bucket Source connector

This document makes the following assumptions:

  • You already have access to an AWS account.

  • You have an S3 bucket (e.g., my-s3-kafka-connect-source) containing data files you wish to ingest.

  • You have an AWS (service) account with read permissions on the bucket.

  • You have the access key ID and secret access key of that (service) account.

  • You have access to the Axual Self-Service or Terraform to configure the connector.

The S3 Bucket MUST exist before starting the connector.
The connector validates the bucket’s existence immediately upon startup. If the bucket specified in aws.s3.bucket.name does not exist, the connector task will throw an error and fail immediately.
To speed up your deployment, we have prepared a Terraform boilerplate which you can use to spin up this connector quickly. You can find the boilerplate code here: Terraform Boilerplate Repository.

For further details, you can consult the Connector GitHub Repository.

Configuring a new Amazon S3-Bucket source connector

  1. Follow the Creating streams documentation to create a stream and deploy it onto an environment.
    The name of the stream will be my_s3_connector_source.
    The key/value types will be String/String.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_s3_source.
    The plugin name is io.aiven.kafka.connect.s3.source.S3SourceConnector.
    If the plugin isn’t available, ask a platform operator to install it.

  3. Before starting the connector, ensure you have a file in your S3 bucket to read.

    For this example, create a file named data.json in the root of your bucket with the following content:

    {"id": 1, "message": "Hello from S3"}
    {"id": 2, "message": "This is a test"}
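
    If the file is not in the bucket yet, you can upload it with the AWS CLI (assuming the CLI is configured with your credentials and the bucket name used in this guide):

    aws s3 cp data.json s3://my-s3-kafka-connect-source/data.json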
  4. Provide the following minimal configuration in order to connect to the configured Amazon S3 bucket.
    For advanced configuration, see the official connector documentation.

    aws.access.key.id

    Example value:
    AYEM7RPD4TAXLHLPM333

    aws.secret.access.key

    Example value:
    Hc89Cnwp3MnNvmYdJRzlOPOe2WZFWtXt7FndjRCi

    aws.s3.region

    Use the bucket’s real region
    eu-central-1

    aws.s3.bucket.name

    my-s3-kafka-connect-source

    topic

    my_s3_connector_source
    Note: This connector uses the singular topic key, not topics.

    file.name.template

    .*\\.json
    A regular expression matching any JSON file in the bucket; the backslash is doubled because the value is escaped for JSON.

    key.converter

    org.apache.kafka.connect.storage.StringConverter

    value.converter

    org.apache.kafka.connect.storage.StringConverter
    Reads the file content as simple strings/bytes.
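
    Put together, the minimal settings from this step correspond to the following configuration (using the example values above; substitute your own credentials):

    {
        "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
        "aws.access.key.id": "AYEM7RPD4TAXLHLPM333",
        "aws.secret.access.key": "Hc89Cnwp3MnNvmYdJRzlOPOe2WZFWtXt7FndjRCi",
        "aws.s3.region": "eu-central-1",
        "aws.s3.bucket.name": "my-s3-kafka-connect-source",
        "topic": "my_s3_connector_source",
        "file.name.template": ".*\\.json",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }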

  5. Authorize the my_s3_source source Connector-Application to produce to the my_s3_connector_source stream.

  6. You can now start the source Connector-Application.

S3 Object Key Name Format

The connector uses the following format for input object keys: <prefix><filename>.

  • <prefix>: An optional prefix used for subdirectories in the bucket.

  • <filename>: The actual file name, which is parsed according to a configurable template.

You can control how the connector discovers and processes these files using the following settings:

  • file.name.template: Mandatory. If this is not set, no objects will be processed.

  • native.start.key: Optional. Set this with an object key to begin processing objects from AFTER that key. In the event of a restart, the connector will resume from this key.
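
As a sketch, the two settings combine like this in the connector configuration (the object key shown is a hypothetical example):

    "file.name.template": ".*\\.json",
    "native.start.key": "archive/2024/data-0041.json"

With this configuration the connector skips all object keys up to and including data-0041.json and starts with the next matching object.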

Template Variables and Placeholders

From version 3.4.0, the behavior of file.name.template depends on the distribution.type:

  • object_hash: You can set the template to .* to match all object keys, or use a regular expression to match specific keys.

  • partition: You must use the placeholders described below to map parts of the filename to Kafka metadata.

The file name format supports placeholders with variable names of the form {{variable_name}}.

The following variables are supported:

{{topic}}

Matches a-z, A-Z, 0-9, -, _, and . one or more times.
This pattern represents the Kafka topic. Once matching starts, it continues until a non-matchable character is encountered.
Care must be taken to ensure the topic does not match another part of the file name.

{{partition}}

Matches 0-9 one or more times.
This pattern specifies the Kafka partition the data should be written to.

{{start_offset}}

Matches 0-9 one or more times.
This represents the Kafka offset of the first record in the file.

{{timestamp}}

Matches 0-9 one or more times.
This is the timestamp of when the Kafka record was processed.

Pattern Match Examples

The table below illustrates how different templates match specific filenames, and where issues might arise.

Each example below shows the pattern, a file it matches, and the resulting values:

{{topic}}-{{partition}}-{{start_offset}}

customer-topic-1-1734445664111.txt

topic=customer-topic
partition=1
start_offset=1734445664111

{{topic}}-{{partition}}-{{start_offset}}

22-10-12/customer-topic-1-1734445664111.txt

topic=22 (Incorrect!)
partition=10
start_offset=12
The pattern matched the date prefix instead of the actual filename.

{{topic}}/{{partition}}/{{start_offset}}

customer-topic/1/1734445664111.txt

topic=customer-topic
partition=1
start_offset=1734445664111

topic/{{topic}}/partition/{{partition}}/startOffset/{{start_offset}}

topic/customer-topic/partition/1/startOffset/1734445664111.txt

topic=customer-topic
partition=1
start_offset=1734445664111
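The matching behavior in the table above can be sketched in Python. The helper below is an illustrative re-implementation of the placeholder expansion, not the connector's actual code:

```python
import re

# Character classes mirroring the placeholder definitions above
# (illustrative only; not the connector's implementation).
PLACEHOLDERS = {
    "topic": r"(?P<topic>[a-zA-Z0-9\-_.]+)",
    "partition": r"(?P<partition>[0-9]+)",
    "start_offset": r"(?P<start_offset>[0-9]+)",
}

def template_to_regex(template: str) -> re.Pattern:
    """Expand {{variable}} placeholders into named regex groups."""
    pattern = re.escape(template)
    for name, group in PLACEHOLDERS.items():
        pattern = pattern.replace(re.escape("{{" + name + "}}"), group)
    return re.compile(pattern)

rx = template_to_regex("{{topic}}-{{partition}}-{{start_offset}}")

m = rx.match("customer-topic-1-1734445664111.txt")
print(m.group("topic"))         # customer-topic
print(m.group("partition"))     # 1
print(m.group("start_offset"))  # 1734445664111

# The date-prefix pitfall from the table: the topic character class
# happily matches the "22-10-12/" prefix instead of the real topic.
m2 = rx.match("22-10-12/customer-topic-1-1734445664111.txt")
print(m2.group("topic"))        # 22  (incorrect!)
```

This also demonstrates why, as noted above, care must be taken that the topic pattern cannot match another part of the object key.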

Advanced Configuration Examples

Reading Avro files with Regex Matching

This configuration demonstrates how to ingest Avro files from a specific directory prefix (data/). Instead of mapping filename parts to Kafka metadata, it uses a Regex (.*\.avro) to blindly match all files with the .avro extension.

Key features of this configuration:

  • Prefixing: Uses aws.s3.prefix to restrict scanning to the data/ folder.

  • Regex Matching: Uses file.name.template with .*\\.avro to match any Avro file.

  • Avro Format: Explicitly sets input.format to avro so the connector can parse the schema.

{
    "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
    "name": "aiven-s3-source-test-avro",
    "aws.access.key.id": "...",
    "aws.secret.access.key": "...",
    "aws.s3.region": "us-east-1",
    "aws.s3.bucket.name": "my-organization-test-bucket",
    "aws.s3.prefix": "data/",
    "topic": "s3-output-topic-avro",
    "input.format": "avro",
    "file.name.template": ".*\\.avro",
    "file.compression.type": "none",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "errors.tolerance": "none",
    "errors.log.enable": "true"
}

Cleanup

Once you are done, stop the Connector-Application and clean up the unused Axual resources.
Don’t forget to return to AWS and delete your service account and S3 bucket to prevent unwanted costs.

License

Amazon S3-Bucket Source-Connector is licensed under the Apache License, Version 2.0.

Source code

The source code for the Connect-Plugin can be found at github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka.