Kafka Sink Connector

Type

Sink

Class

io.axual.connect.plugins.kafka.sink.KafkaSinkConnector

Target System

Messaging & Streaming (Apache Kafka)

Maintainer

Axual

License

Apache License 2.0

Project

Source Code

Download

Maven Central

Description

The Kafka Sink Connector is part of the Kafka Synchronisation Connectors project. It loads data from local Kafka topics to topics on another Kafka cluster, forwarding the full record including Kafka headers. The same library also contains the Kafka Source Connector.

Features

  • Load data from local Kafka topics to topics on a remote Kafka cluster

  • Copies the full record, including Kafka headers

  • Mark copied headers with a configurable prefix

  • Add new static headers to records

  • Add original record position as a header for tracing

  • Multiple strategies for determining the remote topic name

When to Use

  • You need to replicate or forward data from an Axual-managed Kafka cluster to another Kafka cluster.

  • You want to mirror select topics to a remote system with full header fidelity.

  • You need flexible topic name mapping (source, fixed, prefix, or mapping strategies).

When NOT to Use

  • You need transformation or enrichment of records before forwarding — use Kafka Streams or ksqlDB instead.

  • The target system is not Kafka — use a dedicated sink connector for the target platform.

Only ByteArrayConverter is supported with this connector. Other converters are not yet supported.

Installation

The library is available on Maven Central. This library also contains the Kafka Source Connector.

  1. Search for the artifact on Maven Central.

  2. Select the version you wish to install.

  3. Download the JAR type for your Kafka Connect installation.

    Using the wrong JAR type can result in failing connectors caused by class not found exceptions.

    Available JAR types:

    • jar-with-dependencies — Contains the compiled connector code and all dependencies. The Kafka Client version included in Kafka Connect is used.

    • jar — Contains only the compiled connector code with no dependencies. May cause class not found issues.

For installation steps, see Installing Connector Plugins.

Configuration

Configuration options are grouped as follows:

To configure a connector in Axual Self Service, see Starting Connectors. TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Worker and Remote Producer Settings

The worker settings control internal connector functionality such as queue sizes and wait times. Remote settings configure the producer connecting to the remote cluster.

Key Type Default Description

remote.

prefix

not applicable

Prefix for remote producer settings. For example, bootstrap.servers is provided as remote.bootstrap.servers.

queue.size

Integer

10

Size of the internal queue used to buffer lists of records for the asynchronous sender.

queue.put.wait.ms

Long

500

How long the connector waits for the queue to accept a list of records (milliseconds).

Static Header Selection

Adds one or more static headers to every produced record. Header names and values are Strings; values are sent as UTF-8 encoded byte arrays.

Key Type Default Description

header.static.aliases

List

empty list

Aliases used to look up header name and value definitions.

header.static.<alias>.name

String

null

The header name for the given alias.

header.static.<alias>.value

String

null

The header value for the given alias. Sent as a UTF-8 encoded byte array.

Metadata Forwarding

Forwards local record metadata fields as headers on the remote record, allowing the original source to be identified.

Key Type Default Description

header.remote.prefix

String

""

Prefix added to header names of forwarded headers.

forward.metadata.topic.name

String

X-Kafka-Original-Topic

Header name for the original topic name.

forward.metadata.partition.name

String

X-Kafka-Original-Partition

Header name for the original partition number.

forward.metadata.offset.name

String

X-Kafka-Original-Offset

Header name for the original record offset.

forward.metadata.timestamp.name

String

X-Kafka-Original-Timestamp

Header name for the original record timestamp.

Topic Selection

Determines which remote topic a record is produced to. Available strategies:

source — Use the same topic name as the original record (default, no further configuration needed).

Key Type Example Description

topic.selector

String

source

Use the source topic name. No additional configuration needed.

prefix — Prepend a prefix to the source topic name.

Key Type Example Description

topic.selector

String

prefix

Use the prefix topic selector.

topic.selector.prefix

String

from-remote-

Prefix to prepend to forwarded topic names. Mandatory.

fixed — Use a fixed remote topic name regardless of the source topic.

Key Type Example Description

topic.selector

String

fixed

Use the fixed topic selector.

topic.selector.target

String

loaded-from-remote

The fixed target topic name. Mandatory.

mapping — Map source topic names to specific remote topic names.

Key Type Example Description

topic.selector

String

mapping

Use the mapping topic selector.

topic.selector.mapping.<topic-in>

String

local-topic-for-remote-topic-in

Maps records from <topic-in> to the configured topic. Repeat for each input topic.

Partition Selection

Determines which partition a record is produced to on the remote topic.

Key Type Default Description

partition.selector

String

source

Partition selector strategy. Accepts source (use same partition as original — remote topic must have the same number of partitions) or partitioner (use the partitioner defined by the remote producer settings).

Getting Started

This section walks you through configuring the Kafka Sink Connector on Axual to forward records from a local Kafka stream to a topic on a remote Kafka cluster.

Prerequisites

Remote Kafka cluster

You need a remote Kafka cluster accessible from the Axual Connect cluster network.

  • The bootstrap address of the remote cluster must be reachable from the Connect cluster.

  • If using the default source topic selector, the target topic must exist on the remote cluster with the same number of partitions as the source topic.

  • If the remote cluster uses TLS or requires client certificate authentication, have the keystore and truststore files ready to reference from Vault.

