Kafka Sink Connector — 1.1.0 (archived)

This is archived documentation for version 1.1.0.
For the current version, see Kafka Sink Connector.

Type: Sink

Class: io.axual.connect.plugins.kafka.sink.KafkaSinkConnector

Target System: Messaging & Streaming (Apache Kafka)

Maintainer: Axual

License: Apache License 2.0

Project: Source Code

Download: Maven Central

This page documents version 1.1.0. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The Kafka Sink Connector is part of the Kafka Synchronisation Connectors project. It loads data from local Kafka topics to topics on another Kafka cluster, forwarding the full record including Kafka headers. The same library also contains the Kafka Source Connector.

Features

  • Load data from local Kafka topics to topics on a remote Kafka cluster

  • Copy the full record, including Kafka headers

  • Mark copied headers with a configurable prefix

  • Add new static headers to records

  • Add original record position as a header for tracing

  • Multiple strategies for determining the remote topic name

  • Five built-in SMT (Single Message Transform) plugins for topic routing and data (de)serialization

When to Use

  • You need to replicate or forward data from an Axual-managed Kafka cluster to another Kafka cluster.

  • You want to mirror select topics to a remote system with full header fidelity.

  • You need flexible topic name mapping (source, fixed, prefix, or mapping strategies).

When NOT to Use

  • You need transformation or enrichment of records before forwarding — use Kafka Streams or ksqlDB instead.

  • The target system is not Kafka — use a dedicated sink connector for the target platform.

Only ByteArrayConverter is supported with this connector. Other converters are not yet supported.

Installation

The library is available on Maven Central. This library also contains the Kafka Source Connector.

  1. Search for the artifact on Maven Central.

  2. Select the version you wish to install.

  3. Download the JAR type for your Kafka Connect installation.

    Using the wrong JAR type can cause connectors to fail with class-not-found exceptions.

    Available JAR types:

    • jar-with-dependencies: Contains the compiled connector code and all its dependencies. The Kafka client itself is taken from the Kafka Connect installation.

    • jar: Contains only the compiled connector code, with no dependencies. May cause class-not-found errors.

For installation steps, see Installing Connector Plugins.

Configuration

Configuration options are grouped into the sections below.

To configure a connector in Axual Self-Service, see Starting Connectors.

TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Worker and Remote Producer Settings

The worker settings control internal connector functionality such as queue sizes and wait times. Remote settings configure the producer connecting to the remote cluster.

Key | Type | Default | Description
remote. | prefix | not applicable | Prefix for remote producer settings. For example, bootstrap.servers is provided as remote.bootstrap.servers.
queue.size | Integer | 10 | Size of the internal queue used to buffer lists of records for the asynchronous sender.
queue.put.wait.ms | Long | 500 | How long the connector waits for the queue to accept a list of records (milliseconds).
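As an illustration of the remote. prefix, the fragment below sketches how remote producer settings and worker settings might sit side by side in one connector configuration. The keys remote.acks and remote.compression.type are ordinary Kafka producer settings picked only as examples, and the queue values are arbitrary placeholders rather than tuning recommendations.

```json
{
  "remote.bootstrap.servers": "remote-cluster:19093",
  "remote.acks": "all",
  "remote.compression.type": "lz4",
  "queue.size": "20",
  "queue.put.wait.ms": "1000"
}
```

Following the table above, each remote.* key is handed to the remote producer under its unprefixed name, while queue.size and queue.put.wait.ms are read by the connector itself.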

Static Header Selection

Adds one or more static headers to every produced record. Header names and values are Strings; values are sent as UTF-8 encoded byte arrays.

Key | Type | Default | Description
header.static.aliases | List | empty list | Aliases used to look up header name and value definitions.
header.static.<alias>.name | String | null | The header name for the given alias.
header.static.<alias>.value | String | null | The header value for the given alias. Sent as a UTF-8 encoded byte array.

Metadata Forwarding

Forwards local record metadata fields as headers on the remote record, allowing the original source to be identified.

Key | Type | Default | Description
header.remote.prefix | String | "" | Prefix added to header names of forwarded headers.
forward.metadata.topic.name | String | X-Kafka-Original-Topic | Header name for the original topic name.
forward.metadata.partition.name | String | X-Kafka-Original-Partition | Header name for the original partition number.
forward.metadata.offset.name | String | X-Kafka-Original-Offset | Header name for the original record offset.
forward.metadata.timestamp.name | String | X-Kafka-Original-Timestamp | Header name for the original record timestamp.

Topic Selection

Determines which remote topic a record is produced to. Four strategies are available: source, prefix, fixed, and mapping.

topic.selector is deprecated as of 1.1.0. Use topic.selector.class with the full class name instead. Both are supported in 1.1.0.

source — Use the same topic name as the original record (default, no further configuration needed).

Key | Type | Example | Description
topic.selector.class | String | io.axual.connect.plugins.kafka.selectors.SourceTopicSelector | (Preferred) Use the source topic name. No additional configuration needed.
topic.selector (deprecated) | String | source | Use the source topic name. No additional configuration needed.

prefix — Prepend a prefix to the source topic name.

Key | Type | Example | Description
topic.selector.class | String | io.axual.connect.plugins.kafka.selectors.PrefixTopicSelector | (Preferred) Use the prefix topic selector.
topic.selector (deprecated) | String | prefix | Use the prefix topic selector.
topic.selector.prefix | String | from-remote- | Prefix to prepend to forwarded topic names. Mandatory.

fixed — Use a fixed remote topic name regardless of the source topic.

Key | Type | Example | Description
topic.selector.class | String | io.axual.connect.plugins.kafka.selectors.FixedTopicSelector | (Preferred) Use the fixed topic selector.
topic.selector (deprecated) | String | fixed | Use the fixed topic selector.
topic.selector.target | String | loaded-from-remote | The fixed target topic name. Mandatory.

mapping — Map source topic names to specific remote topic names.

Key | Type | Example | Description
topic.selector.class | String | io.axual.connect.plugins.kafka.selectors.MappingTopicSelector | (Preferred) Use the mapping topic selector.
topic.selector (deprecated) | String | mapping | Use the mapping topic selector.
topic.selector.mapping.<topic-in> | String | local-topic-for-remote-topic-in | Maps records from <topic-in> to the configured topic. Repeat for each input topic.
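For readers migrating an older configuration, it can help to see the deprecated short-name form written out. The fragment below is a sketch built only from the keys in the tables above; the prefix value forwarded- is borrowed from Example 7, which shows the same selector in its preferred topic.selector.class form.

```json
{
  "topic.selector": "prefix",
  "topic.selector.prefix": "forwarded-"
}
```

Replacing the first entry with "topic.selector.class": "io.axual.connect.plugins.kafka.selectors.PrefixTopicSelector" gives the preferred form; topic.selector.prefix stays unchanged.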

Partition Selection

Determines which partition a record is produced to on the remote topic.

Key | Type | Default | Description
partition.selector | String | source | Partition selector strategy. Accepts source (use the same partition as the original record; the remote topic must have the same number of partitions) or partitioner (use the partitioner defined by the remote producer settings).

Transformations (SMTs)

The following Single Message Transforms (SMTs) are bundled with the connector since version 1.1.0. They can be used with any Kafka Connect connector via the standard transforms.* configuration.

TopicNameLookupRouter

Routes records to a target topic using an explicit mapping table.

Class: io.axual.connect.plugins.kafka.transforms.TopicNameLookupRouter

Key | Type | Default | Description
mapping.<input-topic> | String | none | Maps records from <input-topic> to the specified target topic. Repeat for each mapping.
undefined.topic.action | String | FAIL | Action when no mapping is found for the source topic. Options: FAIL, DROP, IGNORE.
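A minimal sketch of how this transform might be wired in through the standard transforms.* properties. The alias route and the topic names orders, payments, remote-orders, and remote-payments are hypothetical; DROP is one of the three documented actions and is used here only as an example.

```json
{
  "transforms": "route",
  "transforms.route.type": "io.axual.connect.plugins.kafka.transforms.TopicNameLookupRouter",
  "transforms.route.mapping.orders": "remote-orders",
  "transforms.route.mapping.payments": "remote-payments",
  "transforms.route.undefined.topic.action": "DROP"
}
```

With this fragment, records from orders would be routed to remote-orders and records from payments to remote-payments. Records from any other topic would be handled according to undefined.topic.action.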

TopicNamePrefixRouter

Prepends a fixed prefix to the source topic name to determine the target topic.

Class: io.axual.connect.plugins.kafka.transforms.TopicNamePrefixRouter

Key | Type | Default | Description
prefix | String | none | The prefix to prepend to the topic name. Mandatory.
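As a sketch, the transform could be enabled with the standard transforms.* properties as follows. The alias addPrefix and the prefix value forwarded- are hypothetical choices for illustration.

```json
{
  "transforms": "addPrefix",
  "transforms.addPrefix.type": "io.axual.connect.plugins.kafka.transforms.TopicNamePrefixRouter",
  "transforms.addPrefix.prefix": "forwarded-"
}
```

Under these assumptions, a record from test-topic would be routed to forwarded-test-topic.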

TopicNameStaticRouter

Routes all records to a single fixed topic name.

Class: io.axual.connect.plugins.kafka.transforms.TopicNameStaticRouter

Key | Type | Default | Description
topic | String | none | The fixed target topic name. Mandatory.
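A possible configuration, again using the standard transforms.* properties. The alias toSingleTopic and the target topic name all-forwarded are hypothetical.

```json
{
  "transforms": "toSingleTopic",
  "transforms.toSingleTopic.type": "io.axual.connect.plugins.kafka.transforms.TopicNameStaticRouter",
  "transforms.toSingleTopic.topic": "all-forwarded"
}
```

With this fragment, every record would be routed to all-forwarded regardless of its source topic.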

DeserializeToObject

Deserializes the key or value of a record from a binary format (Avro, Protobuf, JSON Schema) into a structured object using a configurable Kafka deserializer. Requires the relevant serializer JARs to be placed in the connector plugin directory.

Class: io.axual.connect.plugins.kafka.transforms.DeserializeToObject

SerializeFromObject

Serializes the key or value of a record from a structured object into a binary format (Avro, Protobuf, JSON Schema) using a configurable Kafka serializer. Requires the relevant serializer JARs to be placed in the connector plugin directory.

Class: io.axual.connect.plugins.kafka.transforms.SerializeFromObject

Getting Started

This section walks you through configuring the Kafka Sink Connector on Axual to forward records from a local Kafka stream to a topic on a remote Kafka cluster.

Prerequisites

Remote Kafka cluster

You need a remote Kafka cluster accessible from the Axual Connect cluster network.

  • The bootstrap address of the remote cluster must be reachable from the Connect cluster.

  • If using the default source topic selector, a topic with the same name must exist on the remote cluster. If also using the default source partition selector, that topic must have the same number of partitions as the source topic.

  • If the remote cluster uses TLS or requires client certificate authentication, have the keystore and truststore files ready to reference from Vault.

Only ByteArrayConverter is supported. The connector forwards keys, values, and headers as raw bytes, so whatever serialization the upstream producer used reaches the remote topic unchanged.

Axual stream

The stream the connector will consume must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. In Axual Self-Service, go to Applications and create a new application.

  2. Request Consumer access to the stream you want to forward to the remote cluster.

  3. Wait for the stream access request to be approved.

See Configure and install a connector for detailed steps.

Step 2 — Configure the connector

In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point. Replace all placeholder values with those of your environment.

Property | Value
connector.class | io.axual.connect.plugins.kafka.sink.KafkaSinkConnector
topics | Axual-resolved stream name, e.g. myorg-myinst-dev-my-stream
remote.bootstrap.servers | <remote-cluster-host>:<port>
key.converter | org.apache.kafka.connect.converters.ByteArrayConverter
value.converter | org.apache.kafka.connect.converters.ByteArrayConverter
header.converter | org.apache.kafka.connect.converters.ByteArrayConverter

For topic and partition selector options, see the Topic Selection and Partition Selection sections above. For all available properties, see the connector releases and the Configuration section above.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, records from the local stream will be forwarded to the configured topic on the remote Kafka cluster.

Step 4 — Verify

Using a Kafka consumer on the remote cluster, consume from the target topic and confirm that records arrive:

kafka-console-consumer.sh \
  --bootstrap-server <remote-cluster-host>:<port> \
  --topic <target-topic> \
  --from-beginning \
  --max-messages 5

Cleanup

When you are done testing:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if it is no longer needed.

Known limitations

  • Only ByteArrayConverter is supported for key, value, and header converters — schema-aware converters cannot be used.

  • When using the source partition selector, the remote topic must have the same number of partitions as the source topic.

  • There is no built-in message transformation beyond the bundled SMTs for topic routing and (de)serialization. Use Kafka Streams or ksqlDB if records must be transformed or enriched before forwarding.

Examples

All examples assume a remote Kafka cluster with a PLAINTEXT listener and access to the given resources unless stated otherwise.

Example 1 — Minimal configuration

Uses source topic and partition selectors (default). Requires the same topic to exist on the remote cluster with the same number of partitions.

{
  "name": "kafka-sink-minimal",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 2 — Minimal configuration using TLS client certificate

Same as above; the remote cluster uses TLS and requires a client certificate.

{
  "name": "kafka-sink-minimal-with-tls",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.security.protocol" : "SSL",
    "remote.ssl.truststore.location" : "/clients/remote-cluster-truststore.jks",
    "remote.ssl.truststore.password" : "someSecret",
    "remote.ssl.keystore.location" : "/clients/remote-cluster-keystore.jks",
    "remote.ssl.keystore.password" : "verySecret",
    "remote.ssl.key.password" : "alsoVerySecret",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 3 — Using static headers

Two static headers, Example-Why and Example-How, are added to each produced record.

{
  "name": "kafka-sink-static-header",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "header.static.aliases": "why,how",
    "header.static.why.name": "Example-Why",
    "header.static.why.value": "We needed an example",
    "header.static.how.name": "Example-How",
    "header.static.how.value": "Using the Kafka Sink Connector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 4 — Using metadata forwarding

Each produced record contains metadata from the original record as custom-named headers.

{
  "name": "kafka-sink-metadata-forward",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "forward.metadata.topic.name": "Example-Topic",
    "forward.metadata.partition.name": "Example-Partition",
    "forward.metadata.offset.name": "Example-Offset",
    "forward.metadata.timestamp.name": "Example-Timestamp",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 5 — Using source topic selector

Each record is written to the same topic as the original. Functionally identical to the minimal configuration.

{
  "name": "kafka-sink-topic-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector.class": "io.axual.connect.plugins.kafka.selectors.SourceTopicSelector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 6 — Using fixed topic selector

All records are written to forwarded-to-us regardless of their source topic.

{
  "name": "kafka-sink-topic-fixed",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector.class": "io.axual.connect.plugins.kafka.selectors.FixedTopicSelector",
    "topic.selector.target": "forwarded-to-us",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 7 — Using prefix topic selector

Records from test-topic are written to forwarded-test-topic on the remote cluster.

{
  "name": "kafka-sink-topic-prefix",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector.class": "io.axual.connect.plugins.kafka.selectors.PrefixTopicSelector",
    "topic.selector.prefix": "forwarded-",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 8 — Using mapping topic selector

test-topic is mapped to forwarded-from-test-topic; topic2 is mapped to another-out.

{
  "name": "kafka-sink-topic-mapping",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic,topic2",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "topic.selector.class": "io.axual.connect.plugins.kafka.selectors.MappingTopicSelector",
    "topic.selector.mapping.test-topic": "forwarded-from-test-topic",
    "topic.selector.mapping.topic2": "another-out",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 9 — Using source partition selector

Each record is written to the same partition as the original. Functionally identical to the minimal configuration.

{
  "name": "kafka-sink-partition-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "partition.selector": "source",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 10 — Using partitioner partition selector

Target partition is determined by the Kafka DefaultPartitioner.

{
  "name": "kafka-sink-partition-partitioner",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "partition.selector": "partitioner",
    "remote.partitioner.class": "org.apache.kafka.clients.producer.internals.DefaultPartitioner",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 11 — Prefixing headers

Forwarded header names are prefixed with X-Original- on the produced record.

{
  "name": "kafka-sink-header-prefix",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.sink.KafkaSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "header.remote.prefix": "X-Original-",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

License

This connector is licensed under the Apache License, Version 2.0.