Kafka Sink Connector
| | |
|---|---|
| Type | Sink |
| Class | `io.axual.connect.plugins.kafka.sink.KafkaSinkConnector` |
| Target System | Messaging & Streaming (Apache Kafka) |
| Maintainer | Axual |
| License | Apache License 2.0 |
| Project | |
| Download | |
Description
The Kafka Sink Connector is part of the Kafka Synchronisation Connectors project. It loads data from local Kafka topics to topics on another Kafka cluster, forwarding the full record including Kafka headers. The same library also contains the Kafka Source Connector.
Features
- Loads data from local Kafka topics to topics on a remote Kafka cluster
- Copies the full record, including Kafka headers
- Marks copied headers with a configurable prefix
- Adds new static headers to records
- Adds the original record position as a header for tracing
- Supports multiple strategies for determining the remote topic name
When to Use
- You need to replicate or forward data from an Axual-managed Kafka cluster to another Kafka cluster.
- You want to mirror selected topics to a remote system with full header fidelity.
- You need flexible topic name mapping (source, fixed, prefix, or mapping strategies).
When NOT to Use
- You need transformation or enrichment of records before forwarding; use Kafka Streams or ksqlDB instead.
- The target system is not Kafka; use a dedicated sink connector for the target platform.

| Only ByteArrayConverter is supported with this connector. Other converters are not yet supported. |
Installation
The library is available on Maven Central. This library also contains the Kafka Source Connector.
- Search for the artifact on Maven Central.
- Select the version you wish to install.
- Download the JAR type for your Kafka Connect installation.
Using the wrong JAR type can result in failing connectors caused by ClassNotFoundException errors. Available JAR types:

- jar-with-dependencies: contains the compiled connector code and all dependencies. The Kafka client version included in Kafka Connect is used.
- jar: contains only the compiled connector code with no dependencies; may cause class-not-found issues.

For installation steps, see Installing Connector Plugins.
Configuration
Configuration options are grouped as follows:
- Worker and Remote Producer Settings: internal worker and remote producer settings
- Static Header Selection: static headers to add to each record
- Metadata Forwarding: forwarding of record metadata (topic, partition, offset, timestamp) as headers
- Topic Selection: logic to determine the remote topic name
- Partition Selection: logic to determine the target partition
| To configure a connector in Axual Self Service, see Starting Connectors. TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate. |
Worker and Remote Producer Settings
The worker settings control internal connector functionality such as queue sizes and wait times. Remote settings configure the producer connecting to the remote cluster.
| Key | Type | Default | Description |
|---|---|---|---|
| `remote.` prefix | not applicable | | Prefix for remote producer settings. For example, `remote.bootstrap.servers` configures the bootstrap servers of the remote producer. |
| | Integer | | Size of the internal queue used to buffer lists of records for the asynchronous sender. |
| | Long | | How long the connector waits for the queue to accept a list of records (milliseconds). |
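As a sketch of the `remote.` prefix mechanism, the fragment below reuses the TLS settings that appear in the examples later in this document; stripping the `remote.` prefix from each key yields the underlying Kafka producer property:

```json
{
  "remote.bootstrap.servers": "remote-cluster:19093",
  "remote.security.protocol": "SSL",
  "remote.ssl.truststore.location": "/clients/remote-cluster-truststore.jks",
  "remote.ssl.truststore.password": "someSecret"
}
```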
Static Header Selection
Adds one or more static headers to every produced record. Header names and values are Strings; values are sent as UTF-8 encoded byte arrays.
| Key | Type | Default | Description |
|---|---|---|---|
| `header.static.aliases` | List | empty list | Aliases used to look up header name and value definitions. |
| `header.static.<alias>.name` | String | null | The header name for the given alias. |
| `header.static.<alias>.value` | String | null | The header value for the given alias. Sent as a UTF-8 encoded byte array. |
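To illustrate how the alias-based keys fit together, here is a small conceptual sketch in Python (an illustration of the assumed lookup semantics, not the connector's own code), using the keys from the static header example later in this document:

```python
# Static header configuration, as in the "Using static headers" example below.
config = {
    "header.static.aliases": "why,how",
    "header.static.why.name": "Example-Why",
    "header.static.why.value": "We needed an example",
    "header.static.how.name": "Example-How",
    "header.static.how.value": "Using the Kafka Sink Connector",
}

def static_headers(cfg: dict) -> dict:
    """Resolve aliases to {header name: UTF-8 encoded value} (assumed semantics)."""
    headers = {}
    for alias in cfg["header.static.aliases"].split(","):
        alias = alias.strip()
        name = cfg[f"header.static.{alias}.name"]
        value = cfg[f"header.static.{alias}.value"]
        headers[name] = value.encode("utf-8")  # values travel as UTF-8 byte arrays
    return headers

print(static_headers(config))
# {'Example-Why': b'We needed an example', 'Example-How': b'Using the Kafka Sink Connector'}
```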
Metadata Forwarding
Forwards local record metadata fields as headers on the remote record, allowing the original source to be identified.
| Key | Type | Default | Description |
|---|---|---|---|
| `header.remote.prefix` | String | | Prefix added to header names of forwarded headers. |
| `forward.metadata.topic.name` | String | | Header name for the original topic name. |
| `forward.metadata.partition.name` | String | | Header name for the original partition number. |
| `forward.metadata.offset.name` | String | | Header name for the original record offset. |
| `forward.metadata.timestamp.name` | String | | Header name for the original record timestamp. |
Topic Selection
Determines which remote topic a record is produced to. Available strategies:
source — Use the same topic name as the original record (default, no further configuration needed).
| Key | Type | Example | Description |
|---|---|---|---|
| `topic.selector` | String | `source` | Use the source topic name. No additional configuration needed. |
prefix — Prepend a prefix to the source topic name.
| Key | Type | Example | Description |
|---|---|---|---|
| `topic.selector` | String | `prefix` | Use the prefix topic selector. |
| `topic.selector.prefix` | String | `forwarded-` | Prefix to prepend to forwarded topic names. Mandatory. |
fixed — Use a fixed remote topic name regardless of the source topic.
| Key | Type | Example | Description |
|---|---|---|---|
| `topic.selector` | String | `fixed` | Use the fixed topic selector. |
| `topic.selector.target` | String | `forwarded-to-us` | The fixed target topic name. Mandatory. |
mapping — Map source topic names to specific remote topic names.
| Key | Type | Example | Description |
|---|---|---|---|
| `topic.selector` | String | `mapping` | Use the mapping topic selector. |
| `topic.selector.mapping.<source topic>` | String | `topic.selector.mapping.test-topic` | Maps records from the named source topic to the given remote topic name. |
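The four strategies can be summarized as a small conceptual sketch (a Python illustration of the assumed resolution semantics, not the connector's implementation):

```python
def select_remote_topic(source_topic: str, cfg: dict) -> str:
    """Resolve the remote topic name for a record (assumed semantics)."""
    strategy = cfg.get("topic.selector", "source")  # "source" is the default
    if strategy == "source":
        return source_topic
    if strategy == "prefix":
        return cfg["topic.selector.prefix"] + source_topic
    if strategy == "fixed":
        return cfg["topic.selector.target"]
    if strategy == "mapping":
        return cfg[f"topic.selector.mapping.{source_topic}"]
    raise ValueError(f"unknown topic selector: {strategy}")

print(select_remote_topic("test-topic",
                          {"topic.selector": "prefix",
                           "topic.selector.prefix": "forwarded-"}))
# prints "forwarded-test-topic"
```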
Partition Selection
Determines which partition a record is produced to on the remote topic.
| Key | Type | Default | Description |
|---|---|---|---|
| `partition.selector` | String | `source` | Partition selector strategy. Accepts `source` and `partitioner`. |
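A simplified sketch of the two strategies (illustrative only; the real `partitioner` strategy delegates to the configured Kafka partitioner class, such as DefaultPartitioner with its murmur2 hash, while this sketch substitutes a generic MD5 hash):

```python
import hashlib

def select_partition(strategy: str, source_partition: int,
                     key: bytes, num_remote_partitions: int) -> int:
    """Pick a remote partition for a record (assumed semantics)."""
    if strategy == "source":
        # Keep the original partition; the remote topic needs at least as
        # many partitions as the source topic.
        return source_partition
    if strategy == "partitioner":
        # Stand-in hash partitioning; NOT Kafka's murmur2-based algorithm.
        digest = hashlib.md5(key).digest()
        return int.from_bytes(digest[:4], "big") % num_remote_partitions
    raise ValueError(f"unknown partition selector: {strategy}")
```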
Getting Started
This section walks you through configuring the Kafka Sink Connector on Axual to forward records from a local Kafka stream to a topic on a remote Kafka cluster.
Prerequisites
Remote Kafka cluster
You need a remote Kafka cluster accessible from the Axual Connect cluster network.
- The bootstrap address of the remote cluster must be reachable from the Connect cluster.
- If using the default `source` topic selector, the target topic must exist on the remote cluster with the same number of partitions as the source topic.
- If the remote cluster uses TLS or requires client certificate authentication, have the keystore and truststore files ready to reference from Vault.
| Only ByteArrayConverter is supported. Ensure the upstream producer is also writing with ByteArrayConverter. |
Axual stream
The stream the connector will consume must already exist in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- In Axual Self-Service, go to Applications and create a new application.
- Request Consumer access to the stream you want to forward to the remote cluster.
- Wait for the stream access request to be approved.
See Configure and install a connector for detailed steps.
Step 2 — Configure the connector
In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point. Replace all placeholder values with those of your environment.
| Property | Value |
|---|---|
| `connector.class` | `io.axual.connect.plugins.kafka.sink.KafkaSinkConnector` |
| `topics` | Axual-resolved stream name |
| `remote.bootstrap.servers` | Bootstrap address of the remote cluster |
| `key.converter` | `org.apache.kafka.connect.converters.ByteArrayConverter` |
| `value.converter` | `org.apache.kafka.connect.converters.ByteArrayConverter` |
| `header.converter` | `org.apache.kafka.connect.converters.ByteArrayConverter` |
For topic and partition selector options, see the Topic Selection and Partition Selection sections above. For all available properties, see the connector releases and the Configuration section above.
Step 3 — Start the connector
Start the connector application from Axual Self-Service. Once running, records from the local stream will be forwarded to the configured topic on the remote Kafka cluster.
Known limitations
- Only `ByteArrayConverter` is supported for key, value, and header converters; schema-aware converters cannot be used.
- When using the `source` partition selector, the remote topic must have the same number of partitions as the source topic.
- There is no built-in message transformation; use Kafka Streams or ksqlDB if transformation is required before forwarding.
Examples
All examples assume a remote Kafka cluster using a PLAIN listener and access to the given resources unless stated otherwise.
Example 1 — Minimal configuration
Uses source topic and partition selectors (default). Requires the same topic to exist on the remote cluster with the same number of partitions.
```json
{
  "name": "kafka-sink-minimal",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 2 — Minimal configuration using TLS client certificate
Same as above; the remote cluster uses TLS and requires a client certificate.
```json
{
  "name": "kafka-sink-minimal-with-tls",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.security.protocol": "SSL",
    "remote.ssl.truststore.location": "/clients/remote-cluster-truststore.jks",
    "remote.ssl.truststore.password": "someSecret",
    "remote.ssl.keystore.location": "/clients/remote-cluster-keystore.jks",
    "remote.ssl.keystore.password": "verySecret",
    "remote.ssl.key.password": "alsoVerySecret",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 3 — Using static headers
Two static headers are added to each produced record.
```json
{
  "name": "kafka-sink-static-header",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "header.static.aliases": "why,how",
    "header.static.why.name": "Example-Why",
    "header.static.why.value": "We needed an example",
    "header.static.how.name": "Example-How",
    "header.static.how.value": "Using the Kafka Sink Connector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 4 — Using metadata forwarding
Each produced record contains metadata from the original record as custom-named headers.
```json
{
  "name": "kafka-sink-metadata-forward",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "forward.metadata.topic.name": "Example-Topic",
    "forward.metadata.partition.name": "Example-Partition",
    "forward.metadata.offset.name": "Example-Offset",
    "forward.metadata.timestamp.name": "Example-Timestamp",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 5 — Using source topic selector
Each record is written to the same topic as the original. Functionally identical to the minimal configuration.
```json
{
  "name": "kafka-sink-topic-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector": "source",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 6 — Using fixed topic selector
All records are written to `forwarded-to-us` regardless of their source topic.
```json
{
  "name": "kafka-sink-topic-fixed",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector": "fixed",
    "topic.selector.target": "forwarded-to-us",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 7 — Using prefix topic selector
Records from `test-topic` are written to `forwarded-test-topic` on the remote cluster.
```json
{
  "name": "kafka-sink-topic-prefix",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector": "prefix",
    "topic.selector.prefix": "forwarded-",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 8 — Using mapping topic selector
`test-topic` is mapped to `forwarded-from-test-topic`; `topic2` is mapped to `another-out`.
```json
{
  "name": "kafka-sink-topic-mapping",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic,topic2",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector": "mapping",
    "topic.selector.mapping.test-topic": "forwarded-from-test-topic",
    "topic.selector.mapping.topic2": "another-out",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 9 — Using source partition selector
Each record is written to the same partition as the original. Functionally identical to the minimal configuration.
```json
{
  "name": "kafka-sink-partition-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "partition.selector": "source",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 10 — Using partitioner partition selector
The target partition is determined by the Kafka `DefaultPartitioner`.
```json
{
  "name": "kafka-sink-partition-partitioner",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "partition.selector": "partitioner",
    "remote.partitioner.class": "org.apache.kafka.clients.producer.internals.DefaultPartitioner",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 11 — Prefixing headers
Forwarded header names are prefixed with `X-Original-` on the produced record.

```json
{
  "name": "kafka-sink-header-prefix",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "header.remote.prefix": "X-Original-",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
License
This connector is licensed under the Apache License, Version 2.0.