Example configurations for the Kafka Source Connector

Example 1 - Minimal configuration

Minimal configuration, specifying the remote cluster and topics to read from, and the target topic to write to.

{
  "name": "kafka-source-minimal",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 2 - Minimal configuration using a TLS client certificate

Minimal configuration, specifying the remote cluster and topics to read from, and the target topic to write to.
The remote cluster uses TLS and requires a client certificate.

{
  "name": "kafka-source-minimal-with-tls",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "remote.security.protocol": "SSL",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.ssl.truststore.location": "/clients/remote-cluster-truststore.jks",
    "remote.ssl.truststore.password": "someSecret",
    "remote.ssl.keystore.location": "/clients/remote-cluster-keystore.jks",
    "remote.ssl.keystore.password": "verySecret",
    "remote.ssl.key.password": "alsoVerySecret",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 3 - Using mapping topic selector

Maps incoming records from two subscribed topics to two different output topics.

{
  "name": "kafka-source-topic-mapping",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "mapping",
    "topic.selector.mapping.input-topic-1": "local-topic-one",
    "topic.selector.mapping.input-topic-2": "local-topic-two",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
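The mapping selector's lookup behavior can be sketched as follows. This is an illustrative Python sketch only; the `resolve_target` helper is hypothetical and not part of the connector's API, but the config keys mirror the example above.

```python
# Illustrative sketch: how a "mapping" topic selector resolves the local
# target topic for each remote source topic. resolve_target is hypothetical.

config = {
    "topic.selector": "mapping",
    "topic.selector.mapping.input-topic-1": "local-topic-one",
    "topic.selector.mapping.input-topic-2": "local-topic-two",
}

def resolve_target(source_topic: str, config: dict) -> str:
    """Look up the local target topic configured for a remote source topic."""
    return config["topic.selector.mapping." + source_topic]

print(resolve_target("input-topic-1", config))  # local-topic-one
print(resolve_target("input-topic-2", config))  # local-topic-two
```

Records from a topic without a corresponding mapping entry have no resolvable target, so every subscribed topic should appear in the mapping.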

Example 4 - Using source topic selector

Maps incoming records to a local topic with the same name as the remote topic they came from.

{
  "name": "kafka-source-topic-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "source",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Example 5 - Using prefix topic selector

Maps incoming records to a local topic with the same name as the remote topic they came from, with the specified prefix prepended.

{
  "name": "kafka-source-topic-prefix",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-1,input-topic-2",
    "topic.selector": "prefix",
    "topic.selector.prefix": "incoming-",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
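The difference between the source and prefix selectors can be sketched in a few lines. The helpers below are hypothetical illustrations of the documented behavior, not the connector's actual implementation.

```python
# Illustrative sketch of the "source" and "prefix" topic selectors.
# Both helper functions are hypothetical.

def resolve_source(source_topic: str) -> str:
    """The "source" selector keeps the remote topic name unchanged."""
    return source_topic

def resolve_prefix(source_topic: str, prefix: str) -> str:
    """The "prefix" selector prepends the configured prefix to the name."""
    return prefix + source_topic

print(resolve_source("input-topic-1"))                # input-topic-1
print(resolve_prefix("input-topic-1", "incoming-"))   # incoming-input-topic-1
```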

Example 6 - Forwarding remote metadata

Adds headers to each produced record containing the originating topic, partition, offset and timestamp.
The timestamp type is written to an additional header, named by appending -Type to the configured timestamp header name.
For example, using the timestamp header name X-Kafka-Original-Timestamp also creates the header X-Kafka-Original-Timestamp-Type.

{
  "name": "kafka-source-metadata-forwarding",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "forward.metadata.topic.name": "X-Kafka-Original-Topic",
    "forward.metadata.partition.name": "X-Kafka-Original-Partition",
    "forward.metadata.offset.name": "X-Kafka-Original-Offset",
    "forward.metadata.timestamp.name": "X-Kafka-Original-Timestamp",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
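The resulting headers can be sketched as below. The `metadata_headers` helper and the string encoding of partition, offset and timestamp are illustrative assumptions; the header names come from the config above.

```python
# Illustrative sketch of metadata forwarding: for each consumed record, the
# configured header names are filled with the record's origin metadata, plus
# a derived "-Type" header for the timestamp type. metadata_headers is a
# hypothetical helper; the real connector may encode values differently.

config = {
    "forward.metadata.topic.name": "X-Kafka-Original-Topic",
    "forward.metadata.partition.name": "X-Kafka-Original-Partition",
    "forward.metadata.offset.name": "X-Kafka-Original-Offset",
    "forward.metadata.timestamp.name": "X-Kafka-Original-Timestamp",
}

def metadata_headers(topic, partition, offset, timestamp, timestamp_type, config):
    ts_name = config["forward.metadata.timestamp.name"]
    return {
        config["forward.metadata.topic.name"]: topic,
        config["forward.metadata.partition.name"]: str(partition),
        config["forward.metadata.offset.name"]: str(offset),
        ts_name: str(timestamp),
        ts_name + "-Type": timestamp_type,  # derived header name
    }

headers = metadata_headers("input-topic-remote", 0, 42,
                           1700000000000, "CreateTime", config)
print(headers["X-Kafka-Original-Topic"])           # input-topic-remote
print(headers["X-Kafka-Original-Timestamp-Type"])  # CreateTime
```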

Example 7 - Using static headers

Two static headers are added to each produced record.

{
  "name": "kafka-source-static-headers",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "header.static.aliases": "hdr1,hdr2",
    "header.static.hdr1.name": "X-Header-1",
    "header.static.hdr1.value": "Value for header 1",
    "header.static.hdr2.name": "X-Header-Another",
    "header.static.hdr2.value": "Header added by source connector",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
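How the alias-based keys expand into header name/value pairs can be sketched as follows; `build_static_headers` is a hypothetical helper illustrating the config structure, not the connector's API.

```python
# Illustrative sketch: each alias listed in header.static.aliases points to a
# pair of config keys, header.static.<alias>.name and header.static.<alias>.value.

config = {
    "header.static.aliases": "hdr1,hdr2",
    "header.static.hdr1.name": "X-Header-1",
    "header.static.hdr1.value": "Value for header 1",
    "header.static.hdr2.name": "X-Header-Another",
    "header.static.hdr2.value": "Header added by source connector",
}

def build_static_headers(config: dict) -> dict:
    """Expand the alias list into the static headers added to every record."""
    headers = {}
    for alias in config["header.static.aliases"].split(","):
        alias = alias.strip()
        headers[config[f"header.static.{alias}.name"]] = \
            config[f"header.static.{alias}.value"]
    return headers

print(build_static_headers(config))
```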

Example 8 - Prefixing remote headers

Headers of records from the remote system are forwarded, with their names prefixed with Remote- on the record produced to the local system.

{
  "name": "kafka-source-prefix-headers",
  "config": {
    "connector.class": "io.axual.connect.plugins.kafka.source.KafkaSourceConnector",
    "tasks.max": "1",
    "source.topics": "input-topic-remote",
    "topic.selector.target": "output-topic-local",
    "header.remote.prefix": "Remote-",
    "remote.bootstrap.servers": "remote-cluster:19093",
    "remote.group.id": "connect_group",
    "remote.enable.auto.commit": "true",
    "remote.auto.offset.reset": "earliest",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
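The renaming behavior can be sketched as a simple transformation over the header names; `prefix_headers` and the sample header below are hypothetical illustrations.

```python
# Illustrative sketch of remote header prefixing: each forwarded header keeps
# its value, but its name gains the configured prefix. prefix_headers is a
# hypothetical helper, not part of the connector's API.

def prefix_headers(remote_headers: dict, prefix: str) -> dict:
    """Rename remote headers by prepending the configured prefix."""
    return {prefix + name: value for name, value in remote_headers.items()}

print(prefix_headers({"Trace-Id": "abc123"}, "Remote-"))
# {'Remote-Trace-Id': 'abc123'}
```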