Configuring the Kafka Sink Connector

Overview

The Kafka Sink Connector configuration options can be divided into the following groups.

  • Worker and remote producer settings
    The settings for the internal worker that handles the records and for the producer that sends them to the remote Kafka cluster.

  • Static Header Selection
    The settings to define static headers to be added to each record produced to the remote topic.

  • Metadata Forwarding
    Settings to control the forwarding of record metadata, like the topic, partition, offset and timestamp of the record.

  • Topic Selection
    Settings to control the logic that determines the remote topic name where a record must be produced.

  • Partition Selection
    Settings to control on which partition a record should be produced.

To find out how to configure a connector in Axual Self Service, see Starting Connectors.

Worker and remote producer settings

The worker settings control the internal connector functionality, like queue sizes and wait times for handling Connect internal requests.
The remote settings configure the producer that connects to the remote cluster.

Key               | Type    | Default        | Description
remote.           | prefix  | not applicable | The prefix for the default producer settings. For example, the producer configuration bootstrap.servers is provided as remote.bootstrap.servers.
queue.size        | Integer | 10             | Connect provides the connector with lists of records to send, which are put on a queue. The sender is an asynchronous process that retrieves these lists from the queue. This setting determines the size of the queue.
queue.put.wait.ms | Long    | 500            | How long (in milliseconds) the connector waits for the queue to accept a list of records.
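
A minimal sketch of these settings, assuming the connector is configured with Java properties syntax; the broker address and tuning values below are placeholders, not recommendations:

    # Producer settings for the remote cluster, passed via the remote. prefix
    remote.bootstrap.servers=remote-cluster:9092
    # Internal worker tuning (values shown are the defaults)
    queue.size=10
    queue.put.wait.ms=500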

Static Header Selection

These settings are used to provide each record with a static header.
The header name and value are configured as Strings; the value is sent as a UTF-8 encoded byte array.

Key                         | Type   | Default    | Description
header.static.aliases       | List   | empty list | The aliases used to define the static header names and values below.
header.static.<alias>.name  | String | null       | The name of the header for the alias.
header.static.<alias>.value | String | null       | The value of the header for the alias. This string value will be sent as a UTF-8 encoded byte array.
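
As an illustration, the following adds two static headers to every record produced to the remote topic; the alias names and header values are arbitrary examples:

    header.static.aliases=origin,env
    header.static.origin.name=X-Origin
    header.static.origin.value=axual-sink-connector
    header.static.env.name=X-Environment
    header.static.env.value=production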

Metadata Forwarding

These settings make it possible to forward local record metadata fields (topic, partition, offset and timestamp) as headers on the remote record, so the original source of a record produced by this connector can be identified.

Key                             | Type   | Default                    | Description
header.remote.prefix            | String | "" (empty string)          | Prefix to add to the names of the forwarded metadata headers.
forward.metadata.topic.name     | String | X-Kafka-Original-Topic     | The name to use for the header containing the record topic name.
forward.metadata.partition.name | String | X-Kafka-Original-Partition | The name to use for the header containing the record partition.
forward.metadata.offset.name    | String | X-Kafka-Original-Offset    | The name to use for the header containing the record offset.
forward.metadata.timestamp.name | String | X-Kafka-Original-Timestamp | The name to use for the header containing the record timestamp.
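
For example, the hypothetical configuration below keeps the default topic header name, renames the offset header, and adds a custom prefix; assuming the prefix is applied to these header names, the topic would then arrive in a header named local.X-Kafka-Original-Topic:

    header.remote.prefix=local.
    # Header names for forwarded fields; topic keeps its default name, offset is renamed
    forward.metadata.topic.name=X-Kafka-Original-Topic
    forward.metadata.offset.name=X-Source-Offset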

Topic Selection

The topic selector determines to which topic a record will be produced.

The current implementations are listed below (a configuration sketch covering each selector follows the list):

  • source

    Use the topic name of the original record; this is the default setting.
    This selector requires no selector configuration.

    Key            | Type   | Example | Description
    topic.selector | String | source  | Determines which topic selector to use; here source (the default). This selector needs no further configuration.

  • prefix

    Add a prefix to the topic name used for the original record.
    This selector uses the selector configuration as the prefix for the remote topic name.

    Key                   | Type   | Example      | Description
    topic.selector        | String | prefix       | Determines which topic selector to use; here prefix.
    topic.selector.prefix | String | from-remote- | The prefix to prepend to forwarded topics. This is a mandatory configuration.

  • fixed

    Use a fixed topic name, ignoring the original topic name.
    This selector uses the selector configuration as the remote topic name.

    Key                   | Type   | Example            | Description
    topic.selector        | String | fixed              | Determines which topic selector to use; here fixed.
    topic.selector.target | String | loaded-from-remote | The fixed target topic to use. This is a mandatory configuration.

  • mapping

    Use a mapping from input topic name to output topic name. Multiple mappings can be configured, as follows:

    Key                               | Type   | Example                         | Description
    topic.selector                    | String | mapping                         | Determines which topic selector to use; here mapping.
    topic.selector.mapping.<topic-in> | String | local-topic-for-remote-topic-in | Maps records coming from topic <topic-in> to the configured topic. This configuration can be repeated for multiple input topics; if no mappings are provided, the selector will log an error and exit.
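
A configuration sketch per selector (a connector uses a single topic.selector value); the topic names and prefix values are illustrative:

    # source: keep the original topic name (default, no further configuration)
    topic.selector=source

    # prefix: prepend a fixed string to the original topic name
    topic.selector=prefix
    topic.selector.prefix=from-remote-

    # fixed: always produce to a single target topic
    topic.selector=fixed
    topic.selector.target=loaded-from-remote

    # mapping: explicit per-topic mapping, repeatable for multiple input topics
    topic.selector=mapping
    topic.selector.mapping.orders=remote-orders
    topic.selector.mapping.payments=remote-payments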

Partition Selection

The partition selector determines to which partition a record will be produced.

The current implementations are:

  • source

    Use the partition number of the original record; this is the default setting.
    If the remote topic doesn’t have this partition, the produce call and connector task will fail.

  • partitioner

    Use the partitioner defined by the remote producer settings.

Key                | Type   | Default | Description
partition.selector | String | source  | Determines which partition selector to use; accepts source or partitioner.
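
As a sketch, the following lets the remote producer's partitioner decide the partition instead of reusing the source partition; the partitioner class is passed through the remote. prefix as a regular producer setting and is only an example:

    partition.selector=partitioner
    # Optional: pick a specific partitioner for the remote producer (example class)
    remote.partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner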

Known limitations

Only use the ByteArrayConverter with this connector; other converters are not (yet) supported.
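
In practice this means setting the connector's key and value converters to Kafka Connect's built-in ByteArrayConverter, for example:

    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter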