Azure Data Lake Storage Gen2 Sink Connector

Type

Sink

Class

io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector

Target System

Cloud Storage (Azure Data Lake Storage Gen2)

Maintainer

Axual

License

Apache License 2.0

Project

Source Code

Download

Maven Central

Description

The Azure Data Lake Storage Gen2 Sink Connector loads data from Kafka topics into a container in an ADLS Gen2 storage account. Records are combined and stored in files conforming to the Avro Object Container File specification. All records in a container file originate from the same Kafka topic partition.

Features

  • Offload data from Kafka topics to Azure Storage

  • Configurable staging and target directories

  • Target directory supports timestamp patterns to organise files into directories by timestamp

  • Use record timestamps or processing time to resolve the pattern

  • Multiple file rotation triggers:

    • Timestamp resolves into a different target directory path

    • Key or value datatype changes (including different Avro schemas)

    • Number of records in container file

    • Size of container files

    • Inactivity on partition

  • Offset commits can be limited to records that have already been rotated to the target directory

When to Use

  • You need to archive Kafka topic data in Azure Data Lake Storage for downstream analytics or batch processing.

  • You require Avro-formatted files with configurable partitioning by time or schema.

  • You want fine-grained control over file rotation and offset commit behaviour.

When NOT to Use

  • You need real-time querying of Kafka data — this connector is designed for batch offloading, not low-latency retrieval.

  • Your target system is not ADLS Gen2 — consider the Amazon S3 Sink for AWS or a database sink for transactional workloads.

Installation

The library can be found on Maven Central.

  1. Search for the artifact on Maven Central.

  2. Select the version you wish to install.

  3. Download the JAR type for your Kafka Connect installation.

    Using the wrong JAR type can result in failing connectors due to class-not-found exceptions.

    Available JAR types:

    • jar-with-dependencies — Contains all dependencies including the Azure SDK and Avro/Confluent Schema Registry Converter libraries. Use this if the common classpath of the Kafka Connect installation does not already contain the Avro or Confluent libraries.

    • jar-without-avro-dependencies — Contains all dependencies except Avro/Confluent libraries. Use this if those libraries are already on the Kafka Connect classpath.

    • jar — Not recommended. Contains only the compiled code with no dependencies.

For installation steps, see Installing Connector Plugins.
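If you are unsure whether a downloaded artifact bundles the Avro libraries, you can inspect the JAR contents before deploying it. The sketch below (the helper name bundles_avro is ours, not part of the connector) checks for classes under org/apache/avro/ using only the Python standard library:

```python
import zipfile

def bundles_avro(jar_path: str) -> bool:
    """Return True if the JAR bundles Avro classes (entries under org/apache/avro/)."""
    with zipfile.ZipFile(jar_path) as jar:
        return any(name.startswith("org/apache/avro/") for name in jar.namelist())
```

The jar-with-dependencies artifact should contain such entries, while jar-without-avro-dependencies and the plain jar should not.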

Configuration

The connector is configured through the categories of settings described in the sections below.

To configure a connector in Axual Self-Service, see Starting Connectors. TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Azure Connection

adls.endpoint (String, default <null>): The URL to connect to the storage service. Usually https://<account name>.dfs.core.windows.net.

adls.container.name (String, default <null>): The name of the container in the storage account.

adls.auth.method (String, default AccountKey): The authentication method. Available values: AccountKey, ClientSecret.

adls.client.timeout.seconds (Integer, default 15): Maximum seconds the Azure Data Lake Storage client will wait for a call to return before failing.

adls.client.retry.maximum.tries (Integer, default 4): Maximum number of times the Azure Data Lake Storage client will retry a call before failing.

adls.client.retry.interval (Long, default 10000): Milliseconds to wait before retrying. When exponential retry is enabled, this value doubles for each retry up to the maximum retry interval.

adls.client.retry.exponential (Boolean, default false): Whether the Azure Data Lake Storage client uses exponential backoff for retries.

adls.client.retry.maximum.interval (Long, default 60000): Maximum milliseconds to wait before retrying when using exponential backoff.

Account Key / Access Key Authentication

The account key method uses an Access Key for the Storage account.

adls.account.name (String, default <null>): The name of the Azure Data Lake Storage account.

adls.account.key (String, default <null>): One of the access keys of the Azure Data Lake Storage account, found on the Azure portal page of the account.

Client Secret Authentication

Uses a secret of an Azure Active Directory user or application registration.

adls.tenant.id (String, default <null>): The ID of the Azure tenant for the Azure AD user/application registration.

adls.client.id (String, default <null>): The ID of the client in the Azure AD user/application registration.

adls.client.secret (String, default <null>): The secret for the client in the Azure AD user/application registration.

Retries

retry.maximum.tries (Integer, default 10): Maximum number of times to retry an action before failing.

retry.interval (Long, default 500): Milliseconds to wait before retrying.

retry.exponential (Boolean, default true): Use exponential backoff for retries; doubles the retry interval for each subsequent retry.

retry.maximum.interval (Long, default 15000): Maximum milliseconds to wait before retrying when using exponential backoff.
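The interplay between retry.interval, retry.exponential and retry.maximum.interval can be sketched as follows, assuming the doubled interval is simply capped at the maximum (the function name retry_delays is illustrative, not part of the connector):

```python
def retry_delays(maximum_tries: int, interval_ms: int,
                 exponential: bool, maximum_interval_ms: int) -> list[int]:
    """Delay in milliseconds before each retry, per the retry.* settings."""
    delays = []
    delay = interval_ms
    for _ in range(maximum_tries):
        delays.append(min(delay, maximum_interval_ms))
        if exponential:
            delay *= 2
    return delays

# With the defaults (10 tries, 500 ms interval, exponential, 15000 ms cap) the
# delays grow as 500, 1000, 2000, 4000, 8000 and then stay at 15000.
```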

Converter Configuration

header.converter (org.apache.kafka.connect.storage.SimpleHeaderConverter): Attempts to create string data from all headers.

key.converter / value.converter (io.axual.connect.plugins.adls.gen2.AvroObjectConverter): Custom Avro Converter required to read from Avro topics while preserving the schema.

Additional converters usable as key or value converters if the topic data matches:

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.converters.DoubleConverter

  • org.apache.kafka.connect.converters.FloatConverter

  • org.apache.kafka.connect.converters.IntegerConverter

  • org.apache.kafka.connect.converters.LongConverter

  • org.apache.kafka.connect.storage.StringConverter

Container File

Processed records are stored in Avro Object Container files in the staging directory. Files are moved (rotated) to the target directory when one of the following conditions is met:

  • The resolved target directory changes (when a timestamp pattern is in use)

  • Maximum number of records per file reached

  • Maximum file size exceeded

  • Inactivity time reached (file has records in staging but no new records arrive within the configured period)
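The rotation triggers above can be summarised in a small sketch. The defaults mirror rotation.record.count, rotation.filesize and rotation.inactivity, and the function is illustrative rather than the connector's actual logic:

```python
def should_rotate(records_in_file: int, file_size_bytes: int, idle_ms: int,
                  target_dir_changed: bool,
                  max_records: int = 100,
                  max_size_bytes: int = 100_000_000,
                  max_idle_ms: int = 1_800_000) -> bool:
    """Illustrative check of the four rotation triggers for one container file."""
    return (target_dir_changed
            or records_in_file >= max_records
            or file_size_bytes >= max_size_bytes
            # inactivity only rotates files that actually hold records
            or (records_in_file > 0 and idle_ms >= max_idle_ms))
```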

base.directory (String, default <empty string>): Base path for all files. Target and staging paths use this as root. Created if it does not exist.

staging.directory (String, default staging): Directory where Avro container files are created before being moved to the target directory.

target.directory (String, default target): Directory pattern where finalised Avro container files are stored. Supports time format patterns, e.g. target/year={yyyy}/month={MM}/day={dd}.

compression (String, default <null>): Compression type for the container file. Valid values: snappy or null (no compression).

sync.interval (Integer, default 64000): Approximate number of uncompressed bytes to write in each Avro block. Higher values reduce Azure calls and improve throughput. Valid range: 32 to 2^30.

rotation.time.zone (String, default UTC): Java ZoneId name used when determining the target directory with a time format pattern, e.g. GMT, UTC, Europe/Amsterdam.

rotation.time.source (String, default processed): Timestamp source for time-based rotation. Use processed for connector processing time, produced for the record timestamp.

rotation.record.count (Integer, default 100): Maximum records per Avro container file before rotating.

rotation.inactivity (Long, default 1800000): Milliseconds of inactivity before rotating a file that has records in staging.

rotation.filesize (Long, default 100000000): Maximum file size in bytes. A rotation takes place when this limit is reached.
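As an illustration of how a timestamp pattern could resolve into a directory path, the sketch below maps the {yyyy}, {MM} and {dd} tokens shown in this document to strftime directives. The token table and the resolve_target helper are assumptions for illustration, not the connector's implementation:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# Token set shown in this document; treat it as illustrative, not exhaustive.
TOKENS = {"{yyyy}": "%Y", "{MM}": "%m", "{dd}": "%d"}

def resolve_target(pattern: str, timestamp_ms: int, zone: str = "UTC") -> str:
    """Resolve a target.directory pattern for a record or processing timestamp."""
    for token, directive in TOKENS.items():
        pattern = pattern.replace(token, directive)
    return datetime.fromtimestamp(timestamp_ms / 1000, ZoneInfo(zone)).strftime(pattern)
```

With rotation.time.source=produced, a record timestamp falling on 15 January 2024 (UTC) would resolve target/year={yyyy}/month={MM}/day={dd} to target/year=2024/month=01/day=15.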

Offset Commit

commit.rotated.only (Boolean, default true): If true, only offsets of records in a rotated file are committed. If false, offsets of records in staging files are committed as well.

commit.record.count (Integer, default 100): Maximum records processed by a task before requesting a commit, when commit.rotated.only is false.
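The effect of commit.rotated.only can be sketched as follows; the helper and its dict-based bookkeeping are illustrative, not the connector's internals:

```python
def committable_offsets(rotated: dict, staged: dict, rotated_only: bool = True) -> dict:
    """Per-partition offsets that may be committed.

    rotated: partition -> last offset written to a rotated (finalised) file
    staged:  partition -> last offset written to a staging file
    """
    if rotated_only:
        # Records still in staging are excluded from the commit.
        return dict(rotated)
    # Staging offsets are at or ahead of rotated offsets for the same partition.
    return {p: max(staged.get(p, -1), rotated.get(p, -1))
            for p in set(staged) | set(rotated)}
```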

Getting Started

This section walks you through configuring the Azure Data Lake Storage Gen2 Sink Connector on Axual to offload records from a Kafka stream into an ADLS Gen2 container.

Prerequisites

Azure Storage account and container

You need an Azure Data Lake Storage Gen2 storage account with a container.

The Axual Connect cluster must be able to reach the ADLS Gen2 endpoint over HTTPS (port 443). Ask your cluster administrator to verify egress is allowed if the connector fails to connect.

Axual stream

The stream the connector will consume must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. In Axual Self-Service, go to Applications and create a new application.

  2. Request Consumer access to the stream you want to offload to Azure.

  3. Wait for the stream access request to be approved.

See Configure and install a connector for detailed steps.

Step 2 — Configure the connector

In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration for Account Key authentication as a starting point. Replace all placeholder values with those of your environment.

connector.class = io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector
topics = myorg-myinst-dev-my-stream (use the Axual-resolved stream name)
adls.endpoint = https://<account-name>.dfs.core.windows.net
adls.container.name = <container-name>
adls.auth.method = AccountKey
adls.account.name = <account-name>
adls.account.key = ${keyvault:connectors/<tenant>/<instance>/<env>/<app>:adls.account.key}
base.directory = <your-base-path>
key.converter = io.axual.connect.plugins.adls.gen2.AvroObjectConverter
value.converter = io.axual.connect.plugins.adls.gen2.AvroObjectConverter
header.converter = org.apache.kafka.connect.storage.SimpleHeaderConverter

Store adls.account.key (or client secret credentials) in Vault before starting the connector. See Configure and install a connector for instructions on adding Vault secrets.

For all available properties, see the Configuration section above.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, records from the stream will be written to Avro Object Container files in the staging directory of your ADLS Gen2 container. Files are rotated to the target directory according to the rotation settings.

Step 4 — Verify

In the Azure portal, open your storage account and navigate to the container. You should see files appearing in the configured staging.directory and, after rotation, in target.directory.
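To sanity-check a downloaded file, note that every Avro Object Container File starts with a four-byte magic header: the ASCII bytes Obj followed by the version byte 1. A minimal check in Python (the helper name is ours):

```python
def is_avro_container_file(path: str) -> bool:
    """Check the Avro Object Container File magic header: b"Obj" + version byte 1."""
    with open(path, "rb") as f:
        return f.read(4) == b"Obj\x01"
```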

Cleanup

When you are done testing:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if it is no longer needed.

  3. Delete the files created in the Azure container if they were created only for testing.

Known limitations

  • Output is always Avro Object Container files — other output formats are not supported.

  • All records in a single container file originate from the same Kafka partition.

  • The jar (no dependencies) artifact type is not recommended and may cause class-not-found exceptions at runtime.

  • Kafka topic offsets for in-progress staging files are not committed until the file is rotated to the target directory (when commit.rotated.only=true, which is the default).

Examples

Example 1 — Minimal configuration with Account Key Authentication

Connects to the storage account myexampleadlsaccount, uses container test-container, creates base directory minimal-fixed-target, and uses default staging and target directories.

{
  "name": "minimal-fixed-target-account-key",
  "config": {
    "connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
    "tasks.max": "3",
    "topics": "test-topic1",
    "adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
    "adls.container.name": "test-container",
    "adls.auth.method": "AccountKey",
    "adls.account.name": "myexampleadlsaccount",
    "adls.account.key": "get-this-from-the-azure-storage-account-settings",
    "base.directory": "minimal-fixed-target",
    "key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
  }
}

Example 2 — Minimal configuration with Client Secret Authentication

Same as above but authenticates using Azure AD Client Secret.

{
  "name": "minimal-fixed-target-client-secret",
  "config": {
    "connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
    "tasks.max": "3",
    "topics": "test-topic1",
    "adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
    "adls.container.name": "test-container",
    "adls.auth.method": "ClientSecret",
    "adls.tenant.id": "get.tenant.id.from.azure.ad",
    "adls.client.id": "get.client.id.from.azure.ad.user.details",
    "adls.client.secret": "get.client.secret.from.azure.ad.user.details",
    "base.directory": "minimal-fixed-target",
    "key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
  }
}

Example 3 — Snappy compression with Account Key Authentication

Enables Snappy compression to create smaller Avro Object Container files.

{
  "name": "snappy-fixed-target-account-key",
  "config": {
    "connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
    "tasks.max": "3",
    "topics": "test-topic1",
    "adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
    "adls.container.name": "test-container",
    "adls.auth.method": "AccountKey",
    "adls.account.name": "myexampleadlsaccount",
    "adls.account.key": "get-this-from-the-azure-storage-account-settings",
    "base.directory": "minimal-fixed-target",
    "compression": "snappy",
    "key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
  }
}

Example 4 — Pattern target with Account Key Authentication

Uses a timestamp pattern as target directory to create a year/month/day structure. The record timestamp is used to resolve the pattern, in the GMT time zone.

{
  "name": "pattern-target-account-key",
  "config": {
    "connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
    "tasks.max": "3",
    "topics": "test-topic1",
    "adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
    "adls.container.name": "test-container",
    "adls.auth.method": "AccountKey",
    "adls.account.name": "myexampleadlsaccount",
    "adls.account.key": "get-this-from-the-azure-storage-account-settings",
    "base.directory": "minimal-fixed-target",
    "staging.directory": "staging",
    "target.directory": "{yyyy}/{MM}/{dd}",
    "rotation.time.source": "produced",
    "rotation.time.zone": "GMT",
    "key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
  }
}

Example 5 — Retry and pattern target with Account Key Authentication

Year/month/day target pattern with the Azure client configured for up to 10 retries (20-second timeout, exponential backoff with a 2-second initial interval and a 1-minute maximum), and the overall retry behaviour set to 8 tries with exponential backoff (1-second initial interval, 1-minute maximum).

{
  "name": "retrying-pattern-target-account-key",
  "config": {
    "connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
    "tasks.max": "3",
    "topics": "test-topic1",
    "adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
    "adls.container.name": "test-container",
    "adls.auth.method": "AccountKey",
    "adls.account.name": "myexampleadlsaccount",
    "adls.account.key": "get-this-from-the-azure-storage-account-settings",
    "base.directory": "minimal-fixed-target",
    "staging.directory": "staging",
    "target.directory": "{yyyy}/{MM}/{dd}",
    "rotation.time.source": "produced",
    "rotation.time.zone": "GMT",
    "adls.client.retry.maximum.tries": "10",
    "adls.client.timeout.seconds": "20",
    "adls.client.retry.exponential": "true",
    "adls.client.retry.interval": "2000",
    "adls.client.retry.maximum.interval": "60000",
    "retry.maximum.tries": "8",
    "retry.exponential": "true",
    "retry.interval": "1000",
    "retry.maximum.interval": "60000",
    "key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
  }
}

Example 6 — Full configuration with Account Key Authentication

Full configuration combining pattern target, custom staging, and all retry settings.

{
  "name": "full-config-account-key",
  "config": {
    "connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
    "tasks.max": "3",
    "topics": "test-topic1",
    "adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
    "adls.container.name": "test-container",
    "adls.auth.method": "AccountKey",
    "adls.account.name": "myexampleadlsaccount",
    "adls.account.key": "get-this-from-the-azure-storage-account-settings",
    "base.directory": "minimal-fixed-target",
    "staging.directory": "staging",
    "target.directory": "{yyyy}/{MM}/{dd}",
    "rotation.time.source": "produced",
    "rotation.time.zone": "GMT",
    "adls.client.retry.maximum.tries": "10",
    "adls.client.timeout.seconds": "20",
    "adls.client.retry.exponential": "true",
    "adls.client.retry.interval": "2000",
    "adls.client.retry.maximum.interval": "60000",
    "retry.maximum.tries": "8",
    "retry.exponential": "true",
    "retry.interval": "1000",
    "retry.maximum.interval": "60000",
    "rotation.record.count": "20000",
    "rotation.inactivity": "3600000",
    "rotation.filesize": "50000000",
    "commit.rotated.only": "true",
    "commit.record.count": "5000",
    "compression": "snappy",
    "sync.interval": "64000",
    "key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
  }
}

License

This connector is licensed under the Apache License, Version 2.0.