Amazon S3 Source Connector

Type: Source

Class: io.aiven.kafka.connect.s3.source.S3SourceConnector

Target System: Amazon S3

Maintainer: Aiven

License: Apache License 2.0

Project: github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka

Download: GitHub Releases

This page documents version 3.4.2. Newer versions are generally compatible, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The Amazon S3 Source Connector reads files from an Amazon S3 bucket and publishes their contents as records to Kafka topics.

It is maintained by Aiven as part of the open-source cloud-storage-connectors-for-apache-kafka project (github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka).

To speed up your deployment, we have prepared a Terraform boilerplate which you can use to spin up this connector quickly. You can find the boilerplate code here: Terraform Boilerplate Repository.

For further details, you can consult the Connector GitHub Repository.

Features

  • Read files from Amazon S3 and publish records to Kafka topics

  • Configurable file name template with regex or placeholder-based matching

  • Supports multiple input formats, including JSON Lines (jsonl), Avro, and Parquet

  • Configurable prefix to restrict scanning to a specific S3 directory

When to Use

  • You need to ingest files stored in S3 into Kafka for stream processing.

  • You want to replay historical data stored in S3 into Kafka topics.

When NOT to Use

  • Do not start the connector before the S3 bucket exists. The connector validates the bucket's existence at startup: if the bucket specified in aws.s3.bucket.name does not exist, the connector task throws an error and fails immediately.
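Before deploying, you can confirm that the bucket exists with the AWS CLI, assuming the CLI is configured with credentials that can reach it:

# exits with status 0 when the bucket exists and is accessible
aws s3api head-bucket --bucket my-s3-kafka-connect-source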

Installation

The connector is available from the GitHub Releases page.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.
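As a rough sketch (the paths below are assumptions; Installing Connector Plugins describes the procedure for your environment), a self-managed Kafka Connect worker picks the JAR up from a directory listed in plugin.path:

# connect-distributed.properties
plugin.path=/opt/kafka/plugins

# place the downloaded JAR in its own subdirectory, then restart the worker:
#   /opt/kafka/plugins/s3-source-connector/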

Configuration

For the complete configuration reference, see the official connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors.

S3 Object Key Name Format

The connector uses the format <prefix><filename> for input object keys.

  • <prefix>: Optional prefix used to emulate subdirectories in the bucket, set via aws.s3.prefix. For example, with aws.s3.prefix set to data/, the object key data/events.json resolves to the file name events.json.

  • <filename>: The actual file name, matched according to file.name.template.

From version 3.4.0, file.name.template behaviour depends on distribution.type:

  • object_hash: Use .* to match all object keys, or a regex for specific keys.

  • partition: Use placeholders to map parts of the filename to Kafka metadata.

Available placeholders:

  • {{topic}}: Matches the Kafka topic name portion of the file name.

  • {{partition}}: Matches the Kafka partition number.

  • {{start_offset}}: Matches the Kafka offset of the first record in the file.

  • {{timestamp}}: Matches the timestamp of when the Kafka record was processed.
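For example, with distribution.type set to object_hash, a regex such as the following matches every JSON object in the bucket (shown here with the JSON string escaping used in the examples below):

"file.name.template": ".*\\.json"

With distribution.type set to partition, a placeholder template maps filename parts back to Kafka metadata. The example below assumes the objects were written with a <topic>-<partition>-<offset> key layout, e.g. my_s3_connector_source-0-00000000000000000001; the exact layout of your object keys depends on how the files were produced:

"file.name.template": "{{topic}}-{{partition}}-{{start_offset}}"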

Getting Started

Prerequisites

AWS account and S3 bucket

  • You have access to an AWS account.

  • You have an S3 bucket (e.g. my-s3-kafka-connect-source) containing data files to ingest.

  • You have an AWS IAM user (service account) with read permissions on the bucket.

  • You have the access key ID and secret access key of that service account.

Axual stream

The stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. Follow the Creating streams documentation to create a stream and deploy it onto an environment.
    The name of the stream will be my_s3_connector_source.
    The key/value types will be String/String.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_s3_source.
    The plugin name is io.aiven.kafka.connect.s3.source.S3SourceConnector.
    If the plugin isn’t available, ask a platform operator to install it.

Step 2 — Configure the connector

  1. Before starting the connector, ensure you have a file in your S3 bucket to read.
    For this example, create a file named data.json in the root of your bucket with the following content (one way to upload it is shown after these steps):

    {"id": 1, "message": "Hello from S3"}
    {"id": 2, "message": "This is a test"}
  2. Provide the following minimal configuration:

    • aws.access.key.id: AYEM7RPD4TAXLHLPM333 (example value)

    • aws.secret.access.key: Hc89Cnwp3MnNvmYdJRzlOPOe2WZFWtXt7FndjRCi (example value)

    • aws.s3.region: eu-central-1 (use the bucket's real region)

    • aws.s3.bucket.name: my-s3-kafka-connect-source

    • topic: my_s3_connector_source (note: this connector uses the singular topic key, not topics)

    • file.name.template: .*\.json (a regex matching any JSON file in the bucket)

    • input.format: jsonl (treats each line of the file as a separate record, matching the example file above)

    • key.converter: org.apache.kafka.connect.storage.StringConverter

    • value.converter: org.apache.kafka.connect.storage.StringConverter (reads the file content as simple strings)

    For advanced options, see the official connector documentation.

  3. Authorize the my_s3_source source Connector-Application to produce to the my_s3_connector_source stream.
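For reference, one way to upload the data.json file from step 1 is the AWS CLI, assuming it is configured with credentials that can write to the bucket:

# copy the local file to the root of the bucket
aws s3 cp data.json s3://my-s3-kafka-connect-source/data.json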

Step 3 — Start the connector

Start the connector application from Axual Self-Service.

Step 4 — Verify

In Axual Self-Service, use stream-browse on my_s3_connector_source to confirm records from the S3 files are arriving.
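If you also have direct access to the underlying Kafka cluster, a console consumer offers an alternative check. Note that the broker address below is a placeholder, and on Axual the physical topic name may differ from the stream name:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my_s3_connector_source --from-beginning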

Advanced Configuration Examples

Reading Avro files with Regex Matching

This configuration demonstrates how to ingest Avro files from a specific directory prefix (data/).

{
    "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
    "name": "aiven-s3-source-test-avro",
    "aws.access.key.id": "...",
    "aws.secret.access.key": "...",
    "aws.s3.region": "us-east-1",
    "aws.s3.bucket.name": "my-organization-test-bucket",
    "aws.s3.prefix": "data/",
    "topic": "s3-output-topic-avro",
    "input.format": "avro",
    "file.name.template": ".*\\.avro",
    "file.compression.type": "none",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "errors.tolerance": "none",
    "errors.log.enable": "true"
}

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Return to AWS and delete your service account and S3 bucket to prevent unwanted costs.

Known limitations

  • file.name.template is mandatory — if not set, no objects are processed.

  • The S3 bucket must exist before the connector starts; a missing bucket causes immediate task failure.

  • The connector uses the singular topic configuration key, not topics.

Examples

Minimal configuration

{
  "name": "my-s3-source",
  "config": {
    "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
    "aws.access.key.id": "<your-access-key-id>",
    "aws.secret.access.key": "<your-secret-access-key>",
    "aws.s3.region": "eu-central-1",
    "aws.s3.bucket.name": "my-s3-kafka-connect-source",
    "topic": "my_s3_connector_source",
    "file.name.template": ".*\\.json",
    "input.format": "jsonl",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
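The walkthrough above uses Axual Self-Service, but on a plain Kafka Connect cluster the same JSON can be submitted through the Connect REST API (the worker address below is an assumption about your setup):

# assumes the JSON above is saved as my-s3-source.json
curl -X POST -H "Content-Type: application/json" \
  --data @my-s3-source.json http://localhost:8083/connectors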

License

The Amazon S3 Source Connector is licensed under the Apache License, Version 2.0.