Deploying Distributor

Distributor components

The Distributor consists of a Kafka Connect container image, the Axual Distributor connector plugins, and a Helm chart controlling the deployment and the connector configurations.

The Axual Distributor components are:

Message Distributor

which distributes the records on the topics of the local Kafka cluster to a remote Kafka cluster

Offset Distributor

which distributes the committed offsets of clients from the local Kafka cluster to a special topic on the remote Kafka cluster

Offset Committer

reads messages written by Offset Distributors to a special topic on the local Kafka cluster and commits the offsets in the local cluster

Schema Distributor

reads schemas from the Confluent Schema Registry topic on the local Kafka cluster and produces them to a remote Kafka cluster

All distributors are aware of the Axual naming patterns used for multitenancy, and can transform topic and consumer group names to the correct patterns used by the remote Kafka cluster.

The Distributor Helm chart enables you to install and run the Kafka Connect components without starting any distributors, allowing for a step-by-step installation and activation of the different distribution components.

Prerequisites

A separate Distributor is deployed for each Kafka Cluster of a tenant instance. The following components are needed to deploy a Distributor:

Kafka Cluster

The local cluster from which data is read and distributed to remote Kafka clusters

Strimzi Operator

The Strimzi Operator watches for new or changed Kafka Connect and Connector definitions and performs the actual deployment according to those definitions. The Distributor Helm charts use Strimzi to deploy and configure the correct containers and distribution configuration.

Runtime Client Certificate

The Distributor needs a client certificate to connect to the Kafka Cluster and access the correct topics and consumer groups. This certificate does not need to be a super user in the Kafka Cluster.

Initialisation Client TLS Secret

A TLS secret containing a client certificate which allows the initialisation job to create new access control list entries and topics for the Distributor.

Installing the Distributor Helm Chart

The Distributor Helm charts are stored in a registry maintained by Axual. You need an account to access the Helm chart and the images.

Contact the Axual support team to request access.

The following example logs into the registry and installs the Axual Distributor chart with a single values.yaml:

helm registry login -u [your-user] registry.axual.io/axual-charts
helm upgrade --install local-distributor oci://registry.axual.io/axual-charts/distributor --version 5.3.4 -f values.yaml

Deploying the Distributor

The Helm configuration options for the Distributor are split into three sections:

Init

Configures the initialisation jobs, which create the required topics and access control list entries.

Connect

Configures the Kafka Connect deployment

Distribution

Configures the distribution model and the distributor connectors
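
The overall shape of a values.yaml therefore looks as follows. This is only a sketch; the actual keys for each section are shown in the examples in the rest of this page.

Values.yaml outline
init:
  # Initialisation job: create the required topics and access control list entries
  enabled: true
connect:
  # Kafka Connect deployment: image, replicas, bootstrap servers, TLS and client certificates
  bootstrapServers: "cluster01-kafka-bootstrap.kafka:9093"
distribution:
  # Distribution model, source cluster, target clusters and the distributor connectors
  enabled: false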

Configuring initialisation jobs

Both Kafka Connect and the distribution connector plugins need access to several topics and consumer groups for proper deployment. The init section configures which topics and access control list entries to create and which user should have access.

The initialisation scripts will use configuration from the connect and the distribution sections to determine the correct Kafka cluster, topic and consumer group names.

Example init configuration

The configuration below is an example for the user principal CN=Example distributor client for test instance cluster 1,OU=Infra,O=Axual. The Kubernetes secret cluster-1-root contains the key data to connect to the Kafka Cluster, and the secret cluster-1-ca contains the trusted certificate authorities.

If the Kafka cluster uses the Principal Chain Builder, then the full chain principal name should be used. In this example that is [0] CN=RootCA,OU=Infra,O=Axual, [1] CN=Intermediate CA 1,OU=Infra,O=Axual, [2] CN=Example distributor client for test instance cluster 1,OU=Infra,O=Axual

Initialisation configuration
# Initialise Kafka Cluster for Connect or Connector
init:
  # Initialisation calls are executed when enabled and the connect and/or distribution is also enabled
  # Default value is false
  enabled: true

  # Set the principal names matching the Distinguished Name of the Client Certificate
  # Set multiple names if distributors connecting from other clusters to this cluster use different certificates
  principals:
    # Names without using Principal Chain Builder
    # The principal used by a distributor connecting from cluster 1
    - "User:CN=Example distributor client for test instance cluster 1,OU=Infra,O=Axual"
    # The principal used by a distributor connecting from cluster 2
    - "User:CN=Example distributor client for test instance cluster 2,OU=Infra,O=Axual"

    # Names using Principal Chain Builder
    - "User:[0] CN=RootCA,OU=Infra,O=Axual, [1] CN=Intermediate CA 1,OU=Infra,O=Axual, [2] CN=Example distributor client for test instance cluster 1,OU=Infra,O=Axual"
    - "User:[0] CN=RootCA,OU=Infra,O=Axual, [1] CN=Intermediate CA 2,OU=Infra,O=Axual, [2] CN=Example distributor client for test instance cluster 2,OU=Infra,O=Axual"

  # Define optional extra configuration options for the admin client. For properties, see https://kafka.apache.org/documentation/#adminclientconfigs
  additionalClientConfigs:
    # Set the initial TLS protocol to TLSv1.2
    ssl.protocol: TLSv1.2
    # Don't support pushing client metrics to server for initialisation calls
    enable.metrics.push: false

  # Secrets for connecting to Kafka Cluster to create topics and access control entries
  tls:
    # Name of an existing Secret containing the private key and certificate to access the Kafka cluster
    keypairSecretName: "cluster-1-root"
    # The key of the Secret field containing the private key data
    keypairSecretKeyName: "key"
    # The key of the Secret field containing the public certificate chain for the key
    keypairSecretCertName: "cert"
    # Name of an existing Secret containing the list of trusted certificate authorities
    truststoreCaSecretName: "cluster-1-ca"
    # The key of the Secret field containing the trusted certificate authorities
    truststoreCaSecretCertName: "cacert"

  # Resource allocation for the init job pod
  #resources: {}
  # Defines the security options the container should be run with
  #securityContext: {}

  # Initialisation image information
  #image:
  #  registry: "registry.axual.io"
  #  repository: "axual/distributor"
  #  tag: "5.3.4-0.43.0"
  #  pullSecrets:
  #   - axual-credentials
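
For reference, the two Secrets referenced by the init.tls settings above could look like the sketch below. The PEM data is abbreviated, and the Secrets can be created in any way you prefer, as long as the Secret names and keys match the keypair and truststore settings above.

Example initialisation Secrets (sketch)
apiVersion: v1
kind: Secret
metadata:
  name: cluster-1-root
type: Opaque
stringData:
  key: |
    -----BEGIN PRIVATE KEY-----
    -----END PRIVATE KEY-----
  cert: |
    -----BEGIN CERTIFICATE-----
    -----END CERTIFICATE-----
---
apiVersion: v1
kind: Secret
metadata:
  name: cluster-1-ca
type: Opaque
stringData:
  cacert: |
    -----BEGIN CERTIFICATE-----
    -----END CERTIFICATE-----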

Configuring the Kafka Connect deployment

The configuration of the Kafka Connect component of Distributor is mostly based on the Strimzi Kafka Connect resource definition.

The following information is needed to deploy Kafka Connect:

Kafka Cluster bootstrap servers

The bootstrap servers setting is a list of hostname and port combinations used to set up an initial connection to the local Kafka Cluster. Make sure that the firewall and/or network policies allow access from Kafka Connect to all brokers in the Kafka cluster.

Kafka Cluster trusted certificate authorities

Kafka Connect needs to determine which Certificate Authorities are trusted when connecting to the Kafka Cluster. This can be provided as a Kubernetes Secret or in the values.yaml as PEM encoded data.

Client certificate for local Kafka cluster

The deployment needs the private key and certificate chain for the connection to the Kafka cluster. This can be provided as a Kubernetes Secret or in the values.yaml as PEM encoded data.

Topic and Consumer Group names

Kafka Connect needs three topics to store configuration, status and connector offset information. It also uses a consumer group to allow worker nodes to become aware of each other. The names need to be unique per Distributor installation, and should not match the naming patterns of the local Kafka cluster, to prevent the data in these topics from being distributed. Names often start with an underscore and contain the tenant and instance short names.

Strimzi Operator version

The Strimzi Cluster Operator controls the deployment and operation of the Distributor. It requires that the base image of the Distributor matches the operator version, to prevent compatibility issues. A release of the Axual Distributor supports multiple Strimzi versions.

Example connect configuration

The following example will connect to the Kafka Cluster cluster01 in namespace kafka of the same Kubernetes cluster. It is deployed for tenant demo and instance prod and uses the Secrets demo-prod-cluster01-distributor and cluster01-cluster-ca-cert for connecting to the Kafka Cluster. The internal consumer group is _demo-prod-distributor and the Connect topics are _demo-prod-distributor-config, _demo-prod-distributor-status and _demo-prod-distributor-offset.

Strimzi 0.43.0 is used, so the distributor image tag is set to 5.3.4-0.43.0 to select the Strimzi 0.43.0 version of Distributor 5.3.4.

Connect Configuration
connect:
  # Which image and registry should be used for connect
  image:
    # Set the specific tag of the repository to use for the Kafka Connect containers
    tag: "5.3.4-0.43.0"

  # The number of connect workers to start for parallel processing
  replicas: 3

  # Rack aware assignment should be used
  rack:
    enabled: true
    topologyKey: topology.kubernetes.io/zone

  # The bootstrap url used to connect to the kafka cluster
  bootstrapServers: "cluster01-kafka-bootstrap.kafka:9093"
  groupId: "_demo-prod-distributor"
  # Contains the internal connect topics settings
  topics:
    # Make sure to set this to a proper value for your Kafka cluster. In production this is at least 3
    replicationFactor: 3
    config:
      name: "_demo-prod-distributor-config"
    status:
      name: "_demo-prod-distributor-status"
      partitions: 2
    offset:
      name: "_demo-prod-distributor-offset"
      partitions: 3

  # Set extra connect configuration values
  config:
    key.converter: JsonConverter
    value.converter: JsonConverter


  # SASL PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
  sasl:
    # Set to true to enable a SASL connection
    enabled: false
    type: "PLAIN"
    username: "hello"
    password: "world"

  tls:
    enabled: true
    createCaCertsSecret: false
    # if createCaCertsSecret is true, set the CA certs below
    #    caCerts:
    #      one_ca.crt: |
    #        -----BEGIN CERTIFICATE-----
    #      other_ca.crt: |
    #        -----BEGIN CERTIFICATE-----
    # if createCaCertsSecret is false, caSecret needs to reference an
    # existing secret (name) and the key of the cert inside the
    # secret
    # caSecret:
    #  secretName: your_custom_ca_secret
    #  keyForCertificate: your_custom_ca_cert
    caSecret:
      secretName: "cluster01-cluster-ca-cert"
      keyForCertificate: "ca.crt"

    # Configure authentication using a client certificate
    authentication:
      enabled: true
      createTlsClientSecret: false
      # if createTlsClientSecret is true, set the client key and certificate chain below
      #clientCert: |
      #  -----BEGIN CERTIFICATE-----
      #  -----END CERTIFICATE-----
      #clientKey: |
      #  -----BEGIN PRIVATE KEY-----
      #  -----END PRIVATE KEY-----

      # if a TLS secret already exists with the client credentials, provide the name here
      secretName: "demo-prod-cluster01-distributor"

  # Configure logging and log levels, see the Strimzi API documentation on how to configure https://strimzi.io/docs/operators/latest/configuring#type-KafkaConnectSpec-schema-reference
  logging:
    type: inline
    loggers:
      log4j.rootLogger: "INFO"
      log4j.logger.io.axual.distributor.common: "INFO"
      log4j.logger.io.axual.distributor.message: "INFO"
      log4j.logger.io.axual.distributor.offset: "INFO"
      log4j.logger.io.axual.distributor.schema: "INFO"
      log4j.logger.org.apache.kafka.connect.runtime.rest: "WARN"

  # expose metrics using the JMX Prometheus Exporter
  metrics:
    enabled: false

  # Instruct Prometheus to scrape the metrics using a PodMonitor resource
  podMonitor:
    enabled: false
  #  additionalLabels:
  #    label/my1: hi
  #    lbl2: there
  #  additionalAnnotations:
  #    annotation/my1: hi
  #    ann2: there
  #  interval: 60s
  #  scrapeTimeout: 12s

  # Create Prometheus Alert Manager Rules
  prometheusRule:
    enabled: false
  #  additionalAnnotations:
  #    my-annotation: help
  #    some-annotation: none
  #  additionalLabels:
  #    my-label: lab-help
  #    some-label: 123
  #  rules:
  #    - alert: MyCustomRuleName
  #      annotations:
  #        message: '{{ "{{ $labels.connector }}" }} send rate has dropped to 0'
  #      expr: sum by (connector) ( kafka_connect_sink_task_sink_record_send_rate{connector=~".*-message-distributor-.*"}) == 0
  #      for: 5m
  #      labels:
  #        severity: high
  #        callout: false

  # Add additional configuration here, see the Strimzi API documentation on how to use them https://strimzi.io/docs/operators/latest/configuring#type-KafkaConnectSpec-schema-reference
  #resources: {}
  #livenessProbe: {}
  #readinessProbe: {}
  #jvmOptions: {}
  #jmxOptions: {}
  #tracing: {}
  #template: {}
  #build: {}

Configuring distribution

A distribution model is needed to properly set up the data replication between clusters. The Axual multi level distribution model is designed to distribute the data to all clusters with a minimum number of steps. It minimizes the crossing of network boundaries and reduces the amount of data sent from one cluster to another.

Distribution requirements

Tenant and Instance short names

Each Distributor deployment is dedicated to a single tenant instance. The short names are often used in the naming patterns of a cluster, and are also used to identify the connectors.

Distribution model

Design a distribution model and determine what the static target clusters are for each level. This information is needed to determine the contents of the following requirements.

The cluster names used in the model are static. Renaming a cluster, or removing and adding it with a different name later, can cause existing data not to be distributed, or to be distributed again.
Target cluster connectivity information

The connection information needed to reach each of the remote Kafka Clusters. This includes bootstrap servers and security protocols, but can also contain specific settings to work around network issues like firewalls or routers cutting off inactive connections.

Trusted certificate authorities for each remote Kafka cluster

Kafka Connect needs to determine which Certificate Authorities are trusted when connecting to each of the remote Kafka Clusters. This must be specified in the values.yaml as PEM encoded data.

Client certificate for each remote Kafka cluster

The deployment needs the private key and certificate chain for the connection to each of the remote Kafka clusters. This must be specified in the values.yaml as PEM encoded data.

Distribution uses references to cluster configurations in a map. This allows potential target clusters to be defined next to the active target clusters, which makes reconfiguring distribution quicker if one of the clusters goes offline: you only have to update the reference to the target cluster, as sketched below.
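
A minimal sketch, using the cluster names from the examples below. Both target clusters are defined in the clusters map, while a distribution level only holds a reference, so a failover only requires changing targetClusterName.

Switching a distribution level to another cluster (sketch)
distribution:
  clusters:
    cluster02:
      name: cluster02
      # full connection settings omitted in this sketch
    cluster03:
      name: cluster03
      # full connection settings omitted in this sketch
  levels:
    1:
      # Change this reference to "cluster03" to redirect level 1 distribution
      targetClusterName: "cluster02"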

Configure local and remote clusters

The following example adds the cluster information to the values.yaml.

In this example there are three Kafka clusters:

cluster01

The local Kafka Cluster from which the records and offsets are read, and distributed to the remote Kafka Clusters.

cluster02

The first remote Kafka Cluster.

cluster03

The second remote Kafka Cluster.

The Distributor configurations running on Cluster02 and Cluster03 are not shown on this page, but they should also be running and distributing messages to Cluster01.
distribution:
  # If set to false, all distribution is disabled. If true, the individual distribution enable flags are evaluated
  enabled: true

  # Setting the tenant information
  tenantShortName: "demo"
  instanceShortName: "prod"

  # Prefix values for topic and group ACLs on the local Kafka Cluster.
  prefixAclSettings:
    # The local topic pattern is {tenant}-{instance}-{environment}-{topic}. environment and topic are dynamic and are ignored for the prefix ACL. Results in all topics with a name starting with this prefix to be distributed
    topicPrefix: "demo-prod-"
    # The local group pattern is {tenant}-{instance}-{environment}-{group.id}. environment and group id are dynamic and are ignored for the prefix ACL. Results in all offsets for a consumer group with a name starting with this prefix to be distributed
    groupPrefix: "demo-prod-"

  # Information about the source cluster. This is the cluster Kafka Connect is connecting with and distributors read their data from
  sourceCluster:
    # Cluster name, this is also used in the connector and consumer group names
    name: cluster01
    # Contains instance topic information for the cluster
    topics:
      # The replication factor to use for the local distributor specific topics
      # Make sure to set this to a proper value for your Kafka cluster. In production this is at least 3
      replicationFactor: 3
      # Timestamps topic for offset distribution, name is _<tenant>-<instance>-consumer-timestamps
      timestamps:
        nameOverride: ""
        # The number of partitions to use for the timestamps topic
        partitions: 25

    # The resource naming pattern used on this cluster
    topicPattern: '{tenant}-{instance}-{environment}-{topic}'
    groupPattern: '{tenant}-{instance}-{environment}-{group.id}'
    # If a pattern value is missing the value specified in this dictionary should be used
    patternDefaultValues:
      environment: prod

    # Connection information, used by offset distributor and committer to update/query data from the source cluster
    bootstrapServers: "cluster01-kafka-bootstrap.kafka:9093"

    # What is the protocol for the Kafka connection using the bootstrap server.
    # Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    securityProtocol: "SSL"

    # Authentication configuration is provided separately, but other client connection settings
    # can be specified here. For example, TLS protocols and buffer sizes
    additionalClientConfigs:
      ssl.enabled.protocols: TLSv1.2


    # If the security protocol is SASL_PLAINTEXT or SASL_SSL then this section must be used
    # sasl:
      # Which SASL mechanism should be used. Accepts PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
    #  mechanism: SCRAM-SHA-512
    #  username: my-user
    #  password: my-password

    # Contains the tls settings for the cluster
    tls:
      # A list of PEM encoded CA certificates to use when connecting to the Kafka cluster
      # This is used to verify the server certificate
      caCerts:
        - |
          -----BEGIN CERTIFICATE-----
          -----END CERTIFICATE-----

      # The PEM encoded client certificate chain to use if Mutual TLS, or certificate based authentication is required
      # Only when both clientCert and clientKey are provided will Mutual TLS be enabled
      clientCert: |
        -----BEGIN CERTIFICATE-----
        -----END CERTIFICATE-----

      # A PEM encoded PKCS#8 key to use if Mutual TLS, or certificate based authentication is required
      # Encrypted keys are supported, the password should go in the clientKeyPassword configuration
      # Only when both clientCert and clientKey are provided will Mutual TLS be enabled
      clientKey: |
        -----BEGIN PRIVATE KEY-----
        -----END PRIVATE KEY-----

      # The password for the clientKey, only set this if the clientKey is encrypted
      # clientKeyPassword: somethingVerySecret

  # Information about all potential target clusters. These are the cluster where data will be written to
  clusters:
    # The key/id of the cluster to use. Recommend to keep this the same as the cluster name
    cluster02:
      # Cluster name, this is also used in the connector and consumer group names
      name: cluster02
      # Contains instance topic information for the cluster
      topics:
        # Timestamps topic for offset distribution
        timestamps:
          # Use name override if the target cluster does not use default name of _<tenant>-<instance>-consumer-timestamps
          nameOverride: ""
        # Schema topic for schema distribution
        schemas:
          # Use name override if the target cluster does not use default name of _<tenant>-<instance>-schemas
          nameOverride: ""

      # The resource naming pattern used on this cluster
      topicPattern: '{tenant}-{instance}-{environment}-{topic}'
      groupPattern: '{tenant}-{instance}-{environment}-{group.id}'
      # If a pattern value is missing the value specified in this dictionary should be used
      patternDefaultValues:
        environment: prod

      # Connection information, used by connectors to connect to the target
      bootstrapServers: "cluster02-kafka-bootstrap.region2.demo.com:9093"
      # What is the protocol for the Kafka connection using the bootstrap server.
      # Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
      securityProtocol: SSL

      # Authentication configuration is provided separately, but other client connection settings
      # can be specified here. For example, TLS protocols and buffer sizes
      additionalClientConfigs:
        ssl.enabled.protocols: TLSv1.2

      # If the security protocol is SASL_PLAINTEXT or SASL_SSL then this section must be used
      #sasl:
      #  # Which SASL mechanism should be used. Accepts PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
      #  mechanism: SCRAM-SHA-512
      #  username: my-user
      #  password: my-password

      # Contains the tls settings for the cluster
      tls:
        # A list of PEM encoded CA certificates to use when connecting to the target Kafka cluster
        # This is used to verify the server certificate
        caCerts:
          - |
            -----BEGIN CERTIFICATE-----
            -----END CERTIFICATE-----

        # The PEM encoded client certificate chain to use if Mutual TLS, or certificate based authentication is required
        # Only when both clientCert and clientKey are provided will Mutual TLS be enabled
        clientCert: |
          -----BEGIN CERTIFICATE-----
          -----END CERTIFICATE-----

        # A PEM encoded PKCS#8 key to use if Mutual TLS, or certificate based authentication is required
        # Only when both clientCert and clientKey are provided will Mutual TLS be enabled
        # Encrypted keys are supported, the password should go in the clientKeyPassword configuration
        clientKey: |
          -----BEGIN PRIVATE KEY-----
          -----END PRIVATE KEY-----

        # The password for the clientKey, only set this if the clientKey is encrypted
        #clientKeyPassword: somethingVerySecret

    # The key/id of the cluster to use. Recommend to keep this the same as the cluster name
    cluster03:
      # Cluster name, this is also used in the connector and consumer group names
      name: cluster03
      # Contains instance topic information for the cluster
      topics:
        # Timestamps topic for offset distribution
        timestamps:
          # Use name override to not use default name of _<tenant>-<instance>-consumer-timestamps
          nameOverride: ""
        # Schema topic for schema distribution
        schemas:
          # Use name override to not use default name of _<tenant>-<instance>-schemas
          nameOverride: ""

      # The resource naming pattern used on this cluster
      topicPattern: '{tenant}-{instance}-{environment}-{topic}'
      groupPattern: '{tenant}-{instance}-{environment}-{group.id}'
      # If a pattern value is missing the value specified in this dictionary should be used
      patternDefaultValues:
        environment: prod

      # Connection information, used by connectors to connect to the target
      bootstrapServers: "cluster03-kafka-bootstrap.region3.demo.com:9093"

      # What is the protocol for the Kafka connection using the bootstrap server.
      # Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
      securityProtocol: SSL

      # Authentication configuration is provided separately, but other client connection settings
      # can be specified here. For example, TLS protocols and buffer sizes
      additionalClientConfigs:
        ssl.enabled.protocols: TLSv1.2

      # If the security protocol is SASL_PLAINTEXT or SASL_SSL then this section must be used
      #sasl:
      #  # Which SASL mechanism should be used. Accepts PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
      #  mechanism: SCRAM-SHA-512
      #  username: my-user
      #  password: my-password

      # Contains the tls settings for the cluster
      tls:
        # A list of PEM encoded CA certificates to use when connecting to the target Kafka cluster
        # This is used to verify the server certificate
        caCerts:
          - |
            -----BEGIN CERTIFICATE-----
            -----END CERTIFICATE-----

        # The PEM encoded client certificate chain to use if Mutual TLS, or certificate based authentication is required
        # Only when both clientCert and clientKey are provided will Mutual TLS be enabled
        clientCert: |
          -----BEGIN CERTIFICATE-----
          -----END CERTIFICATE-----

        # A PEM encoded PKCS#8 key to use if Mutual TLS, or certificate based authentication is required
        # Only when both clientCert and clientKey are provided will Mutual TLS be enabled
        # Encrypted keys are supported, the password should go in the clientKeyPassword configuration
        clientKey: |
          -----BEGIN PRIVATE KEY-----
          -----END PRIVATE KEY-----

        # The password for the clientKey, only set this if the clientKey is encrypted
        #clientKeyPassword: somethingVerySecret

Configuring Schema Distributor

Only start the Schema Distributor on the primary cluster. The Schema Distributor is meant to synchronize schema topics from a leading schema registry to following schema registries. Distributing in the other direction may delete schemas from the leading topic, and enabling Schema Distributors on multiple clusters should be avoided to prevent looping behaviour and schema id mismatches.

The following schema distributor configuration distributes schemas to the remote Kafka clusters cluster02 and cluster03. The connection information and topic names are retrieved from the sourceCluster and clusters configuration set in Configure local and remote clusters.

The configuration map from additionalClientConfigs is merged with the clusters.[].additionalClientConfigs and used to configure the producer to the remote Kafka Cluster.

distribution:
  # Schema Distribution
  schemaDistributor:
    enabled: true
    # Which clusters should it distribute to, uses the {clusters.[].name} to find cluster information
    targetClusterNames:
      - "cluster02"
      - "cluster03"

    # Override the settings for the internal Kafka Connect consumer for the tasks. This can be used to tweak performance
    consumerOverrides:
      # Override how many records will be read in a single poll of Kafka Connect
      max.poll.records: 50

    # Any additional client configs needed to connect to the remote cluster.
    # For example, TLS protocols and buffer sizes and retries
    additionalClientConfigs:
      acks: all
      retries: 5
      max.in.flight.requests.per.connection: 5
      enable.idempotence: true
      ssl.enabled.protocols: TLSv1.2,TLSv1.3
      ssl.endpoint.identification.algorithm: ''

Configuring Offset Committer

The offset committer reads offset timestamps for consumer groups from the local Kafka Cluster, translates those to offsets and commits the offsets on the local Kafka Cluster.

The offset committer can be started immediately, since it relies on data sent to the local timestamps topic by the Offset Distributors running on remote clusters. When those are not active, the Offset Committer will simply wait.

The following configuration enables the Offset Committer, running with 5 parallel tasks. The setting distributionOffsetMs determines how much time to subtract from the received timestamp before determining the offset to commit. This ensures that the client application does not miss records when migrating from one cluster to another.

Resolving the names of consumer groups and topics can be expensive, so a cache is used to store the resolved names. The size of the caches can be set with topicCacheSize and groupCacheSize.

The configuration map from offsetCommitter.additionalClientConfigs is merged with the sourceCluster.additionalClientConfigs and used to configure the clients needed to look up and commit offsets on the local Kafka Cluster.

distribution:
  offsetCommitter:
    enabled: true
    # how many parallel tasks should the connector have
    maxTasks: 5

    # Setting this to true will skip any connectivity check when creating and validating the connector config
    # skipConnectionValidation: true

    # On severe failures there will be continuous processing errors.
    # Internal connector resources can be reinitialized when reaching the thresholds defined here
    reinitializeOnContinuousError:
      # Reinitialize if the continuous error count reaches this number
      maximumCount: 5000
      # Reinitialize if the continuous error state has existed for this long
      maximumTimeMs: 30000

    # The offset committer reads timestamps that relate to records produced on the source topic.
    # This value pushes the timestamp back to make sure that any application will continue before or at
    # the offset of the original record, guaranteeing at-least-once consuming of records
    # If the message distribution to this cluster slows down a higher value might be needed
    distributionOffsetMs: 30000

    # The maximum size of the cache used to optimize group and topic resolving.
    topicCacheSize: 1000
    groupCacheSize: 1000

    # Override the settings for the internal Kafka Connect consumer for the tasks. This can be used to tweak performance
    consumerOverrides:
      # Override how many records will be read in a single poll of Kafka Connect
      max.poll.records: 50

    # Any additional client configs needed to connect to the source cluster to commit the offsets.
    # For example, TLS protocols and buffer sizes
    additionalClientConfigs:
      ssl.enabled.protocols: TLSv1.2,TLSv1.3
      ssl.endpoint.identification.algorithm: ''

Configuring Message Distributor

The Message Distributor uses the distribution model to determine whether a record should be distributed. The configuration for this distributor is split into a separate configuration per level.

Each level distributes to a remote Kafka Cluster, and a level consists of Message and Offset Distributor settings.

Message Distributor requires a regular expression to subscribe to the local topics which should be distributed. This regular expression should match the topic pattern to prevent issues during distribution.

The following example configures a level 1 distribution to cluster02 using 5 tasks and a level 2 distribution to cluster03 using 2 tasks.

The topicRegex is set to demo-prod-.* because tenant and instance are static values in the topic pattern used. All topics for this instance begin with demo-prod-.

The configuration map from levels.[].messageDistributor.additionalTargetClientConfigs is merged with the clusters.[].additionalClientConfigs and used to configure the clients that produce records to the remote Kafka Cluster.

distribution:
  # Axual uses a multi level distribution model to make sure records and offsets arrive at the correct cluster
  # This is a dictionary containing the distribution levels relevant to the source cluster.
  # Each level can be activated independently and contains settings for the offset and message distributors for that
  # level
  # The level number is 1 to 32
  levels:
    # level numbers should be numbers, and are used in connector and group naming
    1:
      enabled: true
      # To which cluster does the data go, uses the {clusters.[].name} to find cluster information
      targetClusterName: "cluster02"
      # The message distributor settings to copy records from the source cluster to the target cluster
      messageDistributor:
        enabled: true
        # how many parallel tasks should the connector have
        maxTasks: 5
        # which topic pattern should be used for subscription. The form depends on {sourceCluster.topicPattern}
        topicRegex: 'demo-prod-.*'

        # The maximum size of the cache used to optimize topic resolving.
        topicCacheSize: 1000

        # Override the settings for the internal Kafka Connect consumer for the tasks. This can be used to tweak performance
        consumerOverrides:
          # Override how many records will be read in a single poll of Kafka Connect
          max.poll.records: 50

        # Any additional client configs needed to connect to the target cluster.
        # For example, TLS protocols and buffer sizes
        additionalTargetClientConfigs: {}
    2:
      enabled: true
      targetClusterName: "cluster03"
      messageDistributor:
        enabled: true
        maxTasks: 2
        topicRegex: 'demo-prod-.*'
        topicCacheSize: 1000
        consumerOverrides:
          max.poll.records: 50
        additionalTargetClientConfigs: {}

Configuring Offset Distributor

The Offset Distributor uses the distribution model to determine whether a committed offset should be distributed. The configuration for this distributor is split into a separate configuration per level.

Each level controls the distribution to a remote Kafka Cluster, and a level configuration contains both Offset and Message Distributor settings.

The offset distributor reads from the __consumer_offsets topic on the local Kafka Cluster and filters out all commits that are not for the specific tenant and instance, or that should not be distributed according to the distribution model.

Offset commits can happen often for a consumer group, as this is controlled by the consumer application and the load on the partition. The offset distributor only produces a single timestamp in a time window for each consumer group. The window size is set using the windowSizeMs property.

The following example configures a level 1 distribution to cluster02 using 5 tasks and a level 2 distribution to cluster03 using 5 tasks. The window size for both is 15 seconds (15000 milliseconds).

The configuration map from levels.[].offsetDistributor.additionalLocalClientConfigs is merged with the sourceCluster.additionalClientConfigs and used to configure the clients that look up record timestamps on the local Kafka Cluster.

The configuration map from levels.[].offsetDistributor.additionalTargetClientConfigs is merged with the clusters.[].additionalClientConfigs and used to configure the clients that produce timestamp records to the remote Kafka Cluster.

distribution:
  # Axual uses a multi level distribution model to make sure records and offsets arrive at the correct cluster
  # This is a dictionary containing the distribution levels relevant to the source cluster.
  # Each level can be activated independently and contains settings for the offset and message distributors for that
  # level
  # The level number is 1 to 32
  levels:
    # level numbers should be numbers, and are used in connector and group naming
    1:
      enabled: true
      # To which cluster does the data go, uses the {clusters.[].name} to find cluster information
      targetClusterName: "cluster02"
      # The offset distributor settings to distribute offsets from the source cluster as a timestamp
      # to the target cluster where it can be committed.
      # The distributor produces to the timestamps topic on the target cluster
      # To prevent a flood of offset commits only the latest value for a specific consumer group and topic partition
      # combination in a time window is translated and produced as a timestamp to the remote cluster
      offsetDistributor:
        enabled: true
        # how many parallel tasks should the connector have
        maxTasks: 5

        # The size of the window to group offset commits in.
        # Grouping is done on group.id+topic+partition.
        windowSizeMs: 15000

        # The maximum size of the cache used to optimize group and topic resolving.
        topicCacheSize: 1000
        groupCacheSize: 1000

        # Override the settings for the internal Kafka Connect consumer for the tasks. This can be used to tweak performance
        consumerOverrides:
          # Override how many records will be read in a single poll of Kafka Connect
          max.poll.records: 50

        # Any additional client configs needed to connect to the source cluster.
        # For example, TLS protocols and buffer sizes
        # The local connection is used to determine the timestamps of a specific record
        additionalLocalClientConfigs: {}

        # Any additional client configs needed to connect to the target cluster.
        # For example, TLS protocols and buffer sizes
        additionalTargetClientConfigs: {}
    2:
      enabled: true
      targetClusterName: "cluster03"
      offsetDistributor:
        enabled: true
        maxTasks: 5
        windowSizeMs: 15000
        topicCacheSize: 1000
        groupCacheSize: 1000
        consumerOverrides:
          max.poll.records: 50
        additionalLocalClientConfigs: {}
        additionalTargetClientConfigs: {}

Order of deployment of the Distributor connectors

The order in which the connectors are started is important. The normal order is:

  1. Schema Distributor on the cluster where Self Service registers the schema

  2. Offset Committer runs on all clusters and looks up and commits offsets to the local cluster.

  3. Message Distributor to distribute all records to the target cluster

  4. Wait until the Message Distributor has distributed all topics and is keeping up with new production. This is needed to ensure that Offset Distribution does not refer to message offsets that haven't been distributed yet

  5. Offset Distributor to determine the timestamp and distribute that timestamp to remote clusters.
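
Because every connector has its own enabled flag, this order can be followed by upgrading the same Helm release several times and switching on one flag per step. The sketch below shows the end state of such a staged rollout; all other settings are omitted, and the step numbers refer to the list above.

Staged activation of the distributors (sketch)
distribution:
  enabled: true
  schemaDistributor:
    enabled: true        # step 1, only on the cluster where Self Service registers the schemas
  offsetCommitter:
    enabled: true        # step 2
  levels:
    1:
      enabled: true
      messageDistributor:
        enabled: true    # step 3, then wait until distribution has caught up (step 4)
      offsetDistributor:
        enabled: true    # step 5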

How to configure remote SSL security through a K8S Secret resource?

If you want to use K8S Secret resources to configure the remote SSL config for the Distributors, you have to add the following code block:

The K8S Secret with the client certificate, private key and CA certificates in PEM format should be available when the Distributor is started.

Modify your values.yaml and update externalConfiguration in distributor configuration as follows:

connect:
  # You can mount K8S ConfigMaps or Secrets into a distributor pod as environment variables or volumes. Volumes and environment variables are configured in the externalConfiguration property
  # for full documentation visit: https://strimzi.io/docs/operators/latest/configuring.html#type-ExternalConfiguration-reference
  externalConfiguration:
    volumes:
      - name: distribution-secret-cluster02
        secret:
          secretName: <distribution-secret-name>
      - name: distribution-secret-cluster03
        secret:
          secretName: <distribution-secret-name>

and update the distributor configuration to remove the explicit client certificate configuration and add the additional client configuration:

Using the DirectoryConfigProvider to load the Kubernetes Secrets
distribution:
  clusters:
    cluster02:
      # Skipping all other settings to illustrate needed settings
      additionalClientConfigs:
        # define the DirectoryConfigProvider
        config.providers: directory
        config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider
        # The placeholder structure is directory:PATH:FILE-NAME. DirectoryConfigProvider reads and extracts the credentials from the mounted Secret in schema distributor configurations.
        ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster02:cert-chain.pem}"
        ssl.keystore.key: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster02:private-key.pem}"
        ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster02:trusted-ca.pem}"
      # Disable the tls settings for the cluster
      tls: {}

    cluster03:
      # Skipping all other settings to illustrate needed settings
      additionalClientConfigs:
        # define the DirectoryConfigProvider
        config.providers: directory
        config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider
        # The placeholder structure is directory:PATH:FILE-NAME. DirectoryConfigProvider reads and extracts the credentials from the mounted Secret in schema distributor configurations.
        ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster03:cert-chain.pem}"
        ssl.keystore.key: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster03:private-key.pem}"
        ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/distribution-secret-cluster03:trusted-ca.pem}"
      # Disable the tls settings for the cluster
      tls: {}
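
The mounted Secrets are expected to contain one file per placeholder used above. A sketch of such a Secret is shown below; the Secret name must match the secretName used in the externalConfiguration volume, and the PEM data is abbreviated.

Example Secret for externalConfiguration (sketch)
apiVersion: v1
kind: Secret
metadata:
  # Must match externalConfiguration.volumes[].secret.secretName
  name: <distribution-secret-name>
type: Opaque
stringData:
  cert-chain.pem: |
    -----BEGIN CERTIFICATE-----
    -----END CERTIFICATE-----
  private-key.pem: |
    -----BEGIN PRIVATE KEY-----
    -----END PRIVATE KEY-----
  trusted-ca.pem: |
    -----BEGIN CERTIFICATE-----
    -----END CERTIFICATE-----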

How to enable JSON Logging in Distributor?

Writing the log entries in JSON format often allows for easier parsing of the logs by a log collector. The Distributor can also write its log entries in JSON.

Not all log lines in Distributor will be written as JSON. There are several subcomponents which will NOT write in JSON format to the console; this usually happens during startup. It may cause warnings from your log collection stack.

To enable JSON logging, an extra ConfigMap needs to be created and referenced as an external logging configuration.

JSON Logging ConfigMap

Create the following configmap in the same namespace as the Distributor.

apiVersion: v1
kind: ConfigMap
metadata:
 name: distributor-json-logging-generic
data:
 log4j.properties: |-
   ### Configure logging to write to the console
   log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
   ### Configure logging in JSON format to the console
   log4j.appender.CONSOLE.layout=net.logstash.log4j.JSONEventLayoutV1
   ### Default log level is set to INFO, and uses the CONSOLE appender
   log4j.rootLogger=INFO, CONSOLE
   ### Specify some default log levels for the Distributor
   log4j.logger.io.axual.distributor.common=INFO
   log4j.logger.io.axual.distributor.message=INFO
   log4j.logger.io.axual.distributor.offset=INFO
   log4j.logger.io.axual.distributor.schema=INFO
   log4j.logger.org.apache.kafka.clients.consumer=WARN
   log4j.logger.org.apache.kafka.clients.producer=WARN
   log4j.logger.org.apache.kafka.clients.admin=WARN

Enable external logging

Update your distributor values.yaml to enable external logging.

connect:
  # Configure logging and log levels, see the Strimzi API documentation on how to configure https://strimzi.io/docs/operators/latest/configuring#type-KafkaConnectSpec-schema-reference
  logging:
    type: external
    valueFrom:
      configMapKeyRef:
        name: distributor-json-logging-generic
        key: log4j.properties

The distributor pods will be stopped by the Strimzi Cluster Operator when applying this config, and recreated with the config map mounted and Kafka Connect configured to use the specified log4j.properties file.