Kafka Source Connector
| Type | Source |
| Class | io.axual.connect.plugins.kafka.source.KafkaSourceConnector |
| Target System | Messaging & Streaming (Apache Kafka) |
| Maintainer | Axual |
| License | Apache License 2.0 |
| Project | |
| Download | |
Description
The Kafka Source Connector is part of the Kafka Synchronisation Connectors project. It loads data from topics on a remote Kafka cluster into local Kafka topics, forwarding the full record including Kafka headers. The same library also contains the Kafka Sink Connector.
Features
- Loads data from topics on a remote Kafka cluster into local Kafka topics
- Copies the full record, including Kafka headers
- Marks copied headers with a configurable prefix
- Adds new static headers to records
- Adds the original record position as a header for tracing
- Supports multiple strategies for determining the local topic name
When to Use
- You need to pull data from an external Kafka cluster into an Axual-managed cluster.
- You want to mirror selected remote topics locally with full header fidelity.
- You need flexible topic name mapping (single, source, prefix, or mapping strategies).
When NOT to Use
- You need transformation or enrichment of records — use Kafka Streams or ksqlDB instead.
- The source system is not Kafka — use a dedicated source connector for that platform.

Only ByteArrayConverter is supported with this connector. Other converters are not yet supported.
Installation
The library is available on Maven Central. This library also contains the Kafka Sink Connector.
- Search for the artifact on Maven Central.
- Select the version you wish to install.
- Download the JAR type for your Kafka Connect installation.

Using the wrong JAR type can result in failing connectors caused by ClassNotFoundException errors. Available JAR types:

- jar-with-dependencies — Contains the compiled connector code and all dependencies. The Kafka Client version included in Kafka Connect is used.
- jar — Contains only the compiled connector code with no dependencies. May cause ClassNotFoundException issues.

For installation steps, see Installing Connector Plugins.
Configuration
Configuration options are grouped as follows:

- Remote Consumer Configuration — settings for the consumer reading from the remote cluster
- Static Header Selection — static headers to add to each record
- Metadata Forwarding — forwarding of remote metadata (topic, partition, offset, timestamp) as headers
- Topic Selection — logic to determine the local topic name

To configure a connector in Axual Self Service, see Starting Connectors. TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.
Remote Consumer Configuration
Configures the consumer connecting to the remote cluster. All consumer properties can be set except the key and value deserializers. Note that the remote consumer group ID (remote.group.id) is mandatory for this connector.
| Key | Type | Default | Description |
|---|---|---|---|
| source.topics | List | empty list | Comma-separated list of remote topics to read from. |
| | Integer | 100 | Time in milliseconds the consumer poll call blocks before returning. |
| remote. | prefix | n/a | Prefix for remote consumer settings. For example, remote.bootstrap.servers sets the remote consumer's bootstrap.servers. |
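To illustrate the remote. prefix mechanism, here is a minimal Python sketch (not the connector's actual code) of how prefixed settings could be stripped down to plain consumer properties for the remote cluster:

```python
def extract_remote_consumer_config(connector_config: dict) -> dict:
    """Collect every 'remote.'-prefixed setting and strip the prefix,
    yielding plain consumer properties for the remote cluster.

    Illustrative sketch only; the connector's real implementation (and any
    keys it reserves, such as the deserializer settings) may differ.
    """
    prefix = "remote."
    return {
        key[len(prefix):]: value
        for key, value in connector_config.items()
        if key.startswith(prefix)
    }

# Settings taken from the minimal configuration in the Examples section
config = {
    "source.topics": "input-topic-remote",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
}
consumer_props = extract_remote_consumer_config(config)
```

Non-prefixed keys such as source.topics are left untouched; only the remote.* settings reach the remote consumer.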
Static Header Selection
Adds one or more static headers to every produced record. Header names and values are Strings; values are sent as UTF-8 encoded byte arrays.
| Key | Type | Default | Description |
|---|---|---|---|
| header.static.aliases | List | empty list | Aliases used to look up static header name and value definitions. |
| header.static.&lt;alias&gt;.name | String | null | The header name for the given alias. |
| header.static.&lt;alias&gt;.value | String | null | The header value for the given alias. Sent as a UTF-8 encoded byte array. |
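The alias indirection can be sketched as follows — an illustrative Python rendering of the documented behaviour (not the connector's code), using the alias names from Example 7:

```python
def build_static_headers(connector_config: dict) -> list[tuple[str, bytes]]:
    """Expand the alias-based static header configuration into concrete
    (name, value) pairs, with values encoded as UTF-8 byte arrays as the
    documentation describes. Illustrative sketch only.
    """
    aliases = connector_config.get("header.static.aliases", "")
    headers = []
    for alias in (a.strip() for a in aliases.split(",") if a.strip()):
        name = connector_config[f"header.static.{alias}.name"]
        value = connector_config[f"header.static.{alias}.value"]
        headers.append((name, value.encode("utf-8")))  # values sent as UTF-8 bytes
    return headers
```

With the configuration from Example 7, this yields the headers X-Header-1 and X-Header-Another on every produced record.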
Metadata Forwarding
Forwards remote metadata to the local system as Kafka headers, allowing the original source to be identified. Remote headers are always forwarded as-is; header names can optionally be prefixed.
Kafka messages can have different timestamp types. If forward.metadata.timestamp.name is set, an extra header with the same name suffixed by -Type is sent, indicating the timestamp type (NO_TIMESTAMP_TYPE, CREATE_TIME, or LOG_APPEND_TIME).
| Key | Type | Default | Description |
|---|---|---|---|
| header.remote.prefix | String | | Prefix added to header names of forwarded remote headers. |
| forward.metadata.topic.name | String | null | If set, the Kafka header name for the originating topic. Value sent as a UTF-8 encoded byte array. |
| forward.metadata.partition.name | String | null | If set, the Kafka header name for the originating partition. Value forwarded as a 32-bit integer. |
| forward.metadata.offset.name | String | null | If set, the Kafka header name for the originating message offset. Value sent as a 64-bit long. |
| forward.metadata.timestamp.name | String | null | If set, the Kafka header name for the originating message timestamp. |
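On the consuming side, these headers arrive as raw byte arrays. The sketch below shows one way to decode them in Python; it assumes the header names used in Example 6 and big-endian (network byte order) encoding for the numeric values, which is what Kafka's built-in serializers produce — verify against your deployment:

```python
import struct

def decode_forwarded_metadata(headers: dict) -> dict:
    """Decode the remote-metadata headers of a consumed record.

    `headers` maps header name -> raw bytes. Header names follow Example 6;
    the big-endian assumption for the 32-bit partition and 64-bit
    offset/timestamp values is ours, not stated by the connector docs.
    """
    meta = {}
    if "X-Kafka-Original-Topic" in headers:
        meta["topic"] = headers["X-Kafka-Original-Topic"].decode("utf-8")
    if "X-Kafka-Original-Partition" in headers:
        (meta["partition"],) = struct.unpack(">i", headers["X-Kafka-Original-Partition"])
    if "X-Kafka-Original-Offset" in headers:
        (meta["offset"],) = struct.unpack(">q", headers["X-Kafka-Original-Offset"])
    if "X-Kafka-Original-Timestamp" in headers:
        (meta["timestamp"],) = struct.unpack(">q", headers["X-Kafka-Original-Timestamp"])
    return meta
```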
Topic Selection
Determines on which local topic remote messages are produced. Available strategies:
single
All messages are produced to a single topic.
| Key | Type | Example | Description |
|---|---|---|---|
| topic.selector | String | single | Use single topic selection. |
| topic.selector.target | String | output-topic-local | The single output topic. Mandatory. |
prefix
Messages are produced to the same topic name as they came from, with a configurable prefix prepended.
| Key | Type | Example | Description |
|---|---|---|---|
| topic.selector | String | prefix | Use prefixed topic selection. |
| topic.selector.prefix | String | incoming- | The prefix to prepend to topic names. Mandatory. |
source
Messages are produced to the same topic name as they came from. No further configuration needed.
| Key | Type | Example | Description |
|---|---|---|---|
| topic.selector | String | source | Use source topic selection. |
mapping
Incoming messages are mapped to output topics by a configurable mapping.
In case of pattern subscriptions on the remote system, receiving a message on a topic for which no mapping is configured is flagged as a runtime exception.

| Key | Type | Example | Description |
|---|---|---|---|
| topic.selector | String | mapping | Use mapping topic selection. |
| topic.selector.mapping.&lt;remote topic&gt; | String | local-topic-one | Maps records from the given remote topic to the configured local topic. |
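The four strategies can be summarised in one resolution function. This is an illustrative Python sketch, not the connector's implementation; the key names follow the Examples section, and treating single as the fallback when topic.selector is unset is an assumption based on the minimal example, not a documented default:

```python
def resolve_local_topic(connector_config: dict, remote_topic: str) -> str:
    """Resolve the local output topic for a record consumed from
    `remote_topic`, following the four documented strategies.
    Illustrative sketch; 'single' as the default strategy is an assumption.
    """
    strategy = connector_config.get("topic.selector", "single")
    if strategy == "single":
        return connector_config["topic.selector.target"]
    if strategy == "prefix":
        return connector_config["topic.selector.prefix"] + remote_topic
    if strategy == "source":
        return remote_topic
    if strategy == "mapping":
        mapping_key = "topic.selector.mapping." + remote_topic
        if mapping_key not in connector_config:
            # As documented: an unmapped remote topic is a runtime error
            raise RuntimeError(f"No mapping configured for remote topic {remote_topic!r}")
        return connector_config[mapping_key]
    raise ValueError(f"Unknown topic selector {strategy!r}")
```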
Getting Started
This section walks you through configuring the Kafka Source Connector on Axual to pull records from a remote Kafka cluster into a local Axual stream.
Prerequisites
Remote Kafka cluster
You need a remote Kafka cluster accessible from the Axual Connect cluster network.
- The bootstrap address of the remote cluster must be reachable from the Connect cluster.
- The remote consumer group ID is mandatory — choose a unique group ID that will not conflict with existing consumers on the remote cluster.
- The source topic must exist on the remote cluster and contain records to consume.
- If the remote cluster uses TLS or requires client certificate authentication, have the keystore and truststore files ready to reference from Vault.

Only ByteArrayConverter is supported. Ensure the downstream consumers of the local Axual stream are also configured to use ByteArrayConverter.
Axual stream
The local stream where the connector will produce records must already exist in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- In Axual Self-Service, go to Applications and create a new application.
- Request Producer access to the local stream the connector will write to.
- Wait for the stream access request to be approved.
See Configure and install a connector for detailed steps.
Step 2 — Configure the connector
In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point. Replace all placeholder values with those of your environment.
| Property | Value |
|---|---|
| connector.class | io.axual.connect.plugins.kafka.source.KafkaSourceConnector |
| tasks.max | 1 |
| source.topics | Remote topic(s) to read from |
| topic.selector.target | Axual-resolved stream name |
| remote.bootstrap.servers | Bootstrap address of the remote cluster |
| remote.group.id | Unique consumer group ID on the remote cluster |
| key.converter | org.apache.kafka.connect.converters.ByteArrayConverter |
| value.converter | org.apache.kafka.connect.converters.ByteArrayConverter |
| header.converter | org.apache.kafka.connect.converters.ByteArrayConverter |
For topic selector options, see the Topic Selection section above. For all available properties, see the Configuration section above.
Step 3 — Start the connector
Start the connector application from Axual Self-Service. Once running, records from the remote Kafka topic will be produced to the configured local Axual stream.
Known limitations
- Only ByteArrayConverter is supported for key, value, and header converters — schema-aware converters cannot be used.
- The remote consumer group ID (remote.group.id) is mandatory — there is no automatic group assignment.
- When using the mapping topic selector, receiving a message from an unmapped remote topic causes a runtime exception.
- There is no built-in message transformation — use Kafka Streams or ksqlDB if transformation is required after ingestion.
Examples
All examples assume a remote Kafka cluster using a PLAINTEXT listener and access to the given resources unless stated otherwise.
Example 1 — Minimal configuration
Specifies the remote cluster, topics to read from, and the local target topic.
```json
{
  "name": "kafka-source-minimal",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 2 — Minimal configuration using TLS client certificate
Same as above; the remote cluster uses TLS and requires a client certificate.
```json
{
  "name": "kafka-source-minimal-with-tls",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "remote.security.protocol": "SSL",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.ssl.truststore.location": "/clients/remote-cluster-truststore.jks",
    "remote.ssl.truststore.password": "someSecret",
    "remote.ssl.keystore.location": "/clients/remote-cluster-keystore.jks",
    "remote.ssl.keystore.password": "verySecret",
    "remote.ssl.key.password": "alsoVerySecret",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 3 — Using mapping topic selector
Maps records from two subscribed remote topics to two different local output topics.
```json
{
  "name": "kafka-source-topic-mapping",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "mapping",
    "topic.selector.mapping.input-topic-1": "local-topic-one",
    "topic.selector.mapping.input-topic-2": "local-topic-two",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 4 — Using source topic selector
Maps incoming records to the same topic name as the remote source.
```json
{
  "name": "kafka-source-topic-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "source",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 5 — Using prefix topic selector
Maps incoming records to the same topic name as the remote source, with the specified prefix prepended.
```json
{
  "name": "kafka-source-topic-prefix",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "prefix",
    "topic.selector.prefix": "incoming-",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 6 — Forwarding remote metadata
Adds headers with the originating topic, partition, offset, and timestamp to each produced record.
Using X-Kafka-Original-Timestamp also creates the header X-Kafka-Original-Timestamp-Type.
```json
{
  "name": "kafka-source-metadata-forwarding",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "forward.metadata.topic.name": "X-Kafka-Original-Topic",
    "forward.metadata.partition.name": "X-Kafka-Original-Partition",
    "forward.metadata.offset.name": "X-Kafka-Original-Offset",
    "forward.metadata.timestamp.name": "X-Kafka-Original-Timestamp",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 7 — Using static headers
Two static headers are added to each produced record.
```json
{
  "name": "kafka-source-static-headers",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "header.static.aliases": "hdr1,hdr2",
    "header.static.hdr1.name": "X-Header-1",
    "header.static.hdr1.value": "Value for header 1",
    "header.static.hdr2.name": "X-Header-Another",
    "header.static.hdr2.value": "Header added by source connector",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
Example 8 — Prefixing remote headers
Header names from the remote record are forwarded with Remote- prepended.
```json
{
  "name": "kafka-source-prefix-headers",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "header.remote.prefix": "Remote-",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
```
License
This connector is licensed under the Apache License, Version 2.0.