Kafka Source Connector

  • Type: Source

  • Class: io.axual.connect.plugins.kafka.source.KafkaSourceConnector

  • Target System: Messaging & Streaming (Apache Kafka)

  • Maintainer: Axual

  • License: Apache License 2.0

  • Project: Source Code

  • Download: Maven Central

Description

The Kafka Source Connector is part of the Kafka Synchronisation Connectors project. It loads data from topics on a remote Kafka cluster into local Kafka topics, forwarding the full record including Kafka headers. The same library also contains the Kafka Sink Connector.

Features

  • Loads data from topics on a remote Kafka cluster into local Kafka topics

  • Copies the full record, including Kafka headers

  • Marks copied headers with a configurable prefix

  • Adds new static headers to records

  • Adds the original record position as a header for tracing

  • Supports multiple strategies for determining the local topic name

When to Use

  • You need to pull data from an external Kafka cluster into an Axual-managed cluster.

  • You want to mirror selected remote topics locally with full header fidelity.

  • You need flexible topic name mapping (single, source, prefix, or mapping strategies).

When NOT to Use

  • You need transformation or enrichment of records — use Kafka Streams or ksqlDB instead.

  • The source system is not Kafka — use a dedicated source connector for that platform.

Only ByteArrayConverter is supported with this connector. Other converters are not yet supported.
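
Because the connector copies records as raw bytes, any deserialization is left to the downstream consumer. The following sketch is illustrative only: it assumes the remote producer wrote UTF-8 encoded JSON values, which need not be the case in your deployment.

```python
# Illustrative only: records copied by this connector arrive as raw bytes,
# so a downstream consumer must apply its own deserialization.
# Here we assume the remote producer wrote UTF-8 encoded JSON values.
import json

def decode_value(raw: bytes) -> dict:
    """Deserialize a raw record value that was originally UTF-8 JSON."""
    return json.loads(raw.decode("utf-8"))

record_value = b'{"orderId": 42, "status": "shipped"}'
print(decode_value(record_value))  # {'orderId': 42, 'status': 'shipped'}
```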

Installation

The library is available on Maven Central and also contains the Kafka Sink Connector.

  1. Search for the artifact on Maven Central.

  2. Select the version you wish to install.

  3. Download the JAR type for your Kafka Connect installation.

    Using the wrong JAR type can result in failing connectors caused by ClassNotFoundException errors.

    Available JAR types:

    • jar-with-dependencies — Contains the compiled connector code and all dependencies. The Kafka Client version included in Kafka Connect is used.

    • jar — Contains only the compiled connector code with no dependencies. May cause class not found issues.

For installation steps, see Installing Connector Plugins.

Configuration

To configure a connector in Axual Self-Service, see Starting Connectors. TIP: To speed up your deployment, use the Terraform Boilerplate or the Management API Boilerplate.

Configuration options are grouped into the sections below.

Remote Consumer Configuration

Configures the consumer connecting to the remote cluster. All consumer properties can be set except the key and value deserializers. Note that the remote consumer group (remote.group.id) is mandatory for this connector.

  • source.topics (List, default: empty list): Comma-separated list of remote topics to read from.

  • source.topics.poll.ms (Integer, default: 100): Time in milliseconds the consumer poll call blocks before returning.

  • remote. (prefix): Prefix for remote consumer settings. For example, bootstrap.servers is provided as remote.bootstrap.servers.
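
The remote. prefix mechanism can be sketched as stripping the prefix from every matching key and passing the remainder to the remote consumer. This is an illustrative sketch of the documented behaviour, not the connector's own code:

```python
# Sketch of how remote.-prefixed settings resolve to consumer properties.
# Illustrative only; this mirrors the documented behaviour, not the
# connector's actual implementation.
def extract_remote_config(config: dict) -> dict:
    prefix = "remote."
    return {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}

connector_config = {
    "source.topics": "input-topic-remote",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
}
print(extract_remote_config(connector_config))
# {'bootstrap.servers': 'remote-cluster:19093', 'group.id': 'connect_group'}
```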

Static Header Selection

Adds one or more static headers to every produced record. Header names and values are Strings; values are sent as UTF-8 encoded byte arrays.

  • header.static.aliases (List, default: empty list): Aliases used to look up static header name and value definitions.

  • header.static.<alias>.name (String, default: null): The header name for the given alias.

  • header.static.<alias>.value (String, default: null): The header value for the given alias. Sent as a UTF-8 encoded byte array.
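
The alias indirection can be sketched as follows. This is an illustrative model of how the alias list resolves to header name/value pairs, not the connector's own code:

```python
# Sketch of how static header aliases resolve to (name, value) pairs,
# with values encoded as UTF-8 byte arrays. Illustrative only.
def resolve_static_headers(config: dict) -> dict:
    aliases = [a.strip() for a in config.get("header.static.aliases", "").split(",") if a.strip()]
    headers = {}
    for alias in aliases:
        name = config[f"header.static.{alias}.name"]
        value = config[f"header.static.{alias}.value"]
        headers[name] = value.encode("utf-8")  # values are sent as UTF-8 bytes
    return headers

config = {
    "header.static.aliases": "hdr1,hdr2",
    "header.static.hdr1.name": "X-Header-1",
    "header.static.hdr1.value": "Value for header 1",
    "header.static.hdr2.name": "X-Header-Another",
    "header.static.hdr2.value": "Header added by source connector",
}
print(resolve_static_headers(config))
```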

Metadata Forwarding

Forwards remote metadata to the local system as Kafka headers, allowing the original source to be identified. Remote headers are always forwarded as-is; header names can optionally be prefixed.

Kafka messages can have different timestamp types. If forward.metadata.timestamp.name is set, an extra header with the same name suffixed by -Type is sent, indicating the timestamp type (NO_TIMESTAMP_TYPE, CREATE_TIME, or LOG_APPEND_TIME).

  • header.remote.prefix (String, default: ""): Prefix added to header names of forwarded remote headers.

  • forward.metadata.topic.name (String, default: null): If set, the Kafka header name for the originating topic. Value sent as a UTF-8 encoded byte array.

  • forward.metadata.partition.name (String, default: null): If set, the Kafka header name for the originating partition. Value forwarded as a 32-bit integer.

  • forward.metadata.offset.name (String, default: null): If set, the Kafka header name for the originating message offset. Value sent as a 64-bit Long.

  • forward.metadata.timestamp.name (String, default: null): If set, the Kafka header name for the originating message timestamp.
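
On the consuming side, the numeric metadata headers arrive as byte arrays and must be decoded. The sketch below assumes big-endian (Java ByteBuffer) encoding for the 32-bit partition and 64-bit offset values; verify the byte order against your deployment before relying on it. The header names match those used in Example 6 below.

```python
# Sketch of decoding forwarded metadata headers on the consuming side.
# Assumes big-endian (Java ByteBuffer) encoding for the numeric headers;
# verify the byte order against your deployment before relying on this.
import struct

def decode_metadata(headers: dict) -> dict:
    return {
        "topic": headers["X-Kafka-Original-Topic"].decode("utf-8"),
        "partition": struct.unpack(">i", headers["X-Kafka-Original-Partition"])[0],  # 32-bit int
        "offset": struct.unpack(">q", headers["X-Kafka-Original-Offset"])[0],        # 64-bit long
    }

headers = {
    "X-Kafka-Original-Topic": b"input-topic-remote",
    "X-Kafka-Original-Partition": struct.pack(">i", 3),
    "X-Kafka-Original-Offset": struct.pack(">q", 1042),
}
print(decode_metadata(headers))
# {'topic': 'input-topic-remote', 'partition': 3, 'offset': 1042}
```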

Topic Selection

Determines the local topic to which remote messages are produced. Available strategies:

single

All messages are produced to a single topic.

  • topic.selector (String): Set to single to use single topic selection.

  • topic.selector.target (String, e.g. topic-out): The single output topic. Mandatory.

prefix

Messages are produced to the same topic name as they came from, with a configurable prefix prepended.

  • topic.selector (String): Set to prefix to use prefixed topic selection.

  • topic.selector.prefix (String, e.g. from-): The prefix to prepend to topic names. Mandatory.

source

Messages are produced to the same topic name as they came from. No further configuration needed.

  • topic.selector (String): Set to source to use source topic selection.

mapping

Incoming messages are mapped to output topics by a configurable mapping.

When using pattern subscriptions on the remote system, receiving a message on a topic for which no mapping is configured results in a runtime exception.

  • topic.selector (String): Set to mapping to use mapping topic selection.

  • topic.selector.mapping.<source-topic> (String, e.g. out-1): Maps records from <source-topic> to the specified output topic. Repeat for each desired mapping.
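
The four strategies can be sketched as a pure function from strategy, source topic, and configuration to a local topic name. This is an illustrative model of the documented behaviour, not the connector's implementation:

```python
# Sketch of the four topic selection strategies as a pure function.
# Illustrative only; the connector's real implementation may differ.
def select_local_topic(strategy: str, source_topic: str, config: dict) -> str:
    if strategy == "single":
        return config["topic.selector.target"]          # all records to one topic
    if strategy == "prefix":
        return config["topic.selector.prefix"] + source_topic
    if strategy == "source":
        return source_topic                             # same name as remote topic
    if strategy == "mapping":
        key = f"topic.selector.mapping.{source_topic}"
        if key not in config:
            # an unmapped topic is a runtime error, per the documentation
            raise RuntimeError(f"No mapping configured for topic {source_topic}")
        return config[key]
    raise ValueError(f"Unknown topic selector: {strategy}")

print(select_local_topic("prefix", "orders", {"topic.selector.prefix": "from-"}))  # from-orders
```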

Getting Started

This section walks you through configuring the Kafka Source Connector on Axual to pull records from a remote Kafka cluster into a local Axual stream.

Prerequisites

Remote Kafka cluster

You need a remote Kafka cluster accessible from the Axual Connect cluster network.

  • The bootstrap address of the remote cluster must be reachable from the Connect cluster.

  • The remote consumer group ID is mandatory — choose a unique group ID that will not conflict with existing consumers on the remote cluster.

  • The source topic must exist on the remote cluster and contain records to consume.

  • If the remote cluster uses TLS or requires client certificate authentication, have the keystore and truststore files ready to reference from Vault.

Only ByteArrayConverter is supported. Ensure the downstream consumers of the local Axual stream are prepared to handle the records as raw byte arrays.

Axual stream

The local stream where the connector will produce records must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. In Axual Self-Service, go to Applications and create a new application.

  2. Request Producer access to the local stream the connector will write to.

  3. Wait for the stream access request to be approved.

See Configure and install a connector for detailed steps.

Step 2 — Configure the connector

In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point. Replace all placeholder values with those of your environment.

  • connector.class: io.axual.connect.plugins.kafka.source.KafkaSourceConnector

  • source.topics: <remote-topic-name>

  • remote.bootstrap.servers: <remote-cluster-host>:<port>

  • remote.group.id: <unique-consumer-group-id>

  • remote.auto.offset.reset: earliest

  • topic.selector: single

  • topic.selector.target: the Axual-resolved stream name, e.g. myorg-myinst-dev-my-stream

  • key.converter: org.apache.kafka.connect.converters.ByteArrayConverter

  • value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

  • header.converter: org.apache.kafka.connect.converters.ByteArrayConverter

For topic selector options, see the Topic Selection section above. For all available properties, see the Configuration section above.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, records from the remote Kafka topic will be produced to the configured local Axual stream.

Step 4 — Verify

In Axual Self-Service, use the stream-browse feature on the target stream to confirm that records are arriving.

Cleanup

When you are done testing:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if it is no longer needed.

Known limitations

  • Only ByteArrayConverter is supported for key, value, and header converters — schema-aware converters cannot be used.

  • The remote consumer group ID (remote.group.id) is mandatory — there is no automatic group assignment.

  • When using the mapping topic selector, receiving a message from an unmapped remote topic causes a runtime exception.

  • There is no built-in message transformation — use Kafka Streams or ksqlDB if transformation is required after ingestion.

Examples

All examples assume a remote Kafka cluster using a PLAIN listener and access to the given resources unless stated otherwise.

Example 1 — Minimal configuration

Specifies the remote cluster, topics to read from, and the local target topic.

{
  "name": "kafka-source-minimal",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 2 — Minimal configuration using TLS client certificate

Same as above; the remote cluster uses TLS and requires a client certificate.

{
  "name": "kafka-source-minimal-with-tls",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "remote.security.protocol": "SSL",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.ssl.truststore.location": "/clients/remote-cluster-truststore.jks",
    "remote.ssl.truststore.password": "someSecret",
    "remote.ssl.keystore.location": "/clients/remote-cluster-keystore.jks",
    "remote.ssl.keystore.password": "verySecret",
    "remote.ssl.key.password": "alsoVerySecret",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 3 — Using mapping topic selector

Maps records from two subscribed remote topics to two different local output topics.

{
  "name": "kafka-source-topic-mapping",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "mapping",
    "topic.selector.mapping.input-topic-1": "local-topic-one",
    "topic.selector.mapping.input-topic-2": "local-topic-two",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 4 — Using source topic selector

Maps incoming records to the same topic name as the remote source.

{
  "name": "kafka-source-topic-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "source",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 5 — Using prefix topic selector

Maps incoming records to the same topic name as the remote source, with the specified prefix prepended.

{
  "name": "kafka-source-topic-prefix",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "prefix",
    "topic.selector.prefix": "incoming-",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 6 — Forwarding remote metadata

Adds headers with the originating topic, partition, offset, and timestamp to each produced record. Using X-Kafka-Original-Timestamp also creates the header X-Kafka-Original-Timestamp-Type.

{
  "name": "kafka-source-metadata-forwarding",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "forward.metadata.topic.name": "X-Kafka-Original-Topic",
    "forward.metadata.partition.name": "X-Kafka-Original-Partition",
    "forward.metadata.offset.name": "X-Kafka-Original-Offset",
    "forward.metadata.timestamp.name": "X-Kafka-Original-Timestamp",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 7 — Using static headers

Two static headers are added to each produced record.

{
  "name": "kafka-source-static-headers",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "header.static.aliases": "hdr1,hdr2",
    "header.static.hdr1.name": "X-Header-1",
    "header.static.hdr1.value": "Value for header 1",
    "header.static.hdr2.name": "X-Header-Another",
    "header.static.hdr2.value": "Header added by source connector",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 8 — Prefixing remote headers

Header names from the remote record are forwarded with Remote- prepended.

{
  "name": "kafka-source-prefix-headers",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "header.remote.prefix": "Remote-",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

License

This connector is licensed under the Apache License, Version 2.0.