Configuring the Kafka Source Connector

Overview

The Kafka Source Connector configuration options can be divided into the following groups.

  • Remote consumer configuration
    The settings for the internal worker that handles the records and for the consumer that reads them from the remote Kafka cluster

  • Static Header Selection
    The settings to define static headers to be added to each record produced to the local topic

  • Metadata Forwarding
    Remote metadata such as source topic, partition, offset and timestamp can optionally be forwarded as Kafka headers. The names of these headers are configurable; if no header name is configured for a field, that information is not sent.

  • Topic Selection
    The local topic that Connect produces to can be controlled in different ways. The current implementations are single, prefix, source and mapping. With pattern subscriptions on the remote system, a message received on a topic for which no mapping is configured is flagged as a runtime exception.

To find out how to configure a connector in Axual Self Service, see starting-connectors.adoc

Remote consumer configuration

The remote settings are used to set up the consumer that connects to the remote cluster. All consumer properties can be set, with the exception of the key and value deserializers. Note that the remote consumer group is required for this connector.

Key                    Type     Default     Description
source.topics          List     empty list  The topics to read from, as a comma-separated list.
source.topics.poll.ms  Integer  100         The amount of time, in milliseconds, that the consumer poll call blocks before returning.
remote.                prefix   n/a         The prefix for remote consumer settings. For example, the configuration bootstrap.servers is provided as remote.bootstrap.servers.
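The prefixing rule can be sketched in Python. This is a minimal illustration, not part of the connector: the helper name remote_consumer_config, the broker address and the group name are made up for the example, and the sketch assumes that the required remote group corresponds to the standard consumer property group.id.

```python
def remote_consumer_config(topics, consumer_props, poll_ms=100):
    """Build the remote-consumer part of a connector configuration.

    Hypothetical helper: every plain Kafka consumer property is
    prefixed with "remote." before it is handed to the connector.
    """
    # The key and value deserializers cannot be overridden.
    for forbidden in ("key.deserializer", "value.deserializer"):
        if forbidden in consumer_props:
            raise ValueError(f"{forbidden} cannot be set for this connector")
    # Assumption: the mandatory remote group is the consumer's group.id.
    if "group.id" not in consumer_props:
        raise ValueError("group.id is required")

    config = {
        "source.topics": ",".join(topics),
        "source.topics.poll.ms": str(poll_ms),
    }
    for key, value in consumer_props.items():
        config[f"remote.{key}"] = str(value)
    return config


example = remote_consumer_config(
    topics=["orders", "payments"],
    consumer_props={
        "bootstrap.servers": "remote-broker:9092",  # assumed address
        "group.id": "source-connector-group",       # assumed group name
    },
)
```

The resulting dictionary holds only the settings of this group; it would still have to be merged with the other connector settings before being submitted.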

Static Header Selection

These settings are used to provide each record with one or more static headers. The name and value are set as Strings, and the value will be sent as a UTF-8 encoded byte array.

Key                          Type    Default     Description
header.static.aliases        List    empty list  The aliases used to determine the static header names and values.
header.static.<alias>.name   String  null        The name of the header for this alias.
header.static.<alias>.value  String  null        The value of the header for this alias. This value will be sent as a UTF-8 encoded byte array.
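The relation between aliases, names and values can be illustrated with a short Python sketch. The helper names, aliases and header values are hypothetical, and the sketch assumes that header.static.aliases takes a comma-separated list in the same way as source.topics.

```python
def static_header_config(headers):
    """Translate {alias: (header_name, header_value)} into connector settings."""
    # Assumption: the alias list is comma separated, like source.topics.
    config = {"header.static.aliases": ",".join(headers)}
    for alias, (name, value) in headers.items():
        config[f"header.static.{alias}.name"] = name
        config[f"header.static.{alias}.value"] = value
    return config


def encoded_static_headers(headers):
    """Return the (name, bytes) pairs that each record would carry."""
    return [(name, value.encode("utf-8")) for name, value in headers.values()]


# Hypothetical aliases and header values, for illustration only.
headers = {
    "env": ("environment", "production"),
    "origin": ("origin-cluster", "remote-eu"),
}
static_config = static_header_config(headers)
```

The alias itself only ties a name to a value in the configuration; it does not appear in the record.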

Metadata Forwarding

These settings make it possible to forward remote metadata to the local system as Kafka headers, so that the original source of a record produced on the local system can be identified.
Remote headers are always forwarded as-is; header names can optionally be prefixed with a configurable string.

Kafka messages can have different types of timestamps. If timestamps are to be forwarded (forward.metadata.timestamp.name is set), an extra header is sent with the configured name, suffixed by -Type. This header indicates the type of the timestamp as specified in the originating record; the value is one of NO_TIMESTAMP_TYPE, CREATE_TIME or LOG_APPEND_TIME.

Key                              Type    Default  Description
header.remote.prefix             String  ""       Prefix to add to header names of forwarded remote headers.
forward.metadata.topic.name      String  null     If configured, the name of the Kafka header in which the originating topic is sent. The value is sent as a UTF-8 encoded byte array.
forward.metadata.partition.name  String  null     If configured, the name of the Kafka header in which the originating partition is sent. The value is sent as a 32-bit integer.
forward.metadata.offset.name     String  null     If configured, the name of the Kafka header in which the offset of the originating message is sent. The value is sent as a 64-bit long.
forward.metadata.timestamp.name  String  null     If configured, the name of the Kafka header in which the timestamp of the originating message is sent.
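A consumer on the local system could read the forwarded metadata roughly as sketched below. This is an illustration under assumptions, not the connector's own code: the helper names and the X-Source-* header names are hypothetical, and the integers are assumed to be big-endian because this page does not state the byte order. The timestamp header is left undecoded since its encoding is not described here.

```python
import struct


def decode_forwarded_metadata(headers, config):
    """Decode forwarded metadata from a record's headers.

    headers: dict mapping header name to raw bytes.
    config:  the connector's forward.metadata.* settings.
    """
    decoders = {
        "forward.metadata.topic.name": lambda raw: raw.decode("utf-8"),
        # Assumption: big-endian 32-bit and 64-bit signed integers.
        "forward.metadata.partition.name": lambda raw: struct.unpack(">i", raw)[0],
        "forward.metadata.offset.name": lambda raw: struct.unpack(">q", raw)[0],
    }
    decoded = {}
    for setting, decode in decoders.items():
        header_name = config.get(setting)
        # A field without a configured header name is never forwarded.
        if header_name is not None and header_name in headers:
            decoded[setting] = decode(headers[header_name])
    return decoded


def timestamp_type_header(config):
    """Name of the extra timestamp type header, or None if timestamps are not forwarded."""
    name = config.get("forward.metadata.timestamp.name")
    return None if name is None else f"{name}-Type"


# Hypothetical header names chosen for this example.
forward_config = {
    "forward.metadata.topic.name": "X-Source-Topic",
    "forward.metadata.partition.name": "X-Source-Partition",
    "forward.metadata.offset.name": "X-Source-Offset",
    "forward.metadata.timestamp.name": "X-Source-Timestamp",
}
record_headers = {
    "X-Source-Topic": b"orders",
    "X-Source-Partition": struct.pack(">i", 3),
    "X-Source-Offset": struct.pack(">q", 42),
}
metadata = decode_forwarded_metadata(record_headers, forward_config)
```

If the byte order assumption does not hold for a given deployment, only the two struct format strings need to change.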

Topic Selection

The topic selection settings determine on which local topic remote messages will be produced.
Currently there are the following implementations:

single

All messages are produced to a single topic.

Key                    Type    Example    Description
topic.selector         String  single     Selects the single topic selection type.
topic.selector.target  String  topic-out  The single output topic to produce to.

prefix

All messages are produced to a topic with the same name as the topic they came from, prefixed by a configurable prefix.

Key                    Type    Example  Description
topic.selector         String  prefix   Selects the prefixed topic selection type.
topic.selector.prefix  String  from-    The prefix to prepend to topic names.

source

All messages are produced to a topic with the same name as the topic they came from. This selector has no further configuration.

Key             Type    Example  Description
topic.selector  String  source   Selects the source topic selection type.

mapping

Incoming messages are routed to output topics according to a configurable mapping from source topic to output topic.

Key                                    Type    Example  Description
topic.selector                         String  mapping  Selects the mapping topic selection type.
topic.selector.mapping.<source-topic>  String  out-1    Maps records from the source topic named <source-topic> to the specified output topic, out-1 in this example. This setting is repeated for each desired topic mapping.
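The four selectors can be summarized in a small Python sketch. It mirrors the behaviour described in this section and is not the connector's implementation; the function name select_topic and the example topic names are hypothetical.

```python
def select_topic(config, source_topic):
    """Return the local topic for a record read from source_topic."""
    selector = config["topic.selector"]
    if selector == "single":
        # Every record goes to one fixed topic.
        return config["topic.selector.target"]
    if selector == "prefix":
        # Same topic name, with the configured prefix prepended.
        return config["topic.selector.prefix"] + source_topic
    if selector == "source":
        # Same topic name, no further configuration.
        return source_topic
    if selector == "mapping":
        key = f"topic.selector.mapping.{source_topic}"
        if key not in config:
            # An unmapped source topic is flagged as a runtime exception.
            raise RuntimeError(f"no mapping configured for topic {source_topic!r}")
        return config[key]
    raise ValueError(f"unknown topic.selector: {selector!r}")


single = {"topic.selector": "single", "topic.selector.target": "topic-out"}
prefix = {"topic.selector": "prefix", "topic.selector.prefix": "from-"}
mapping = {"topic.selector": "mapping", "topic.selector.mapping.orders": "out-1"}
```

With these settings, a record from the remote topic orders would be produced to topic-out, from-orders and out-1 respectively, while the mapping selector would fail for any other source topic.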

Known limitations

Only use the ByteArrayConverter with this connector; other converters are not (yet) supported.