Deploying Distributor
Distributor components
The Distributor consists of a Kafka Connect container image, the Axual Distributor connector plugins, and a Helm chart controlling the deployment and the connector configurations.
The Axual Distributor components are:
- Message Distributor: distributes the records on the topics of the local Kafka cluster to a remote Kafka cluster
- Offset Distributor: distributes the committed offsets of clients from the local Kafka cluster to a special topic on the remote Kafka cluster
- Offset Committer: reads the messages produced by Offset Distributors from a special topic on the local Kafka cluster and commits them in the local cluster
- Schema Distributor: reads the records from the Confluent Schema Registry topic on the local Kafka cluster and produces them to a remote Kafka cluster
All distributors are aware of the Axual naming patterns used for multitenancy, and can transform topic and consumer group names to the correct pattern used by the remote Kafka cluster.
The Distributor Helm chart enables you to install and run the Kafka Connect components without starting any distributors, allowing for a step-by-step installation and activation of the different distribution components.
Prerequisites
A separate Distributor is deployed for each Kafka Cluster of a tenant instance. The following components are needed to deploy a Distributor:
- Kafka Cluster: the local cluster from which data is read and distributed to remote Kafka clusters
- Strimzi Operator: the Strimzi Operator watches for new or changed Kafka Connect and Connector definitions and performs the actual deployment according to those definitions. The Distributor Helm charts use Strimzi to deploy and configure the correct containers and distribution configuration.
- Runtime Client Certificate: the Distributor needs a client certificate to connect to the Kafka Cluster and access the correct topics and consumer groups. This certificate does not need to be a super user in the Kafka Cluster.
- Initialisation Client TLS Secret: a TLS secret containing a client certificate which allows the initialisation job to create new access control list entries and topics for the Distributor. An example of creating such secrets is shown after this list.
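The snippet below is a minimal sketch of how these secrets could be created from PEM files with kubectl. The secret names (cluster-1-root, cluster-1-ca) and key names (key, cert, cacert) match what is referenced in the init configuration further down this page; the namespace and local file paths are placeholders.
# Keypair secret referenced by init.tls.keypairSecretName / keypairSecretKeyName / keypairSecretCertName
kubectl create secret generic cluster-1-root \
  --namespace <distributor-namespace> \
  --from-file=key=./distributor-client.key \
  --from-file=cert=./distributor-client-chain.pem
# Truststore secret referenced by init.tls.truststoreCaSecretName / truststoreCaSecretCertName
kubectl create secret generic cluster-1-ca \
  --namespace <distributor-namespace> \
  --from-file=cacert=./kafka-cluster-ca.pem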
Installing the Distributor Helm Chart
The Distributor Helm charts are stored in a public registry maintained by Axual, but you need an account to access the Helm chart and the images.
Contact the Axual support team to request access.
The following example logs into the registry and installs the Axual Distributor chart with a single values.yaml:
helm registry login -u [your-user] registry.axual.io/axual-charts
helm upgrade --install local-distributor oci://registry.axual.io/axual-charts/distributor --version 5.3.4 -f values.yaml
Deploying the Distributor
The Helm configuration options for the Distributor are split into three sections (a minimal outline follows the list):
- Init: configures the initialisation jobs, which create the required topics and access control list entries
- Connect: configures the Kafka Connect deployment
- Distribution: configures the actual distribution
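As a sketch of the structure only, a values.yaml combining the three sections could look as follows; all individual settings are explained in the sections below.
# Outline of a Distributor values.yaml (details follow in the next sections)
init:
  enabled: true        # create the required topics and access control list entries
connect:
  replicas: 3          # Kafka Connect deployment settings
distribution:
  enabled: true        # evaluate the individual distributor enabled flags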
Configuring initialisation jobs
Both Kafka Connect and the distribution connector plugins need access to several topics and consumer groups for proper deployment. The init section configures which topics and access control list entries to create and which user should have access.
The initialisation scripts use configuration from the connect and distribution sections to determine the correct Kafka cluster, topic and consumer group names.
Example init configuration
The configuration below contains an example configuration for the user principal CN=Example distributor client for test instance cluster 1,OU=Infra,O=Axual. The Kubernetes secret cluster-1-root contains the key data to connect to the Kafka Cluster, and the secret cluster-1-ca contains the trusted certificate authorities.
If the Kafka cluster uses the Principal Chain Builder then the full chain principal name should be used. In this example that is [0] CN=RootCA,OU=Infra,O=Axual, [1] CN=Intermediate CA 1,OU=Infra,O=Axual, [2] CN=Example distributor client for test instance cluster 1,OU=Infra,O=Axual
# Initialise Kafka Cluster for Connect or Connector
init:
# Initialisation calls are executed when enabled and the connect and/or distribution is also enabled
# Default value is false
enabled: true
# Set the principal names matching the Distinguished Name of the Client Certificate
  # Set multiple names if distributors connecting from other clusters to this cluster use different certificates
principals:
# Names without using Principal Chain Builder
# The principal used by a distributor connecting from cluster 1
- "User:CN=Example distributor client for test instance cluster 1,OU=Infra,O=Axual"
# The principal used by a distributor connecting from cluster 2
- "User:CN=Example distributor client for test instance cluster 2,OU=Infra,O=Axual"
# Names using Principal Chain Builder
- "User:[0] CN=RootCA,OU=Infra,O=Axual, [1] CN=Intermediate CA 1,OU=Infra,O=Axual, [2] CN=Example distributor client for test instance cluster 1,OU=Infra,O=Axual"
- "User:[0] CN=RootCA,OU=Infra,O=Axual, [1] CN=Intermediate CA 2,OU=Infra,O=Axual, [2] CN=Example distributor client for test instance cluster 2,OU=Infra,O=Axual"
# Define optional extra configuration options for the admin client. For properties, see https://kafka.apache.org/documentation/#adminclientconfigs
additionalClientConfigs:
# Set the initial TLS protocol to TLSv1.2
ssl.protocol: TLSv1.2
# Don't support pushing client metrics to server for initialisation calls
enable.metrics.push: false
# Secrets for connecting to Kafka Cluster to create topics and access control entries
tls:
    # Name of an existing Secret containing the private key and certificate to access the Kafka cluster
keypairSecretName: "cluster-1-root"
# The key of the Secret field containing the private key data
keypairSecretKeyName: "key"
# The key of the Secret field containing the public certificate chain for the key
keypairSecretCertName: "cert"
    # Name of an existing Secret containing the list of trusted certificate authorities
truststoreCaSecretName: "cluster-1-ca"
    # The key of the Secret field containing the trusted certificate authorities
truststoreCaSecretCertName: "cacert"
# Resource allocation for the init job pod
#resources: {}
# Defines the security options the container should be run with
#securityContext: {}
# Initialisation image information
#image:
# registry: "registry.axual.io"
# repository: "axual/distributor"
# tag: "5.3.4-0.43.0"
# pullSecrets:
# - axual-credentials
Configuring the Kafka Connect deployment
The configuration of the Kafka Connect component of Distributor is mostly based on the Strimzi Kafka Connect resource definition.
The following information is needed to deploy Kafka Connect:
- Kafka Cluster bootstrap servers: a list of hostname and port combinations used to set up the initial connection to the local Kafka Cluster. Make sure that the firewall and/or network policies allow access from Kafka Connect to all brokers in the Kafka cluster.
- Kafka Cluster trusted certificate authorities: Kafka Connect needs to determine which Certificate Authorities are trusted when connecting to the Kafka Cluster. These can be provided as a Kubernetes Secret or in the values.yaml as PEM encoded data.
- Client certificate for the local Kafka cluster: the deployment needs the private key and certificate chain for the connection to the Kafka cluster. These can be provided as a Kubernetes Secret or in the values.yaml as PEM encoded data.
- Topic and Consumer Group names: Kafka Connect needs three topics to store configuration, status and connector offset information. It also uses a consumer group to allow worker nodes to become aware of each other. The names need to be unique per distributor installation, and should not match the naming patterns of the local Kafka cluster, to prevent these internal topics from being distributed. Names often start with an underscore and contain the tenant and instance short names.
- Strimzi Operator version: the Strimzi Cluster Operator controls the deployment and operation of the Distributor. The base image of the Distributor must match the operator version to prevent compatibility issues. A release of the Axual Distributor supports multiple Strimzi versions.
Example connect configuration
The following example will connect to the Kafka Cluster cluster01 in namespace kafka of the same Kubernetes cluster.
It is deployed for tenant demo and instance prod, and uses the Secrets demo-prod-cluster01-distributor and cluster01-cluster-ca-cert for connecting to the Kafka Cluster. The internal consumer group is _demo-prod-distributor and the Connect topics are _demo-prod-distributor-config, _demo-prod-distributor-status and _demo-prod-distributor-offset.
Strimzi 0.43.0 is used, so the distributor image tag is set to 5.3.4-0.43.0 to select the Strimzi 0.43.0 build of Distributor 5.3.4.
connect:
# Which image and registry should be used for connect
image:
# Set the specific tag of the repository to use for the init containers
tag: "5.3.4-0.43.0"
# The number of connect workers to start for parallel processing
replicas: 3
# Rack aware assignment should be used
rack:
enabled: true
topologyKey: topology.kubernetes.io/zone
# The bootstrap url used to connect to the kafka cluster
bootstrapServers: "cluster01-kafka-bootstrap.kafka:9093"
groupId: "_demo-prod-distributor"
# Contains the internal connect topics settings
topics:
# Make sure to set this to a proper value for your Kafka cluster. In production this is at least 3
replicationFactor: 3
config:
name: "_demo-prod-distributor-config"
status:
name: "_demo-prod-distributor-status"
partitions: 2
offset:
name: "_demo-prod-distributor-offset"
partitions: 3
# Set extra connect configuration values
config:
key.converter: JsonConverter
value.converter: JsonConverter
  # SASL PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
sasl:
# Set to true to enable a SASL connection
enabled: false
type: "PLAIN"
username: "hello"
password: "world"
tls:
enabled: true
createCaCertsSecret: false
    # if createCaCertsSecret is true, set the CA certs below
# caCerts:
# one_ca.crt: |
# -----BEGIN CERTIFICATE-----
# other_ca.crt: |
# -----BEGIN CERTIFICATE-----
    # if createCaCertsSecret is false, the CA certificates need to be provided
    # via an existing secret (name) and the key of the certificate inside the
    # secret
# caSecret:
# secretName: your_custom_ca_secret
# keyForCertificate: your_custom_ca_cert
caSecret:
secretName: "cluster01-cluster-ca-cert"
keyForCertificate: "ca.crt"
# Configure authentication using a client certificate
authentication:
enabled: true
createTlsClientSecret: false
# if createTlsClientSecret is true, set the client key and certificate chain below
#clientCert: |
# -----BEGIN CERTIFICATE-----
# -----END CERTIFICATE-----
#clientKey: |
# -----BEGIN PRIVATE KEY-----
# -----END PRIVATE KEY-----
# if a TLS secret already exists with the client credentials, provide the name here
secretName: "demo-prod-cluster01-distributor"
# Configure logging and log levels, see the Strimzi API documentation on how to configure https://strimzi.io/docs/operators/latest/configuring#type-KafkaConnectSpec-schema-reference
logging:
type: inline
loggers:
log4j.rootLogger: "INFO"
log4j.logger.io.axual.distributor.common: "INFO"
log4j.logger.io.axual.distributor.message: "INFO"
log4j.logger.io.axual.distributor.offset: "INFO"
log4j.logger.io.axual.distributor.schema: "INFO"
log4j.logger.org.apache.kafka.connect.runtime.rest: "WARN"
# expose metrics using the JMX Prometheus Exporter
metrics:
enabled: false
# Instruct Prometheus to scrape the metrics using a PodMonitor resource
podMonitor:
enabled: false
# additionalLabels:
# label/my1: hi
# lbl2: there
# additionalAnnotations:
# annotation/my1: hi
# ann2: there
# interval: 60s
# scrapeTimeout: 12s
# Create Prometheus Alert Manager Rules
prometheusRule:
enabled: false
# additionalAnnotations:
# my-annotation: help
# some-annotation: none
# additionalLabels:
# my-label: lab-help
# some-label: 123
# rules:
# - alert: MyCustomRuleName
# annotations:
# message: '{{ "{{ $labels.connector }}" }} send rate has dropped to 0'
# expr: sum by (connector) ( kafka_connect_sink_task_sink_record_send_rate{connector=~".*-message-distributor-.*"}) == 0
# for: 5m
# labels:
# severity: high
# callout: false
# Add additional configuration here, see the Strimzi API documentation on how to use them https://strimzi.io/docs/operators/latest/configuring#type-KafkaConnectSpec-schema-reference
#resources: {}
#livenessProbe: {}
#readinessProbe: {}
#jvmOptions: {}
#jmxOptions: {}
#tracing: {}
#template: {}
#build: {}
Configuring distribution
A distribution model is needed to properly set up the data replication between clusters. The Axual multi level distribution model is designed to distribute the data to all clusters in a minimum number of steps. It minimizes the crossing of network boundaries and reduces the amount of data sent from one cluster to another.
Distribution requirements
- Tenant and Instance short names: each Distributor deployment is dedicated to a single tenant instance. The short names are often used in the naming patterns of a cluster, and are also used to identify the connectors.
- Distribution model: design a distribution model and determine what the static target clusters are for each level. This information is needed to determine the contents of the following requirements. The cluster names used in the model are static; renaming a cluster, or removing and re-adding it with a different name later, can result in existing data not being distributed, or being distributed again.
- Target cluster connectivity information: the connection information needed to reach each of the remote Kafka Clusters. This includes bootstrap servers and security protocols, but can also contain specific settings to work around network issues like firewalls or routers cutting off inactive connections.
- Trusted certificate authorities for each remote Kafka cluster: Kafka Connect needs to determine which Certificate Authorities are trusted when connecting to each of the remote Kafka Clusters. These must be specified in the values.yaml as PEM encoded data.
- Client certificate for each remote Kafka cluster: the deployment needs the private key and certificate chain for the connection to each of the remote Kafka clusters. These must be specified in the values.yaml as PEM encoded data.
Distribution uses references to cluster configurations in a map. This allows potential target clusters to be defined next to the active target clusters, which makes it quicker to reconfigure distribution when one of the clusters goes offline: you only have to update the reference for the target cluster.
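As an illustrative sketch only: if the active target of distribution level 1 (introduced further below) has to move from cluster02 to an already defined cluster03, only the cluster reference needs to change.
distribution:
  levels:
    1:
      # previously "cluster02"; the rest of the level configuration stays the same
      targetClusterName: "cluster03"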
Configure local and remote clusters
The following example adds the cluster information to the values.yaml.
In this example there are three Kafka clusters:
- cluster01: the local Kafka Cluster from which the records and offsets are read and distributed to the remote Kafka Clusters.
- cluster02: the first remote Kafka Cluster.
- cluster03: the second remote Kafka Cluster.
The Distributor configurations running on cluster02 and cluster03 are not shown on this page, but they should also be running and distributing messages to cluster01.
distribution:
  # If set to false, all distribution is disabled. If true, the individual distribution enabled flags are evaluated
enabled: true
# Setting the tenant information
tenantShortName: "demo"
instanceShortName: "prod"
# Prefix values for topic and group ACLs on the local Kafka Cluster.
prefixAclSettings:
# The local topic pattern is {tenant}-{instance}-{environment}-{topic}. environment and topic are dynamic and are ignored for the prefix ACL. Results in all topics with a name starting with this prefix to be distributed
topicPrefix: "demo-prod-"
# The local group pattern is {tenant}-{instance}-{environment}-{group.id}. environment and group id are dynamic and are ignored for the prefix ACL. Results in all offsets for a consumer group with a name starting with this prefix to be distributed
groupPrefix: "demo-prod-"
# Information about the source cluster. This is the cluster Kafka Connect is connecting with and distributors read their data from
sourceCluster:
# Cluster name, this is also used in the connector and consumer group names
name: cluster01
# Contains instance topic information for the cluster
topics:
# The replication factor to use for the local distributor specific topics
# Make sure to set this to a proper value for your Kafka cluster. In production this is at least 3
replicationFactor: 3
# Timestamps topic for offset distribution, name is _<tenant>-<instance>-consumer-timestamps
timestamps:
nameOverride: ""
# The number of partitions to use for the timestamps topic
partitions: 25
# The resource naming pattern used on this cluster
topicPattern: '{tenant}-{instance}-{environment}-{topic}'
groupPattern: '{tenant}-{instance}-{environment}-{group.id}'
# If a pattern value is missing the value specified in this dictionary should be used
patternDefaultValues:
environment: prod
# Connection information, used by offset distributor and committer to update/query data from the source cluster
bootstrapServers: "cluster01-kafka-bootstrap.kafka:9093"
# What is the protocol for the Kafka connection using the bootstrap server.
    # Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
securityProtocol: "SSL"
# Authentication configuration is provided separately, but other client connection settings
# can be specified here. For example, TLS protocols and buffer sizes
additionalClientConfigs:
ssl.enabled.protocols: TLSv1.2
# If the security protocol is SASL_PLAINTEXT or SASL_SSL then this section must be used
# sasl:
# Which SASL mechanism should be used. Accepts PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
# mechanism: SCRAM-SHA-512
# username: my-user
# password: my-password
# Contains the tls settings for the cluster
tls:
# A list of PEM encoded CA certificates to use when connecting to the Kafka cluster
# This is used to verify the server certificate
caCerts:
- |
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
# The PEM encoded client certificate chain to use if Mutual TLS, or certificate based authentication is required
# Only when both clientCert and clientKey are provided will Mutual TLS be enabled
clientCert: |
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
# A PEM encoded PKCS#8 key to use if Mutual TLS, or certificate based authentication is required
# Encrypted keys are supported, the password should go in the clientKeyPassword configuration
# Only when both clientCert and clientKey are provided will Mutual TLS be enabled
clientKey: |
-----BEGIN PRIVATE KEY-----
-----END PRIVATE KEY-----
# The password for the clientKey, only set this if the clientKey is encrypted
# clientKeyPassword: somethingVerySecret
# Information about all potential target clusters. These are the cluster where data will be written to
clusters:
# The key/id of the cluster to use. Recommend to keep this the same as the cluster name
cluster02:
# Cluster name, this is also used in the connector and consumer group names
name: cluster02
# Contains instance topic information for the cluster
topics:
# Timestamps topic for offset distribution
timestamps:
# Use name override if the target cluster does not use default name of _<tenant>-<instance>-consumer-timestamps
nameOverride: ""
# Schema topic for schema distribution
schemas:
# Use name override if the target cluster does not use default name of _<tenant>-<instance>-schemas
nameOverride: ""
# The resource naming pattern used on this cluster
topicPattern: '{tenant}-{instance}-{environment}-{topic}'
groupPattern: '{tenant}-{instance}-{environment}-{group.id}'
# If a pattern value is missing the value specified in this dictionary should be used
patternDefaultValues:
environment: prod
# Connection information, used by connectors to connect to the target
bootstrapServers: "cluster02-kafka-bootstrap.region2.demo.com:9093"
# What is the protocol for the Kafka connection using the bootstrap server.
# Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
securityProtocol: SSL
# Authentication configuration is provided separately, but other client connection settings
# can be specified here. For example, TLS protocols and buffer sizes
additionalClientConfigs:
ssl.enabled.protocols: TLSv1.2
# If the security protocol is SASL_PLAINTEXT or SASL_SSL then this section must be used
#sasl:
# # Which SASL mechanism should be used. Accepts PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
# mechanism: SCRAM-SHA-512
# username: my-user
# password: my-password
# Contains the tls settings for the cluster
tls:
# A list of PEM encoded CA certificates to use when connecting to the target Kafka cluster
# This is used to verify the server certificate
caCerts:
- |
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
# The PEM encoded client certificate chain to use if Mutual TLS, or certificate based authentication is required
# Only when both clientCert and clientKey are provided will Mutual TLS be enabled
clientCert: |
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
# A PEM encoded PKCS#8 key to use if Mutual TLS, or certificate based authentication is required
# Only when both clientCert and clientKey are provided will Mutual TLS be enabled
# Encrypted keys are supported, the password should go in the clientKeyPassword configuration
clientKey: |
-----BEGIN PRIVATE KEY-----
-----END PRIVATE KEY-----
# The password for the clientKey, only set this if the clientKey is encrypted
#clientKeyPassword: somethingVerySecret
# The key/id of the cluster to use. Recommend to keep this the same as the cluster name
cluster03:
# Cluster name, this is also used in the connector and consumer group names
      name: cluster03
# Contains instance topic information for the cluster
topics:
# Timestamps topic for offset distribution
timestamps:
# Use name override to not use default name of _<tenant>-<instance>-consumer-timestamps
nameOverride: ""
# Schema topic for schema distribution
schemas:
# Use name override to not use default name of _<tenant>-<instance>-schemas
nameOverride: ""
# The resource naming pattern used on this cluster
topicPattern: '{tenant}-{instance}-{environment}-{topic}'
groupPattern: '{tenant}-{instance}-{environment}-{group.id}'
# If a pattern value is missing the value specified in this dictionary should be used
patternDefaultValues:
environment: prod
# Connection information, used by connectors to connect to the target
bootstrapServers: "cluster03-kafka-bootstrap.region3.demo.com:9093"
# What is the protocol for the Kafka connection using the bootstrap server.
      # Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
securityProtocol: SSL
# Authentication configuration is provided separately, but other client connection settings
# can be specified here. For example, TLS protocols and buffer sizes
additionalClientConfigs:
ssl.enabled.protocols: TLSv1.2
# If the security protocol is SASL_PLAINTEXT or SASL_SSL then this section must be used
#sasl:
# # Which SASL mechanism should be used. Accepts PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
# mechanism: SCRAM-SHA-512
# username: my-user
# password: my-password
# Contains the tls settings for the cluster
tls:
# A list of PEM encoded CA certificates to use when connecting to the target Kafka cluster
# This is used to verify the server certificate
caCerts:
- |
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
# The PEM encoded client certificate chain to use if Mutual TLS, or certificate based authentication is required
# Only when both clientCert and clientKey are provided will Mutual TLS be enabled
clientCert: |
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
# A PEM encoded PKCS#8 key to use if Mutual TLS, or certificate based authentication is required
# Only when both clientCert and clientKey are provided will Mutual TLS be enabled
# Encrypted keys are supported, the password should go in the clientKeyPassword configuration
clientKey: |
-----BEGIN PRIVATE KEY-----
-----END PRIVATE KEY-----
# The password for the clientKey, only set this if the clientKey is encrypted
#clientKeyPassword: somethingVerySecret
Configuring Schema Distributor
Only start the Schema Distributor on the primary cluster. The Schema Distributor is meant to synchronize schema topics from a leading schema registry to following schema registries. Distributing the other way around may delete schemas from the leading topic, and enabling Schema Distributors on multiple clusters should be avoided to prevent looping behaviour and schema id mismatches.
The following schema distributor configuration distributes schemas to the remote Kafka clusters cluster02 and cluster03. The connection information and topic names are retrieved from the sourceCluster and clusters configuration set in Configure local and remote clusters.
The configuration map from additionalClientConfigs is merged with the clusters.[].additionalClientConfigs and used to configure the producer to the remote Kafka Cluster.
distribution:
# Schema Distribution
schemaDistributor:
enabled: true
# Which clusters should it distribute to, uses the {clusters.[].name} to find cluster information
targetClusterNames:
- "cluster02"
- "cluster03"
# Override the settings for the internal Kafka Connect consumer for the tasks. This can be used to tweak performance
consumerOverrides:
# Override how many records will be read in a single poll of Kafka Connect
max.poll.records: 50
# Any additional client configs needed to connect to the remote cluster.
# For example, TLS protocols and buffer sizes and retries
additionalClientConfigs:
acks: all
retries: 5
max.in.flight.requests.per.connection: 5
enable.idempotence: true
ssl.enabled.protocols: TLSv1.2,TLSv1.3
ssl.endpoint.identification.algorithm: ''
Configuring Offset Committer
The Offset Committer reads offset timestamps for consumer groups from the local Kafka Cluster, translates those to offsets and commits the offsets on the local Kafka Cluster.
The Offset Committer can be started immediately since it relies on data sent to the local timestamps topic by the Offset Distributors running on remote clusters. When those are not active the Offset Committer will just wait.
The following configuration enables the Offset Committer, running with 5 parallel tasks. The setting distributionOffsetMs determines how much time to subtract from the received timestamp before determining the offset to commit. This ensures that the client application does not miss records when migrating from one cluster to another.
Resolving the names of consumer groups and topics can be expensive, so a cache is used to store the resolved names. The size of the caches can be set with topicCacheSize and groupCacheSize.
The configuration map from offsetCommitter.additionalClientConfigs is merged with the sourceCluster.additionalClientConfigs and used to configure the clients needed to look up and commit offsets on the local Kafka Cluster.
distribution:
offsetCommitter:
enabled: true
# how many parallel tasks should the connector have
maxTasks: 5
# Setting this to true will skip any connectivity check when creating and validating the connector config
# skipConnectionValidation: true
# On severe failures there will be continuous processing errors.
# Internal connector resources can be reinitialized when reaching the thresholds defined here
reinitializeOnContinuousError:
# Reinitialize if the continuous error count reaches this number
maximumCount: 5000
      # Reinitialize if the continuous error count has existed for this long
maximumTimeMs: 30000
# The offset committer reads timestamps that relate to records produced on the source topic.
    # This value pushes the timestamp back to make sure that any application will continue before or at
# the offset of the original record, guaranteeing at-least-once consuming of records
# If the message distribution to this cluster slows down a higher value might be needed
distributionOffsetMs: 30000
# The maximum size of the cache used to optimize group and topic resolving.
topicCacheSize: 1000
groupCacheSize: 1000
# Override the settings for the internal Kafka Connect consumer for the tasks. This can be used to tweak performance
consumerOverrides:
# Override how many records will be read in a single poll of Kafka Connect
max.poll.records: 50
# Any additional client configs needed to connect to the source cluster to commit the offsets.
# For example, TLS protocols and buffer sizes
additionalClientConfigs:
ssl.enabled.protocols: TLSv1.2,TLSv1.3
ssl.endpoint.identification.algorithm: ''
Configuring Message Distributor
The Message Distributor uses the distribution model to determine whether a record should be distributed. The configuration for this distributor is split into a separate configuration per level.
Each level distributes to a remote Kafka Cluster, and a level consists of Message and Offset Distributor settings.
The Message Distributor requires a regular expression to subscribe to the local topics which should be distributed. This regular expression should match the topic pattern to prevent issues during distribution.
The following example configures a level 1 distribution to cluster02 using 5 tasks and a level 2 distribution to cluster03 using 2 tasks.
The topicRegex is set to demo-prod-.* since tenant and instance are static values in the topic pattern used. All topics for this instance begin with demo-prod-.
The configuration map from levels.[].messageDistributor.additionalTargetClientConfigs is merged with the clusters.[].additionalClientConfigs and used to configure the clients that produce records to the remote Kafka Cluster.
distribution:
# Axual uses a multi level distribution model to make sure records and offsets arrive at the correct cluster
# This is a dictionary containing the distribution levels relevant to the source cluster.
  # Each level can be activated independently and contains settings for the offset and message distributors for that
# level
# The level number is 1 to 32
levels:
# level numbers should be numbers, and are used in connector and group naming
1:
enabled: true
# To which cluster does the data go, uses the {clusters.[].name} to find cluster information
targetClusterName: "cluster02"
# The message distributor settings to copy records from the source cluster to the target cluster
messageDistributor:
enabled: true
# how many parallel tasks should the connector have
maxTasks: 5
        # which topic pattern should be used for subscription. Form is dependent on {sourceCluster.topicPattern}
topicRegex: 'demo-prod-.*'
# The maximum size of the cache used to optimize topic resolving.
topicCacheSize: 1000
# Override the settings for the internal Kafka Connect consumer for the tasks. This can be used to tweak performance
consumerOverrides:
# Override how many records will be read in a single poll of Kafka Connect
max.poll.records: 50
# Any additional client configs needed to connect to the target cluster.
# For example, TLS protocols and buffer sizes
additionalTargetClientConfigs: {}
2:
enabled: true
targetClusterName: "cluster03"
messageDistributor:
enabled: true
maxTasks: 2
topicRegex: 'demo-prod-.*'
topicCacheSize: 1000
consumerOverrides:
max.poll.records: 50
additionalTargetClientConfigs: {}
Configuring Offset Distributor
The Offset Distributor uses the distribution model to determine whether a committed offset should be distributed. The configuration for this distributor is split into a separate configuration per level.
Each level controls the distribution to a remote Kafka Cluster, and a level configuration contains both Offset and Message Distributor settings.
The Offset Distributor reads from the __consumer_offsets topic on the local Kafka Cluster and filters out all commits that are not for the specific tenant and instance, or that should not be distributed according to the distribution model.
Offset commits can happen often for a consumer group, as this is controlled by the consumer application and the load on the partition. The Offset Distributor only produces a single timestamp per consumer group within a time window. The window size is set using the windowSizeMs property.
The following example configures a level 1 distribution to cluster02 using 5 tasks and a level 2 distribution to cluster03 using 5 tasks. The window size for both is 15 seconds (15000 milliseconds).
The configuration map from levels.[].offsetDistributor.additionalLocalClientConfigs is merged with the sourceCluster.additionalClientConfigs and used to configure the clients needed to look up record timestamps on the local Kafka Cluster.
The configuration map from levels.[].offsetDistributor.additionalTargetClientConfigs is merged with the clusters.[].additionalClientConfigs and used to configure the clients that produce records to the remote Kafka Cluster.
distribution:
# Axual uses a multi level distribution model to make sure records and offsets arrive at the correct cluster
# This is a dictionary containing the distribution levels relevant to the source cluster.
  # Each level can be activated independently and contains settings for the offset and message distributors for that
# level
# The level number is 1 to 32
levels:
# level numbers should be numbers, and are used in connector and group naming
1:
enabled: true
# To which cluster does the data go, uses the {clusters.[].name} to find cluster information
targetClusterName: "cluster02"
# The offset distributor settings to distribute offsets from the source cluster as a timestamp
# to the target cluster where it can be committed.
      # The distributor produces to the timestamps topic on the target cluster
# To prevent a flood of offset commits only the latest value for a specific consumer group and topic partition
# combination in a time window is translated and produced as a timestamp to the remote cluster
offsetDistributor:
enabled: true
# how many parallel tasks should the connector have
maxTasks: 5
# The size of the window to group offset commits in.
# Grouping is done on group.id+topic+partition.
windowSizeMs: 15000
# The maximum size of the cache used to optimize group and topic resolving.
topicCacheSize: 1000
groupCacheSize: 1000
# Override the settings for the internal Kafka Connect consumer for the tasks. This can be used to tweak performance
consumerOverrides:
# Override how many records will be read in a single poll of Kafka Connect
max.poll.records: 50
# Any additional client configs needed to connect to the source cluster.
# For example, TLS protocols and buffer sizes
# The local connection is used to determine the timestamps of a specific record
additionalLocalClientConfigs: {}
# Any additional client configs needed to connect to the target cluster.
# For example, TLS protocols and buffer sizes
additionalTargetClientConfigs: {}
2:
enabled: true
targetClusterName: "cluster03"
offsetDistributor:
enabled: true
maxTasks: 5
windowSizeMs: 15000
topicCacheSize: 1000
groupCacheSize: 1000
consumerOverrides:
max.poll.records: 50
additionalLocalClientConfigs: {}
additionalTargetClientConfigs: {}
Order of deployment of the Distributor connectors
The order in which the connectors are started is important. The normal order is (a sketch using the Helm chart follows this list):
1. Schema Distributor on the cluster where Self Service registers the schemas
2. Offset Committer on all clusters, which looks up and commits offsets to the local cluster
3. Message Distributor, to distribute all records to the target cluster
4. Wait until the Message Distributor has distributed all topics and is keeping up with new production. This is needed to ensure that Offset Distribution does not refer to message offsets that have not been distributed yet
5. Offset Distributor, to determine the timestamps and distribute them to the remote clusters
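As a sketch of this order using the Helm chart from this page, each connector can be enabled in values.yaml one step at a time, re-running the same upgrade command after each change and only continuing when the previous step is healthy:
# Step 1: set distribution.schemaDistributor.enabled=true (primary cluster only) and upgrade
helm upgrade --install local-distributor oci://registry.axual.io/axual-charts/distributor --version 5.3.4 -f values.yaml
# Step 2: set distribution.offsetCommitter.enabled=true and upgrade again
helm upgrade --install local-distributor oci://registry.axual.io/axual-charts/distributor --version 5.3.4 -f values.yaml
# Step 3: set distribution.levels.<level>.messageDistributor.enabled=true, upgrade and wait until distribution keeps up
helm upgrade --install local-distributor oci://registry.axual.io/axual-charts/distributor --version 5.3.4 -f values.yaml
# Step 4: set distribution.levels.<level>.offsetDistributor.enabled=true and upgrade a final time
helm upgrade --install local-distributor oci://registry.axual.io/axual-charts/distributor --version 5.3.4 -f values.yaml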
How to configure remote SSL security through a K8S Secret resource?
If you want to use K8S Secret resources to configure the remote SSL configuration for the Distributors, add the configuration blocks below.
The K8S Secret with the client certificate, private key and CA certificates in PEM format should be available when the Distributor is started.
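As an illustration only, such a Secret could be created from local PEM files with kubectl. The key names must match the file names referenced from the DirectoryConfigProvider placeholders below (cert-chain.pem, private-key.pem, trusted-ca.pem); the secret name, namespace and file paths are placeholders.
kubectl create secret generic <distribution-secret-name> \
  --namespace <distributor-namespace> \
  --from-file=cert-chain.pem=./cluster02-client-cert-chain.pem \
  --from-file=private-key.pem=./cluster02-client-private-key.pem \
  --from-file=trusted-ca.pem=./cluster02-trusted-ca.pem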
Modify your values.yaml and update externalConfiguration in the distributor configuration as follows:
connect:
# You can mount K8S ConfigMaps or Secrets into a distributor pod as environment variables or volumes. Volumes and environment variables are configured in the externalConfiguration property
# for full documentation visit: https://strimzi.io/docs/operators/latest/configuring.html#type-ExternalConfiguration-reference
externalConfiguration:
volumes:
- name: distribution-secret-cluster02
secret:
secretName: <distribution-secret-name>
- name: distribution-secret-cluster03
secret:
secretName: <distribution-secret-name>
and update the distribution configuration to remove the explicit client certificate configuration and add the additional client configuration:
distribution:
clusters:
cluster02:
# Skipping all other settings to illustrate needed settings
additionalClientConfigs:
# define the DirectoryConfigProvider
config.providers: directory
config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider
# The placeholder structure is directory:PATH:FILE-NAME. DirectoryConfigProvider reads and extracts the credentials from the mounted Secret in schema distributor configurations.
ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster02:cert-chain.pem}"
ssl.keystore.key: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster02:private-key.pem}"
ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster02:trusted-ca.pem}"
# Disable the tls settings for the cluster
tls: {}
cluster03:
# Skipping all other settings to illustrate needed settings
additionalClientConfigs:
# define the DirectoryConfigProvider
config.providers: directory
config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider
# The placeholder structure is directory:PATH:FILE-NAME. DirectoryConfigProvider reads and extracts the credentials from the mounted Secret in schema distributor configurations.
ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster03:cert-chain.pem}"
ssl.keystore.key: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster03:private-key.pem}"
ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster03:trusted-ca.pem}"
# Disable the tls settings for the cluster
tls: {}
How to enable JSON Logging in Distributor?
Writing log entries in JSON format often allows for easier parsing of the logs by a log collector. The Distributor can also write its log entries in JSON.
Not all log lines in the Distributor will be written as JSON. There are several subcomponents which will NOT write in JSON format to the console; this usually happens during startup and may cause warnings from your log collection stack.
To enable JSON logging, an extra ConfigMap needs to be created and referenced as an external logging configuration.
Json Logging Config Map
Create the following ConfigMap in the same namespace as the Distributor.
apiVersion: v1
kind: ConfigMap
metadata:
name: distributor-json-logging-generic
data:
log4j.properties: |-
### Configure logging to write to the console
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
    ### Configure logging in JSON format to the console
log4j.appender.CONSOLE.layout=net.logstash.log4j.JSONEventLayoutV1
### Default log level is set to INFO, and uses the CONSOLE appender
log4j.rootLogger=INFO, CONSOLE
### Specify some default log levels for the Distributor
log4j.logger.io.axual.distributor.common=INFO
log4j.logger.io.axual.distributor.message=INFO
log4j.logger.io.axual.distributor.offset=INFO
log4j.logger.io.axual.distributor.schema=INFO
log4j.logger.org.apache.kafka.clients.consumer=WARN
log4j.logger.org.apache.kafka.clients.producer=WARN
log4j.logger.org.apache.kafka.clients.admin=WARN
Enable external logging
Update your distributor values.yaml to enable the external logging configuration.
connect:
# Configure logging and log levels, see the Strimzi API documentation on how to configure https://strimzi.io/docs/operators/latest/configuring#type-KafkaConnectSpec-schema-reference
logging:
type: external
valueFrom:
configMapKeyRef:
name: distributor-json-logging-generic
key: log4j.properties
The distributor pods will be stopped by the Strimzi Cluster Operator when applying this configuration, and recreated with the ConfigMap mounted and Kafka Connect configured to use the specified log4j.properties file.