Azure Data Lake Storage Gen2 Sink Connector

| | |
|---|---|
| Type | Sink |
| Class | io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector |
| Target System | Cloud Storage (Azure Data Lake Storage Gen2) |
| Maintainer | Axual |
| License | Apache License 2.0 |
| Project | Maven Central |
| Download | Maven Central |
Description
The Azure Data Lake Storage Gen2 Sink Connector loads data from Kafka topics into a container in an ADLS Gen2 storage account. Records are combined and stored in files conforming to the Avro Object Container File specification. All records in a container file are read from the same partition.
Features

- Offload data from Kafka topics to Azure Storage
- Configurable staging and target directories
- Target directory supports timestamp patterns to organise files into directories by timestamp
- Use record timestamps or processing time for pattern resolving
- Multiple file rotation triggers:
  - Timestamp resolves into a different target directory path
  - Key or value datatype changes (including different Avro schemas)
  - Number of records in container file
  - Size of container files
  - Inactivity on partition
- Kafka topic offsets can be committed to only include offsets of records rotated to the target directory
When to Use

- You need to archive Kafka topic data in Azure Data Lake Storage for downstream analytics or batch processing.
- You require Avro-formatted files with configurable partitioning by time or schema.
- You want fine-grained control over file rotation and offset commit behaviour.

When NOT to Use

- You need real-time querying of Kafka data — this connector is designed for batch offloading, not low-latency retrieval.
- Your target system is not ADLS Gen2 — consider the Amazon S3 Sink for AWS or a database sink for transactional workloads.
Installation

The library can be found on Maven Central.

- Search for the artifact on Maven Central.
- Select the version you wish to install.
- Download the JAR type for your Kafka Connect installation.

Using the wrong JAR type can result in failing connectors caused by class-not-found exceptions. Available JAR types:

- jar-with-dependencies — Contains all dependencies, including the Azure SDK and Avro/Confluent Schema Registry converter libraries. Use this if the common classpath of the Kafka Connect installation does not already contain the Avro or Confluent libraries.
- jar-without-avro-dependencies — Contains all dependencies except the Avro/Confluent libraries. Use this if those libraries are already on the Kafka Connect classpath.
- jar — Not recommended. Contains only the compiled code with no dependencies.

For installation steps, see Installing Connector Plugins.
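The JAR-type decision above can be sketched as a quick classpath check. This is a hedged illustration, not part of the connector: the library directory path and the jar-name matching are assumptions, so adjust them for your installation.

```python
import os

def recommended_jar_type(libs_dir: str) -> str:
    """Suggest a JAR type based on whether Avro/Confluent converter jars
    already appear on the Kafka Connect classpath directory."""
    names = os.listdir(libs_dir) if os.path.isdir(libs_dir) else []
    has_avro = any("avro" in n.lower() or "confluent" in n.lower()
                   for n in names)
    # If the libraries are already present, shipping them again in the
    # plugin can cause classloading conflicts; otherwise they must be bundled.
    return ("jar-without-avro-dependencies" if has_avro
            else "jar-with-dependencies")

# "/opt/kafka/libs" is an assumed location for the Connect common classpath.
print(recommended_jar_type("/opt/kafka/libs"))
```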
Configuration

The connector can be configured for the following categories:

- Azure Connection — Target storage account, container, and Azure client retry options
- Account Key / Access Key Authentication — Account Key based authentication settings
- Client Secret Authentication — Azure AD Client Secret based authentication settings
- Retries — Retry and failure handling logic
- Converter Configuration — Plugin converter configuration
- Container File — Staging and target file locations and rotation settings
- Offset Commit — Offset commit behaviour

| To configure a connector in Axual Self Service, see Starting Connectors. TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate. |
Azure Connection

| Key | Type | Default | Description |
|---|---|---|---|
| adls.endpoint | String | <null> | The URL to connect to the storage service. Usually https://<account-name>.dfs.core.windows.net. |
| adls.container.name | String | <null> | The name of the container in the storage account. |
| adls.auth.method | String | AccountKey | The authentication method. Available values: AccountKey, ClientSecret. |
| adls.client.timeout.seconds | Integer | 15 | Maximum seconds the Azure Data Lake Storage client will wait for a call to return before failing. |
| adls.client.retry.maximum.tries | Integer | 4 | Maximum number of times the Azure Data Lake Storage client will retry a call before failing. |
| adls.client.retry.interval | Long | 10000 | Milliseconds to wait before retrying. When exponential retry is enabled, this value doubles for each retry up to the maximum retry interval. |
| adls.client.retry.exponential | Boolean | false | Whether the Azure Data Lake Storage client uses exponential backoff for retries. |
| adls.client.retry.maximum.interval | Long | 60000 | Maximum milliseconds to wait before retrying when using exponential backoff. |
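The interplay of the retry settings can be seen in a short sketch: with exponential backoff enabled, the wait doubles per attempt until the maximum interval caps it. The function name is illustrative; only the arithmetic reflects the documented behaviour.

```python
def retry_schedule(tries: int, interval_ms: int, exponential: bool,
                   max_interval_ms: int) -> list[int]:
    """Return the wait (in ms) before each retry attempt."""
    waits = []
    for attempt in range(tries):
        # Doubling per attempt when exponential backoff is enabled,
        # capped at the configured maximum interval.
        wait = interval_ms * (2 ** attempt) if exponential else interval_ms
        waits.append(min(wait, max_interval_ms))
    return waits

# Azure client defaults with exponential backoff switched on:
# interval 10000 ms, 4 tries, cap 60000 ms.
print(retry_schedule(4, 10_000, True, 60_000))  # [10000, 20000, 40000, 60000]
```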
Account Key / Access Key Authentication

The account key method uses an Access Key for the Storage account.

| Key | Type | Default | Description |
|---|---|---|---|
| adls.account.name | String | <null> | The name of the Azure Data Lake Storage account. |
| adls.account.key | String | <null> | One of the access keys of the Azure Data Lake Storage account. Found in the Azure portal page of the account. |
Client Secret Authentication

Uses a secret of an Azure Active Directory user or application registration.

| Key | Type | Default | Description |
|---|---|---|---|
| adls.tenant.id | String | <null> | The ID of the Azure Tenant for the Azure AD user/application registration. |
| adls.client.id | String | <null> | The ID of the client in the Azure AD user/application registration. |
| adls.client.secret | String | <null> | The secret for the client in the Azure AD user/application registration. |
Retries

| Key | Type | Default | Description |
|---|---|---|---|
| retry.maximum.tries | Integer | 10 | Maximum number of times to retry an action before failing. |
| retry.interval | Long | 500 | Milliseconds to wait before retrying. |
| retry.exponential | Boolean | true | Use exponential backoff for retries. Doubles the retry interval for each subsequent retry. |
| retry.maximum.interval | Long | 15000 | Maximum milliseconds to wait before retrying when using exponential backoff. |
Converter Configuration

| Key | Value | Description |
|---|---|---|
| header.converter | org.apache.kafka.connect.storage.SimpleHeaderConverter | Attempts to create string data from all headers. |
| key.converter, value.converter | io.axual.connect.plugins.adls.gen2.AvroObjectConverter | Custom Avro Converter required to read from Avro topics while preserving the schema. |

Additional converters usable as key or value converters if the topic data matches:

- org.apache.kafka.connect.converters.ByteArrayConverter
- org.apache.kafka.connect.converters.DoubleConverter
- org.apache.kafka.connect.converters.FloatConverter
- org.apache.kafka.connect.converters.IntegerConverter
- org.apache.kafka.connect.converters.LongConverter
- org.apache.kafka.connect.storage.StringConverter
Container File

Processed records are stored in Avro Object Container files in the staging directory. Files are moved (rotated) to the target directory when one of the following conditions is met:

- Target directory changes when a timestamp pattern is in use
- Maximum number of records per file reached
- Maximum file size exceeded
- Inactivity time reached (file has records in staging but no new records arrive within the configured period)
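The rotation conditions can be condensed into a small sketch. The function and parameter names are illustrative, not the connector's internals; the defaults mirror the rotation settings used in the configuration examples (rotation.record.count, rotation.filesize, rotation.inactivity).

```python
def should_rotate(record_count: int, file_size: int, idle_ms: int,
                  current_target: str, resolved_target: str,
                  schema_changed: bool = False,
                  max_records: int = 100,
                  max_size: int = 100_000_000,
                  max_idle_ms: int = 1_800_000) -> bool:
    """True when any rotation condition for the staging file is met."""
    return (resolved_target != current_target   # timestamp pattern moved on
            or schema_changed                   # key/value datatype changed
            or record_count >= max_records      # record count limit
            or file_size >= max_size            # file size limit
            or idle_ms >= max_idle_ms)          # inactivity limit

# A file that hit the default record limit rotates even if small and fresh:
print(should_rotate(100, 1_024, 0, "2024/05/01", "2024/05/01"))  # True
```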
| Key | Type | Default | Description |
|---|---|---|---|
| base.directory | String | <empty string> | Base path for all files. Target and staging paths use this as root. Created if it does not exist. |
| staging.directory | String | staging | Directory where Avro container files are created before being moved to the target directory. |
| target.directory | String | target | Directory pattern where finalised Avro container files are stored. Supports time format patterns, e.g. {yyyy}/{MM}/{dd}. |
| compression | String | <null> | Compression codec for the container file, e.g. snappy. |
| sync.interval | Integer | 64000 | Approximate number of uncompressed bytes to write in each Avro block. Higher values reduce Azure calls and improve throughput. Valid range: 32 to 2^30. |
| rotation.time.zone | String | UTC | Java ZoneId name used when determining the target directory with a time format pattern, e.g. UTC or Europe/Amsterdam. |
| rotation.time.source | String | processed | Timestamp source for time-based rotation. Use processed for the processing time or produced for the record timestamp. |
| rotation.record.count | Integer | 100 | Maximum records per Avro container file before rotating. |
| rotation.inactivity | Long | 1800000 | Milliseconds of inactivity before rotating a file that has records in staging. |
| rotation.filesize | Long | 100000000 | Maximum file size in bytes. A rotation takes place when this limit is reached. |
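A timestamp pattern such as {yyyy}/{MM}/{dd} resolves against the rotation timestamp in the configured zone. The sketch below is a hypothetical model of that resolution based on Example 4; the token set and the mapping to strftime directives are assumptions, not the connector's actual code.

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# Assumed token mapping, inferred from the {yyyy}/{MM}/{dd} pattern in the
# examples; the real connector may support more or different tokens.
TOKENS = {"{yyyy}": "%Y", "{MM}": "%m", "{dd}": "%d", "{HH}": "%H"}

def resolve_target(pattern: str, timestamp_ms: int, zone: str) -> str:
    """Resolve a target directory pattern for a record timestamp (epoch ms)
    in the given Java-style ZoneId (e.g. 'UTC', 'Europe/Amsterdam')."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=ZoneInfo(zone))
    for token, fmt in TOKENS.items():
        pattern = pattern.replace(token, ts.strftime(fmt))
    return pattern

# A record with timestamp 0 (epoch) resolves to 1970/01/01 in UTC:
print(resolve_target("{yyyy}/{MM}/{dd}", 0, "UTC"))  # 1970/01/01
```

When two consecutive records resolve to different paths (for example across a day boundary), the current staging file is rotated before the new record is written.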
Offset Commit

| Key | Type | Default | Description |
|---|---|---|---|
| commit.rotated.only | Boolean | true | If true, only offsets of records in files already rotated to the target directory are committed. |
| commit.record.count | Integer | 100 | Maximum records processed by a task before requesting an offset commit. |
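The commit.rotated.only semantics can be illustrated with a minimal sketch: offsets are committed only up to the last record safely written to a rotated file, so records still sitting in a staging file are re-delivered after a restart rather than lost. The function name is hypothetical.

```python
def committable_offset(last_rotated_offset):
    """Offset to commit: one past the last record rotated to the target
    directory. Records still in the staging file are excluded."""
    if last_rotated_offset is None:
        return None  # nothing rotated yet, commit nothing
    # Kafka commits the offset of the next record to read.
    return last_rotated_offset + 1

# Records up to offset 41 were rotated; later records are still staging:
print(committable_offset(41))  # 42
```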
Getting Started
This section walks you through configuring the Azure Data Lake Storage Gen2 Sink Connector on Axual to offload records from a Kafka stream into an ADLS Gen2 container.
Prerequisites
Azure Storage account and container
You need an Azure Data Lake Storage Gen2 storage account with a container.
- The storage account must have the hierarchical namespace enabled.
- The account endpoint is typically https://<account-name>.dfs.core.windows.net.
- Have your credentials ready: either an Account Key (from Storage account → Access keys in the Azure portal) or an Azure AD Client Secret.
| The Axual Connect cluster must be able to reach the ADLS Gen2 endpoint over HTTPS (port 443). Ask your cluster administrator to verify egress is allowed if the connector fails to connect. |
Axual stream
The stream the connector will consume must already exist in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- In Axual Self-Service, go to Applications and create a new application.
- Request Consumer access to the stream you want to offload to Azure.
- Wait for the stream access request to be approved.
See Configure and install a connector for detailed steps.
Step 2 — Configure the connector
In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration for Account Key authentication as a starting point. Replace all placeholder values with those of your environment.
| Property | Value |
|---|---|
| connector.class | io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector |
| topics | Axual-resolved stream name |
| adls.endpoint | https://<account-name>.dfs.core.windows.net |
| adls.container.name | <container-name> |
| adls.auth.method | AccountKey |
| adls.account.name | <account-name> |
| adls.account.key | <account-key> |
| key.converter | io.axual.connect.plugins.adls.gen2.AvroObjectConverter |
| value.converter | io.axual.connect.plugins.adls.gen2.AvroObjectConverter |
| header.converter | org.apache.kafka.connect.storage.SimpleHeaderConverter |

| Store adls.account.key (or client secret credentials) in Vault before starting the connector. See Configure and install a connector for instructions on adding Vault secrets. |
For all available properties, see the Configuration section above.
Step 3 — Start the connector
Start the connector application from Axual Self-Service. Once running, records from the stream will be written to Avro Object Container files in the staging directory of your ADLS Gen2 container. Files are rotated to the target directory according to the rotation settings.
Known limitations

- Output is always Avro Object Container files — other output formats are not supported.
- All records in a single container file originate from the same Kafka partition.
- The jar (no dependencies) artifact type is not recommended and may cause class-not-found exceptions at runtime.
- Kafka topic offsets for in-progress staging files are not committed until the file is rotated to the target directory (when commit.rotated.only=true, which is the default).
Examples

- Example 1 — Minimal configuration with Account Key Authentication
- Example 2 — Minimal configuration with Client Secret Authentication
- Example 3 — Snappy compression with Account Key Authentication
- Example 4 — Pattern target with Account Key Authentication
- Example 5 — Retry and pattern target with Account Key Authentication
- Example 6 — Full configuration with Account Key Authentication
Example 1 — Minimal configuration with Account Key Authentication
Connects to the storage account myexampleadlsaccount, uses container test-container,
creates base directory minimal-fixed-target, and uses default staging and target directories.
{
"name": "minimal-fixed-target-account-key",
"config": {
"connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
"tasks.max": "3",
"topics": "test-topic1",
"adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
"adls.container.name": "test-container",
"adls.auth.method": "AccountKey",
"adls.account.name": "myexampleadlsaccount",
"adls.account.key": "get-this-from-the-azure-storage-account-settings",
"base.directory": "minimal-fixed-target",
"key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
}
}
Example 2 — Minimal configuration with Client Secret Authentication
Same as above but authenticates using Azure AD Client Secret.
{
"name": "minimal-fixed-target-client-secret",
"config": {
"connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
"tasks.max": "3",
"topics": "test-topic1",
"adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
"adls.container.name": "test-container",
"adls.auth.method": "ClientSecret",
"adls.tenant.id": "get.tenant.id.from.azure.ad",
"adls.client.id": "get.client.id.from.azure.ad.user.details",
"adls.client.secret": "get.client.secret.from.azure.ad.user.details",
"base.directory": "minimal-fixed-target",
"key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
}
}
Example 3 — Snappy compression with Account Key Authentication
Enables Snappy compression to create smaller Avro Object Container files.
{
"name": "snappy-fixed-target-account-key",
"config": {
"connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
"tasks.max": "3",
"topics": "test-topic1",
"adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
"adls.container.name": "test-container",
"adls.auth.method": "AccountKey",
"adls.account.name": "myexampleadlsaccount",
"adls.account.key": "get-this-from-the-azure-storage-account-settings",
"base.directory": "minimal-fixed-target",
"compression": "snappy",
"key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
}
}
Example 4 — Pattern target with Account Key Authentication
Uses a timestamp pattern as target directory to create a year/month/day structure.
The record timestamp is used for pattern resolving with the GMT timezone.
{
"name": "pattern-target-account-key",
"config": {
"connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
"tasks.max": "3",
"topics": "test-topic1",
"adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
"adls.container.name": "test-container",
"adls.auth.method": "AccountKey",
"adls.account.name": "myexampleadlsaccount",
"adls.account.key": "get-this-from-the-azure-storage-account-settings",
"base.directory": "minimal-fixed-target",
"staging.directory": "staging",
"target.directory": "{yyyy}/{MM}/{dd}",
"rotation.time.source": "produced",
"rotation.time.zone": "GMT",
"key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
}
}
Example 5 — Retry and pattern target with Account Key Authentication
Year/month/day target pattern with the Azure client configured for 10 retries (20s timeout, exponential backoff), and overall retry set to 8 retries with exponential backoff (1s interval, 1 minute maximum).
{
"name": "retrying-pattern-target-account-key",
"config": {
"connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
"tasks.max": "3",
"topics": "test-topic1",
"adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
"adls.container.name": "test-container",
"adls.auth.method": "AccountKey",
"adls.account.name": "myexampleadlsaccount",
"adls.account.key": "get-this-from-the-azure-storage-account-settings",
"base.directory": "minimal-fixed-target",
"staging.directory": "staging",
"target.directory": "{yyyy}/{MM}/{dd}",
"rotation.time.source": "produced",
"rotation.time.zone": "GMT",
"adls.client.retry.maximum.tries": "10",
"adls.client.timeout.seconds": "20",
"adls.client.retry.exponential": "true",
"adls.client.retry.interval": "2000",
"adls.client.retry.maximum.interval": "60000",
"retry.maximum.tries": "8",
"retry.exponential": "true",
"retry.interval": "1000",
"retry.maximum.interval": "60000",
"key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
}
}
Example 6 — Full configuration with Account Key Authentication
Full configuration combining pattern target, custom staging, and all retry settings.
{
"name": "full-config-account-key",
"config": {
"connector.class": "io.axual.connect.plugins.adls.gen2.AdlsGen2SinkConnector",
"tasks.max": "3",
"topics": "test-topic1",
"adls.endpoint": "https://myexampleadlsaccount.dfs.core.windows.net",
"adls.container.name": "test-container",
"adls.auth.method": "AccountKey",
"adls.account.name": "myexampleadlsaccount",
"adls.account.key": "get-this-from-the-azure-storage-account-settings",
"base.directory": "minimal-fixed-target",
"staging.directory": "staging",
"target.directory": "{yyyy}/{MM}/{dd}",
"rotation.time.source": "produced",
"rotation.time.zone": "GMT",
"adls.client.retry.maximum.tries": "10",
"adls.client.timeout.seconds": "20",
"adls.client.retry.exponential": "true",
"adls.client.retry.interval": "2000",
"adls.client.retry.maximum.interval": "60000",
"retry.maximum.tries": "8",
"retry.exponential": "true",
"retry.interval": "1000",
"retry.maximum.interval": "60000",
"rotation.record.count": "20000",
"rotation.inactivity": "3600000",
"rotation.filesize": "50000000",
"commit.rotated.only": "true",
"commit.record.count": "5000",
"compression": "snappy",
"sync.interval": "64000",
"key.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"value.converter": "io.axual.connect.plugins.adls.gen2.AvroObjectConverter",
"header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter"
}
}
License
This connector is licensed under the Apache License, Version 2.0.