Message Distributor

Enabling Message Distributor in Helm Charts

Distributor version 5.2.0 introduced static message distribution. Previous releases relied on the Axual Discovery API to provide the connection information of the target cluster. In static mode, you provide this information directly in the Message Distributor configuration instead.

By default, the Message Distributor is enabled. When the Distributor is enabled, the Strimzi Cluster Operator deploys the Message Distributor automatically. To disable the Message Distributor, set the global configuration global.instance.messageDistributor.enabled to false.

Usage:

instance:
  enabled: true
  messageDistributor:
    enabled: true
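A minimal values.yaml fragment that disables the Message Distributor is sketched below. The nesting follows the dotted key global.instance.messageDistributor.enabled named above; the other keys in your values file can stay as they are.

```yaml
global:
  instance:
    messageDistributor:
      # false: the Message Distributor is not deployed
      enabled: false
```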

Configuring Message Distributor in Static mode

Prerequisite: Make sure that Distributor version 5.2.0 or newer is configured and running.

The chart provides a default values.yaml for the Message Distributor that is suitable for a local k8s deployment. You can override the Message Distributor configuration below in your own values.yaml:

global:
  cluster:
    name: local
    topicPattern: "{tenant}-{instance}-{environment}-{topic}"
  tenant:
    name: "Axual B.V."
    shortName: axual
  instance:
    enabled: true
    fullName: axual-local
    name: local

instance:
  distribution:
    #  If cluster, tenant & instance are not overridden, then they will be picked up from global settings.
    #  tenantOverride:
    #  instanceOverride:
    #  clusterOverride:

    messageDistributor:
      # Message distributor name. This will resolve to {tenant}-{instance}-{distribution.name}-level-{distribution.config.level}
      name: message-distributor
      # MessageDistributor class
      class: MessageDistributor
      # Number of tasks handling message distribution (consumer threads)
      tasksMax: 1

      # `topics.regex` default is `{{ .Values.global.instance.fullName }}-.*`
      #  topicsRegexOverride:

      # add/override any message distributor configuration here
      config:
        # Distributor mode. Valid values are `static` and `discovery`; defaults to `discovery`
        distributor.mode: static
        # Bootstrap servers, name and topic pattern of the remote (target) cluster
        remote.bootstrap.servers: remote-kafka-bootstrap:9093
        target.cluster: "remote"
        target.topic.pattern: "{instance}-{environment}-{topic}"

        # Distribution level
        level: 1
        environment: example

        # converter config
        header.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

        # transformer config
        transforms: CopyFlagFilter,CopyFlagUpdate
        # Configuration of transformer CopyFlagFilter, drops a record if it is already distributed
        transforms.CopyFlagFilter.type: io.axual.distributor.transform.CopyFlagFilter
        transforms.CopyFlagFilter.level: 1

        # Configuration of transformer CopyFlagUpdate, marks the record as distributed at this level
        transforms.CopyFlagUpdate.type: io.axual.distributor.transform.CopyFlagUpdate
        transforms.CopyFlagUpdate.level: 1

        # remote cluster config for producer
        remote.acks: all
        remote.value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        remote.key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        remote.max.in.flight.requests.per.connection: 1
        remote.retries: 0
        remote.max.request.size: 2097152
        remote.send.buffer.bytes: 2097152
        remote.receive.buffer.bytes: 2097152
        remote.batch.size: 524288
        remote.linger.ms: 250
        remote.request.timeout.ms: 30000

        # Consumer overrides for the distributor tasks, e.g. to tune fetch and poll sizes
        consumer.override.max.poll.records: 500
#        consumer.override.fetch.min.bytes: 1
#        consumer.override.fetch.max.bytes: 52428800
#        consumer.override.fetch.max.wait.ms: 500
#        consumer.override.metadata.max.age.ms: 30000
#        consumer.override.max.partition.fetch.bytes: 1048576
#        consumer.override.connections.max.idle.ms: 540000
#        consumer.override.isolation.level: read_committed
#        consumer.override.receive.buffer.bytes: 65536
#        consumer.override.send.buffer.bytes: 131072

        # remote cluster security settings
        remote.security.protocol: SSL
        remote.ssl.protocol: TLSv1.2
        remote.ssl.keystore.type: PEM
        remote.ssl.truststore.type: PEM
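        # An empty value disables server hostname verification in Kafka; set to https to verify the remote broker host name
        remote.ssl.endpoint.identification.algorithm: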
        # PEM string format certificate-chain
        remote.ssl.keystore.certificate.chain: ...
        # PEM string format private key
        remote.ssl.keystore.key: ...
        # PEM string format CA certificates
        remote.ssl.truststore.certificates: ...
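Helm merges the maps in your values.yaml recursively with the chart defaults, so an override typically only needs the keys that differ from the defaults above. The sketch below assumes the remaining defaults suit your setup; the bootstrap address, cluster name and the commented topicsRegexOverride value are illustrative placeholders, not recommended values.

```yaml
instance:
  distribution:
    messageDistributor:
      # Optional, hypothetical value: narrow the default topics.regex (axual-local-.*)
      # topicsRegexOverride: "axual-local-example-.*"
      config:
        distributor.mode: static
        # Placeholder address and name of the remote (target) Kafka cluster
        remote.bootstrap.servers: remote-kafka-bootstrap:9093
        target.cluster: "remote"
        target.topic.pattern: "{instance}-{environment}-{topic}"
        # The remote SSL material still has to be provided, inline or via a K8S Secret
        remote.ssl.keystore.certificate.chain: ...
        remote.ssl.keystore.key: ...
        remote.ssl.truststore.certificates: ...
```

Assuming {tenant} and {instance} resolve to the short names axual and local, a topic orders in environment example is stored on the local cluster as axual-local-example-orders, which the default topics.regex axual-local-.* matches, and would be written to the remote cluster as local-example-orders.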

Configuring Message Distributor in Discovery mode

Prerequisite: Make sure that the following services are up and running before starting the Message Distributor in discovery mode:

  • Distributor

  • Discovery API, with remote Kafka clusters configured.

The chart provides a default values.yaml for the Message Distributor that is suitable for a local k8s deployment. You can override the Message Distributor configuration below in your own values.yaml:

# Global settings defined for the tenant, instance and cluster
global:
  cluster:
    name: local
    topicPattern: "{tenant}-{instance}-{environment}-{topic}"
  tenant:
    name: "Axual B.V."
    shortName: axual
  instance:
    enabled: true
    fullName: axual-local
    name: local

instance:
  distribution:
    #  If cluster, tenant & instance are not overridden, then they will be picked up from global settings.
    #  tenantOverride:
    #  instanceOverride:
    #  clusterOverride:

    messageDistributor:
      # Message distributor name. This will resolve to {tenant}-{instance}-{distribution.name}-level-{distribution.config.level}
      name: message-distributor
      # MessageDistributor class
      class: MessageDistributor
      # Number of tasks handling message distribution (consumer threads)
      tasksMax: 1

      # `topics.regex` default is `{{ .Values.global.instance.fullName }}-.*`
      #  topicsRegexOverride:

      # add/override any message distributor configuration here
      config:
        # Distributor mode. Valid values are `static` and `discovery`; defaults to `discovery`
        distributor.mode: discovery

        # Discovery API URL
        endpoint: https://axual-local-discovery-api:29000
        # Retry settings for the Discovery API client; the delay is in milliseconds
        discovery.api.retry.count: 100
        discovery.api.retry.delay: 5000

        # Distribution level
        level: 1
        environment: example

        # converter config
        header.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

        # transformer config
        transforms: CopyFlagFilter,CopyFlagUpdate
        # Configuration of transformer CopyFlagFilter, drops a record if it is already distributed
        transforms.CopyFlagFilter.type: io.axual.distributor.transform.CopyFlagFilter
        transforms.CopyFlagFilter.level: 1

        # Configuration of transformer CopyFlagUpdate, marks the record as distributed at this level
        transforms.CopyFlagUpdate.type: io.axual.distributor.transform.CopyFlagUpdate
        transforms.CopyFlagUpdate.level: 1

        # remote cluster config for producer
        remote.acks: all
        remote.value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        remote.key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        remote.max.in.flight.requests.per.connection: 1
        remote.retries: 0
        remote.max.request.size: 2097152
        remote.send.buffer.bytes: 2097152
        remote.receive.buffer.bytes: 2097152
        remote.batch.size: 524288
        remote.linger.ms: 250
        remote.request.timeout.ms: 30000

        # Consumer overrides for the distributor tasks, e.g. to tune fetch and poll sizes
        consumer.override.max.poll.records: 500
#        consumer.override.fetch.min.bytes: 1
#        consumer.override.fetch.max.bytes: 52428800
#        consumer.override.fetch.max.wait.ms: 500
#        consumer.override.metadata.max.age.ms: 30000
#        consumer.override.max.partition.fetch.bytes: 1048576
#        consumer.override.connections.max.idle.ms: 540000
#        consumer.override.isolation.level: read_committed
#        consumer.override.receive.buffer.bytes: 65536
#        consumer.override.send.buffer.bytes: 131072

        # remote cluster security settings
        remote.security.protocol: SSL
        remote.ssl.protocol: TLSv1.2
        remote.ssl.keystore.type: PEM
        remote.ssl.truststore.type: PEM
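        # An empty value disables server hostname verification in Kafka; set to https to verify the remote broker host name
        remote.ssl.endpoint.identification.algorithm: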
        # PEM string format certificate-chain
        remote.ssl.keystore.certificate.chain: ...
        # PEM string format private key
        remote.ssl.keystore.key: ...
        # PEM string format CA certificates
        remote.ssl.truststore.certificates: ...
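As in static mode, an override for discovery mode usually only needs the keys that differ from the chart defaults. The sketch below switches the mode and sets the Discovery API client; the endpoint is the local default from above. If each retry waits the full delay (an assumption about how the two settings combine), 100 retries at 5000 ms allow roughly 500 seconds, about 8 minutes 20 seconds, for the Discovery API to become reachable.

```yaml
instance:
  distribution:
    messageDistributor:
      config:
        distributor.mode: discovery
        # Discovery API endpoint; replace with the URL of your Discovery API
        endpoint: https://axual-local-discovery-api:29000
        # Client retries and the delay between attempts (milliseconds)
        discovery.api.retry.count: 100
        discovery.api.retry.delay: 5000
        # The remote.ssl.* PEM settings still have to be supplied, inline or via a K8S Secret
```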

How to configure remote SSL security through a K8S Secret resource?

To load the remote SSL settings remote.ssl.keystore.certificate.chain, remote.ssl.keystore.key and remote.ssl.truststore.certificates from a K8S Secret resource, add the following configuration.

This assumes that a K8S Secret containing the client certificate chain, private key and CA certificates in PEM format already exists in the namespace where the Distributor runs.
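If the Secret does not exist yet, it can be created from a manifest such as the sketch below. The name placeholder is the one used in externalConfiguration, and the data keys reuse the example file names from the placeholders in this section; other key names work as long as the placeholders refer to them. The namespace placeholder is hypothetical, and the PEM bodies are elided.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: <distribution-secret-name>
  # A Secret volume can only be mounted by pods in the same namespace
  namespace: <distributor-namespace>
type: Opaque
# stringData takes plain text; Kubernetes stores it base64-encoded under data
stringData:
  # Client certificate chain (PEM)
  remote_keystore_certificate_chain.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  # Client private key (PEM; Kafka's PEM keystore support expects PKCS#8)
  remote_keystore.key: |
    -----BEGIN PRIVATE KEY-----
    ...
    -----END PRIVATE KEY-----
  # CA certificates (PEM)
  remote_truststore_certificates.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
```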

Mount the Secret into the Distributor pods by updating externalConfiguration in the distributor section of your values.yaml:

core:
  distributor:
    # You can mount K8S ConfigMaps or Secrets into a distributor pod as environment variables or volumes. Volumes and environment variables are configured in the externalConfiguration property.
    # For full documentation, visit: https://strimzi.io/docs/operators/latest/configuring.html#type-ExternalConfiguration-reference
    externalConfiguration:
      volumes:
        - name: distribution-secret
          secret:
            secretName: <distribution-secret-name>

Then reference the mounted files in the Message Distributor configuration:

instance:
  distribution:
    messageDistributor:
      config:
        # Placeholders have the form ${directory:PATH:FILE-NAME}. Kafka's DirectoryConfigProvider replaces each one with the contents of FILE-NAME in directory PATH.
        # Strimzi mounts the volume named distribution-secret at /opt/kafka/external-configuration/distribution-secret.
        remote.ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/distribution-secret:<keystore certificate chain file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_keystore_certificate_chain.crt}"
        remote.ssl.keystore.key: "${directory:/opt/kafka/external-configuration/distribution-secret:<keystore key file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_keystore.key}"
        remote.ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/distribution-secret:<truststore certificates file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_truststore_certificates.crt}"
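The ${directory:...} placeholders only resolve if the Kafka Connect workers of the Distributor have DirectoryConfigProvider registered under the alias directory. Whether the Axual chart already does this is not stated here. In a plain Strimzi KafkaConnect resource, the standard Apache Kafka worker properties would look like the sketch below; where to set them in the Axual chart values is an assumption to verify against your chart.

```yaml
# Sketch of a Strimzi KafkaConnect resource fragment, not Axual chart values
spec:
  config:
    # Register a config provider under the alias "directory", as used in ${directory:...}
    config.providers: directory
    config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider
```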