Axual Connect

Overview

Axual Connect is Kafka Connect integrated into Axual Self-Service.

In short, Connect is a framework for running Connector Tasks that sync data between a Kafka cluster and another system, such as a data lake, an SQL database or a message queue.

Figure: Connect cluster diagram

Connector plugins are made available as reusable components by many different vendors, and most if not all Connector plugins work with the Axual Platform.

What is "Connect"? What are "Connectors"?

Glossary of terms

  • Connect Cluster: A cluster of nodes running Kafka Connect. All Connector Tasks run on these machines. This cluster is usually owned and operated by Axual.

  • Connect Plugin: A generic program (delivered as a JAR file) which can be configured to integrate an external system with Apache Kafka. Making an OOP analogy, this can be seen as a "class": it has no runtime of its own, but multiple instances of it can be created when given the required configuration.

  • Connect Application: A term used within the Axual ecosystem to refer to a Self-Service application that manages a group of Connectors of the same Plugin type. This resource supports data governance the Axual way, just as regular Kafka applications do.

  • Connector: A configured instance of a Connect Plugin. Continuing the OOP analogy, this can be seen as an "object": a runtime entity. Multiple Connectors of the same Connect Plugin type can exist at the same time. Connectors are preconfigured to connect to the Kafka cluster, so a developer only needs to supply the configuration required to reach the other system.

  • Connector Application: A Connect Application can start one Connector Application (an instance of itself) per environment. This is technically just a Connector, deployed onto an Axual-Environment.

  • Connector Task: Connectors are generally run by using multiple "tasks". This is how Connectors scale: by having multiple parallel (and distributed) processes. All instances have the same configuration.
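The OOP analogy used in the glossary can be sketched in a few lines of Python. This is only an illustration of the relationships between Plugin, Connector and Task as described above; the class names, the plugin class string and the connection URL are hypothetical and are not part of Axual Connect or Kafka Connect.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectPlugin:
    """The "class": a reusable program with no runtime of its own."""
    plugin_class: str

    def instantiate(self, name: str, config: dict, tasks_max: int = 1) -> "Connector":
        # A Connector is created by giving the plugin its required configuration.
        return Connector(name=name, plugin=self, config=dict(config), tasks_max=tasks_max)


@dataclass
class Connector:
    """The "object": a configured instance of a Connect Plugin."""
    name: str
    plugin: ConnectPlugin
    config: dict
    tasks_max: int = 1

    def task_configs(self) -> list:
        # Per the glossary, all task instances share the same configuration.
        return [dict(self.config) for _ in range(self.tasks_max)]


# Hypothetical plugin and connector, for illustration only.
jdbc = ConnectPlugin("com.example.HypotheticalJdbcSinkConnector")
dev = jdbc.instantiate("orders-dev", {"connection.url": "jdbc:postgresql://dev-db/orders"}, tasks_max=2)
tst = jdbc.instantiate("orders-test", {"connection.url": "jdbc:postgresql://test-db/orders"})
```

Here one plugin yields two independent Connectors, and the first scales out over two tasks.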

Connect Applications

Differences between Connect Applications and regular Kafka applications:

  1. For Connect Applications, you need to select a "Plugin type" which corresponds to the system you are integrating with (e.g. JDBC, MQTT, Cassandra, etc.), instead of choosing an "Application type" (e.g. Java, Python, Rest, etc.).

  2. Regular Kafka applications require a Certificate PEM file. Connector Applications also require the Private key associated with that certificate. This is simply because the private key must be available to the running program: since the Custom applications run on the tenant’s infrastructure, the key is with them; Connectors run inside the Connect Cluster, so the key must be made available within it.

  3. You can see the status of every Connector Application in the Self-Service portal. You can also start and stop them from the same place. Custom application lifecycles are handled by the developers and no information about their runtime is displayed in the portal.

Architecture

For optimal resilience, deploy Connect as multiple Connect Nodes spread over multiple availability zones or racks within Kubernetes, using Kubernetes affinity rules.

Example use

Suppose you are tasked with syncing all data from Kafka into an SQL database. Instead of writing your own application, you simply set up a JDBC connector via Axual Self-Service.

API & Interactions

API

The Connect framework has an API that must not be exposed, to prevent unauthorized access to Connector configuration and credentials.
Interactions with the Connect API should be done solely through the Self-Service UI or API.
For debugging, an operator can reach the Connect API by, for example, port-forwarding the Connect Service or creating an internal Ingress. Refer to the official Apache Kafka Connect REST API documentation for the available endpoints.
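As a minimal debugging sketch, the snippet below builds the URL of the standard Kafka Connect status endpoint (`GET /connectors/{name}/status`) and fetches JSON from it. It assumes the Connect Service has been port-forwarded to `localhost:8083`; 8083 is the Kafka Connect default REST port and may differ in your deployment. The helper names are illustrative.

```python
import json
import urllib.request
from urllib.parse import quote

# Assumption: the Connect Service is port-forwarded to this address.
BASE_URL = "http://localhost:8083"


def connector_status_url(name: str, base_url: str = BASE_URL) -> str:
    """Build the URL of the Kafka Connect status endpoint for one connector."""
    return f"{base_url}/connectors/{quote(name, safe='')}/status"


def fetch_json(url: str, timeout: float = 5.0):
    """GET a Connect REST endpoint and decode its JSON response."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)
```

With the port-forward active, `fetch_json(f"{BASE_URL}/connectors")` would list the connector names and `fetch_json(connector_status_url("my-connector"))` would return the state of that connector and its tasks.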

Action: Perform rolling restart of Axual Connect

This is an ad-hoc action, for example for reloading Connector plugins. For configuration or image changes, use a GitOps approach instead. Axual Connect is deployed as a Kubernetes Deployment with the default update strategy RollingUpdate, meaning Pods are re-created one by one and an old Pod is terminated only once its replacement is in a ready state.
To execute, first get the Deployment name ending with -axual-connect:

kubectl get deployments

NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
...
tenant-instance-axual-connect         3/3     3            3           15m

Then initiate the restart using the following command and, optionally, watch the Pods roll:

kubectl rollout restart deploy tenant-instance-axual-connect

kubectl get pods -o wide -w
Example output:
NAME                                          READY   STATUS             AGE     IP
nhn-instance-axual-connect-dff54dcb-678lj     0/1     Init:1/3           1s      10.244.20.114
nhn-instance-axual-connect-dff54dcb-678lj     0/1     Init:1/3           2s      10.244.20.114
nhn-instance-axual-connect-dff54dcb-678lj     0/1     Init:2/3           30s     10.244.20.114
nhn-instance-axual-connect-dff54dcb-678lj     0/1     Init:2/3           31s     10.244.20.114
nhn-instance-axual-connect-dff54dcb-678lj     0/1     PodInitializing    32s     10.244.20.114
nhn-instance-axual-connect-dff54dcb-678lj     0/1     Running            33s     10.244.20.114
nhn-instance-axual-connect-dff54dcb-678lj     1/1     Running            40s     10.244.20.114
nhn-instance-axual-connect-6c47d98cb-x77kv    1/1     Terminating        2m44s   10.244.20.112
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     Pending            0s      <none>
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     Pending            0s      <none>
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     Init:0/3           0s      <none>
nhn-instance-axual-connect-6c47d98cb-x77kv    0/1     Terminating        2m45s   <none>
nhn-instance-axual-connect-6c47d98cb-x77kv    0/1     Terminating        2m46s   10.244.20.112
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     Init:1/3           2s      10.244.20.115
nhn-instance-axual-connect-6c47d98cb-x77kv    0/1     Terminating        2m46s   10.244.20.112
nhn-instance-axual-connect-6c47d98cb-x77kv    0/1     Terminating        2m46s   10.244.20.112
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     Init:1/3           3s      10.244.20.115
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     Init:2/3           30s     10.244.20.115
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     Init:2/3           31s     10.244.20.115
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     PodInitializing    32s     10.244.20.115
nhn-instance-axual-connect-dff54dcb-45wrj     0/1     Running            33s     10.244.20.115
nhn-instance-axual-connect-dff54dcb-45wrj     1/1     Running            40s     10.244.20.115
nhn-instance-axual-connect-6c47d98cb-xh9k5    1/1     Terminating        2m43s   10.244.20.113
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     Pending            0s      <none>
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     Pending            0s      <none>
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     Init:0/3           0s      <none>
nhn-instance-axual-connect-6c47d98cb-xh9k5    0/1     Terminating        2m45s   <none>
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     Init:1/3           2s      10.244.20.116
nhn-instance-axual-connect-6c47d98cb-xh9k5    0/1     Terminating        2m45s   10.244.20.113
nhn-instance-axual-connect-6c47d98cb-xh9k5    0/1     Terminating        2m45s   10.244.20.113
nhn-instance-axual-connect-6c47d98cb-xh9k5    0/1     Terminating        2m45s   10.244.20.113
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     Init:1/3           3s      10.244.20.116
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     Init:2/3           30s     10.244.20.116
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     Init:2/3           31s     10.244.20.116
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     PodInitializing    32s     10.244.20.116
nhn-instance-axual-connect-dff54dcb-xdtlj     0/1     Running            33s     10.244.20.116
nhn-instance-axual-connect-dff54dcb-xdtlj     1/1     Running            41s     10.244.20.116
nhn-instance-axual-connect-6c47d98cb-rvg7h    1/1     Terminating        4m55s   10.244.20.111
nhn-instance-axual-connect-6c47d98cb-rvg7h    0/1     Terminating        4m56s   <none>
nhn-instance-axual-connect-6c47d98cb-rvg7h    0/1     Terminating        4m56s   10.244.20.111
nhn-instance-axual-connect-6c47d98cb-rvg7h    0/1     Terminating        4m56s   10.244.20.111
nhn-instance-axual-connect-6c47d98cb-rvg7h    0/1     Terminating        4m56s   10.244.20.111
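If the restart needs to be scripted, the two steps can be wrapped as below. This is a sketch that assumes `kubectl` is on the PATH and already points at the right cluster; it uses `kubectl rollout status`, which blocks until the rollout completes, instead of watching Pods manually. The function names are illustrative.

```python
import subprocess


def rollout_commands(deployment: str, namespace=None) -> list:
    """Return the kubectl commands for a rolling restart, in execution order."""
    ns = ["-n", namespace] if namespace else []
    target = f"deployment/{deployment}"
    return [
        ["kubectl", *ns, "rollout", "restart", target],
        # Blocks until every Pod of the Deployment has been replaced and is ready.
        ["kubectl", *ns, "rollout", "status", target],
    ]


def rolling_restart(deployment: str, namespace=None) -> None:
    """Trigger the restart and wait for it; raises if a kubectl call fails."""
    for command in rollout_commands(deployment, namespace):
        subprocess.run(command, check=True)
```

For example, `rolling_restart("tenant-instance-axual-connect")` would restart the Deployment found in the first step.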

Installation

The Axual Connect installation depends on Kafka, a plugin download location, Self-Service and, most importantly, Vault, which stores connector credentials.

Helm Charts

Connect Helm charts are standalone; the component can be installed by following the Axual Connect 0.2.1 Helm Readme.

Configuration

Each Connect Application has its own set of properties to set up. Refer to the documentation of each Connector for how to configure the Connect Application. To view the Connect settings, open the Connect Application and click the cog on the connector application graphic in the middle of the page.

Some base configuration settings are shared by most connectors.

key.converter

Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

Type: class Default: Valid Values: Importance: high

value.converter

Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

Type: class Default: Valid Values: Importance: high

header.converter

HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the header values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, the SimpleHeaderConverter is used to serialize header values to strings and deserialize them by inferring the schemas.

Type: class Default: org.apache.kafka.connect.storage.SimpleHeaderConverter Valid Values: Importance: low
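As an illustration of the three converter settings, the sketch below holds one possible combination in a dictionary and renders it as key=value lines. The converter classes shown ship with Apache Kafka; which converters are available or appropriate on your Connect cluster is an assumption here, and `to_properties` is a hypothetical helper.

```python
# One possible converter combination: string keys, JSON values, default headers.
BASE_CONVERTERS = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    # Converter-specific options are passed with the converter's prefix.
    "value.converter.schemas.enable": "false",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
}


def to_properties(config: dict) -> str:
    """Render a configuration dictionary as sorted key=value lines."""
    return "\n".join(f"{key}={value}" for key, value in sorted(config.items()))


print(to_properties(BASE_CONVERTERS))
```

Because converters are independent of connectors, the same dictionary could be merged into the configuration of any connector.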

Sink Connector Settings

name

Globally unique name to use for this connector.

Type: string Default: Valid Values: non-empty string without ISO control characters Importance: high

tasks.max

Maximum number of tasks to use for this connector.

Type: int Default: 1 Valid Values: [1,…​] Importance: high

topics

List of topics to consume, separated by commas

Type: list Default: "" Valid Values: Importance: high

topics.regex

Regular expression giving topics to consume. Under the hood, the regex is compiled to a java.util.regex.Pattern. Only one of topics or topics.regex should be specified.

Type: string Default: "" Valid Values: valid regex Importance: high

key.converter

Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

Type: class Default: null Valid Values: Importance: low

value.converter

Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

Type: class Default: null Valid Values: Importance: low

header.converter

HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the header values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, the SimpleHeaderConverter is used to serialize header values to strings and deserialize them by inferring the schemas.

Type: class Default: null Valid Values: Importance: low

config.action.reload

The action that Connect should take on the connector when changes in external configuration providers result in a change in the connector’s configuration properties. A value of 'none' indicates that Connect will do nothing. A value of 'restart' indicates that Connect should restart/reload the connector with the updated configuration properties. The restart may actually be scheduled in the future if the external configuration provider indicates that a configuration value will expire in the future.

Type: string Default: restart Valid Values: [none, restart] Importance: low

transforms

Aliases for the transformations to be applied to records.

Type: list Default: "" Valid Values: non-null string, unique transformation aliases Importance: low

predicates

Aliases for the predicates used by transformations.

Type: list Default: "" Valid Values: non-null string, unique predicate aliases Importance: low

errors.retry.timeout

The maximum duration in milliseconds that a failed operation will be reattempted. The default is 0, which means no retries will be attempted. Use -1 for infinite retries.

Type: long Default: 0 Valid Values: Importance: medium

errors.retry.delay.max.ms

The maximum duration in milliseconds between consecutive retry attempts. Jitter will be added to the delay once this limit is reached to prevent thundering herd issues.

Type: long Default: 60000 (1 minute) Valid Values: Importance: medium
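The interaction between retries and the maximum delay can be pictured as a capped exponential backoff. The sketch below is an assumption-laden illustration, not the Connect implementation: the 300 ms initial delay and the doubling per attempt are assumed values, and jitter is modelled as picking a random delay below the limit once the computed delay exceeds it.

```python
import random


def retry_delay_ms(attempt: int, max_delay_ms: int = 60_000,
                   initial_delay_ms: int = 300, rng=random) -> int:
    """Delay before retry number `attempt` (0-based), in milliseconds."""
    # Assumed growth: the delay doubles with every attempt.
    delay = initial_delay_ms * (2 ** attempt)
    if delay > max_delay_ms:
        # Past the limit: use a random delay below it, so that many failing
        # tasks do not all retry at the same moment (thundering herd).
        delay = rng.randrange(max_delay_ms)
    return delay
```

Under these assumptions the first delays are 300, 600, 1200 ms and so on, and every delay past the limit is randomized and stays below errors.retry.delay.max.ms.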

errors.tolerance

Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records.

Type: string Default: none Valid Values: [none, all] Importance: medium

errors.log.enable

If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported.

Type: boolean Default: false Valid Values: Importance: medium

errors.log.include.messages

Whether to include in the log the Connect record that resulted in a failure. This is 'false' by default, which will prevent record keys, values, and headers from being written to log files, although some information such as topic and partition number will still be logged.

Type: boolean Default: false Valid Values: Importance: medium

errors.deadletterqueue.topic.name

The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, or its transformations or converters. The topic name is blank by default, which means that no messages are to be recorded in the DLQ.

Type: string Default: "" Valid Values: Importance: medium

errors.deadletterqueue.topic.replication.factor

Replication factor used to create the dead letter queue topic when it doesn’t already exist.

Type: short Default: 3 Valid Values: Importance: medium

errors.deadletterqueue.context.headers.enable

If true, add headers containing error context to the messages written to the dead letter queue. To avoid clashing with headers from the original record, all error context header keys will start with __connect.errors.

Type: boolean Default: false Valid Values: Importance: medium
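The sink settings above can be combined as in the following sketch, which also checks a few of the documented constraints before the configuration is submitted. The connector name, topic names and DLQ topic are hypothetical, and `validate_sink_config` is an illustrative helper rather than part of Axual Connect; plugin-specific properties (for example the target system's connection details) are left out.

```python
# Hypothetical sink configuration that tolerates bad records and routes them to a DLQ.
SINK_CONFIG = {
    "name": "orders-sink",
    "tasks.max": "2",
    "topics": "orders,payments",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.deadletterqueue.topic.name": "orders-sink-dlq",
    "errors.deadletterqueue.context.headers.enable": "true",
}


def validate_sink_config(config: dict) -> list:
    """Return a list of problems found; an empty list means no problem was detected."""
    problems = []
    if not config.get("name"):
        problems.append("name must be a non-empty string")
    if int(config.get("tasks.max", 1)) < 1:
        problems.append("tasks.max must be at least 1")
    # Only one of topics or topics.regex should be specified.
    if bool(config.get("topics")) == bool(config.get("topics.regex")):
        problems.append("set exactly one of topics or topics.regex")
    if config.get("errors.tolerance", "none") not in ("none", "all"):
        problems.append("errors.tolerance must be 'none' or 'all'")
    return problems
```

With errors.tolerance set to 'all', problematic records are skipped rather than failing the task, so naming a dead letter queue topic keeps those records recoverable.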

Source Connector Settings

name

Globally unique name to use for this connector.

Type: string Default: Valid Values: non-empty string without ISO control characters Importance: high

connector.class

Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, or use "FileStreamSink" or "FileStreamSinkConnector" to make the configuration a bit shorter.

Type: string Default: Valid Values: Importance: high

tasks.max

Maximum number of tasks to use for this connector.

Type: int Default: 1 Valid Values: [1,…​] Importance: high

key.converter

Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

Type: class Default: null Valid Values: Importance: low

value.converter

Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

Type: class Default: null Valid Values: Importance: low

header.converter

HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the header values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro. By default, the SimpleHeaderConverter is used to serialize header values to strings and deserialize them by inferring the schemas.

Type: class Default: null Valid Values: Importance: low

config.action.reload

The action that Connect should take on the connector when changes in external configuration providers result in a change in the connector’s configuration properties. A value of 'none' indicates that Connect will do nothing. A value of 'restart' indicates that Connect should restart/reload the connector with the updated configuration properties. The restart may actually be scheduled in the future if the external configuration provider indicates that a configuration value will expire in the future.

Type: string Default: restart Valid Values: [none, restart] Importance: low

transforms

Aliases for the transformations to be applied to records.

Type: list Default: "" Valid Values: non-null string, unique transformation aliases Importance: low

predicates

Aliases for the predicates used by transformations.

Type: list Default: "" Valid Values: non-null string, unique predicate aliases Importance: low

errors.retry.timeout

The maximum duration in milliseconds that a failed operation will be reattempted. The default is 0, which means no retries will be attempted. Use -1 for infinite retries.

Type: long Default: 0 Valid Values: Importance: medium

errors.retry.delay.max.ms

The maximum duration in milliseconds between consecutive retry attempts. Jitter will be added to the delay once this limit is reached to prevent thundering herd issues.

Type: long Default: 60000 (1 minute) Valid Values: Importance: medium

errors.tolerance

Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records.

Type: string Default: none Valid Values: [none, all] Importance: medium

errors.log.enable

If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported.

Type: boolean Default: false Valid Values: Importance: medium

errors.log.include.messages

Whether to include in the log the Connect record that resulted in a failure. This is 'false' by default, which will prevent record keys, values, and headers from being written to log files, although some information such as topic and partition number will still be logged.

Type: boolean Default: false Valid Values: Importance: medium

topic.creation.groups

Groups of configurations for topics created by source connectors

Type: list Default: "" Valid Values: non-null string, unique topic creation groups Importance: low
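In Apache Kafka, each group named in topic.creation.groups is configured through properties prefixed with `topic.creation.<group>.`, next to an always-present `default` group. The sketch below only derives those prefixes from a configuration; the group names are hypothetical, and whether automatic topic creation is permitted on an Axual-governed cluster is not stated in this document, so treat this as an illustration of the naming scheme only.

```python
def topic_creation_prefixes(config: dict) -> list:
    """List the property prefixes used by the default and the custom topic creation groups."""
    raw = config.get("topic.creation.groups", "")
    groups = [group.strip() for group in raw.split(",") if group.strip()]
    # The "default" group always exists and applies to topics matched by no other group.
    return [f"topic.creation.{group}." for group in ["default", *groups]]


# Hypothetical source connector fragment with two custom groups.
SOURCE_FRAGMENT = {"topic.creation.groups": "compacted, audit"}
print(topic_creation_prefixes(SOURCE_FRAGMENT))
```

A property such as `topic.creation.compacted.include` would then belong to the `compacted` group.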

Logging

Logging onto Kafka topic

Axual Connect can additionally write the logs of Connector tasks to a Kafka topic. This allows users who cannot access the Connect logs directly to consume them via Kafka. More details here.