Only ByteArrayConverter is supported. Ensure the upstream producer is also writing with ByteArrayConverter.

Axual stream

The stream the connector will consume must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. In Axual Self-Service, go to Applications and create a new application.

  2. Request Consumer access to the stream you want to forward to the remote cluster.

  3. Wait for the stream access request to be approved.

See Configure and install a connector for detailed steps.

Step 2 — Configure the connector

In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point. Replace all placeholder values with those of your environment.

Property Value

connector.class

io.axual.connect.plugins.kafka.sink.KafkaSinkConnector

topics

Axual-resolved stream name, e.g. myorg-myinst-dev-my-stream

remote.bootstrap.servers

<remote-cluster-host>:<port>

key.converter

org.apache.kafka.connect.converters.ByteArrayConverter

value.converter

org.apache.kafka.connect.converters.ByteArrayConverter

header.converter

org.apache.kafka.connect.converters.ByteArrayConverter

For topic and partition selector options, see the Topic Selection and Partition Selection sections above. For all available properties, see the connector releases and the Configuration section above.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, records from the local stream will be forwarded to the configured topic on the remote Kafka cluster.

Step 4 — Verify

Using a Kafka consumer on the remote cluster, consume from the target topic and confirm that records arrive:

kafka-console-consumer.sh \
  --bootstrap-server <remote-cluster-host>:<port> \
  --topic <target-topic> \
  --from-beginning \
  --max-messages 5

Cleanup

When you are done testing:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if it is no longer needed.

Known limitations

  • Only ByteArrayConverter is supported for key, value, and header converters — schema-aware converters cannot be used.

  • When using the source partition selector, the remote topic must have the same number of partitions as the source topic.

  • There is no built-in message transformation — use Kafka Streams or ksqlDB if transformation is required before forwarding.

Examples

All examples assume a remote Kafka cluster using a PLAIN listener and access to the given resources unless stated otherwise.

Example 1 — Minimal configuration

Uses source topic and partition selectors (default). Requires the same topic to exist on the remote cluster with the same number of partitions.

{
  "name": "kafka-sink-minimal",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 2 — Minimal configuration using TLS client certificate

Same as above; the remote cluster uses TLS and requires a client certificate.

{
  "name": "kafka-sink-minimal-with-tls",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.security.protocol" : "SSL",
    "remote.ssl.truststore.location" : "/clients/remote-cluster-truststore.jks",
    "remote.ssl.truststore.password" : "someSecret",
    "remote.ssl.keystore.location" : "/clients/remote-cluster-keystore.jks",
    "remote.ssl.keystore.password" : "verySecret",
    "remote.ssl.key.password" : "alsoVerySecret",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 3 — Using static headers

Two static headers are added to each produced record.

{
  "name": "kafka-sink-static-header",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "header.static.aliases": "why,now",
    "header.static.why.name": "Example-Why",
    "header.static.why.value": "We needed an example",
    "header.static.how.name": "Example-How",
    "header.static.how.value": "Using the Kafka Sink Connector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 4 — Using metadata forwarding

Each produced record contains metadata from the original record as custom-named headers.

{
  "name": "kafka-sink-metadata-forward",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "forward.metadata.topic.name": "Example-Topic",
    "forward.metadata.partition.name": "Example-Partition",
    "forward.metadata.offset.name": "Example-Offset",
    "forward.metadata.timestamp.name": "Example-Timestamp",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 5 — Using source topic selector

Each record is written to the same topic as the original. Functionally identical to the minimal configuration.

{
  "name": "kafka-sink-topic-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector": "source",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 6 — Using fixed topic selector

All records are written to forwarded-to-us regardless of their source topic.

{
  "name": "kafka-sink-topic-fixed",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector": "fixed",
    "topic.selector.target": "forwarded-to-us",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 7 — Using prefix topic selector

Records from test-topic are written to forwarded-test-topic on the remote cluster.

{
  "name": "kafka-sink-topic-prefix",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector": "prefix",
    "topic.selector.prefix": "forwarded-",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 8 — Using mapping topic selector

test-topic is mapped to forwarded-from-test-topic; topic2 is mapped to another-out.

{
  "name": "kafka-sink-topic-mapping",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic,topic2",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector": "mapping",
    "topic.selector.mapping.test-topic": "forwarded-from-test-topic",
    "topic.selector.mapping.topic2": "another-out",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 9 — Using source partition selector

Each record is written to the same partition as the original. Functionally identical to the minimal configuration.

{
  "name": "kafka-sink-partition-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "partition.selector": "source",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 10 — Using partitioner partition selector

Target partition is determined by the Kafka DefaultPartitioner.

{
  "name": "kafka-sink-partition-partitioner",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "partition.selector": "partitioner",
    "remote.partitioner.class": "org.apache.kafka.clients.producer.internals.DefaultPartitioner",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 11 — Prefixing headers

Forwarded header names are prefixed with X-Original- on the produced record.

{
  "name": "kafka-sink-minimal",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "header.remote.prefix": "X-Original-",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

License

This connector is licensed under the Apache License, Version 2.0.