Distributor

About the Distributor

Distributor is a cluster level service that synchronizes (distributes) messages and schemas between multiple Kafka clusters.

Distributor is built on top of Kafka Connect. Message and Schema distributors run as Kafka Sink Connectors which receive messages on all instance topics and schemas topic respectively in the cluster. The received messages are distributed to other clusters in the instance. Distributor takes possible differences in internal (technical) topic names on different clusters into account.

Enabling Distributor in HELM Charts

Prerequisite: Make sure that the following services are up and running before starting distributor:

  • Strimzi Cluster Operator

  • Kafka Brokers

To enable distributor in the helm deployment, in your values.yaml toggle global configuration global.cluster.distributor.enabled to true. Default it’s false.

Usage:

global:
  cluster:
    enabled: true
    distributor:
      enabled: true

Configuring Distributor

You can specify each parameter using the --set key=value[,key=value] argument to helm install command.

Alternatively, a YAML file that specifies the values for the parameters can be provided while installing the chart. For example:

helm install platform axual-stable/platform --version=my-version -f my-values.yaml -n kafka

The default values.yaml is provided for the distributor that is suitable for a local k8s deployment. You can override below distributor configurations in your values.yaml as follows:

core:
  distributor:
    # Strimzi kafka version for distributor (kafka connect)
    version: x.x.x
    # Number of distributor worker replicas
    replicas: 3
    # distributor image path and tag
    image:
      name: docker.axual.io/axual/distributor
      tag: x.x.x
    # distributor image pull secrets
    imagePullSecrets:
      - name: dockercredssecret
    # If true, then connect will start the connectors specified in KafkaConnector CRDs
    useConnectorResources: "true"

    # Distributor(Kafka connect) worker configuration. For more information on the configuration,
    # please visit https://strimzi.io/docs/operators/latest/configuring.html#property-kafka-connect-config-reference
    config:
      config.providers: directory
      config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider
      key.converter: org.apache.kafka.connect.json.JsonConverter
      value.converter: org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable: true
      value.converter.schemas.enable: true
      # the replication must be in accordance with cluster `min.insync.replicas` config
      config.storage.replication.factor: 3
      offset.storage.replication.factor: 3
      status.storage.replication.factor: 3
    # Rack support is only available if rackEnabled is set to true
    rackEnabled: false
    rackTopologyKey: topology.kubernetes.io/zone

    # Pod Resource Limits
    resources:
      requests:
        cpu: 500m
        memory: 500Mi
    # Pod readinessProbe config
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    # Pod livenessProbe config
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    # Distributor logging config. You can add/modify logging level and loggers here
    logging:
      type: inline
      loggers:
        log4j.rootLogger: "INFO"
        log4j.logger.io.axual.distributor: "INFO"
        log4j.logger.io.axual.connect: "INFO"

    # You can mount K8S ConfigMaps or Secrets into a distributor pod as environment variables or volumes.
    # Volumes and environment variables are configured in the externalConfiguration property
    # for full documentation visit: https://strimzi.io/docs/operators/latest/configuring.html#type-ExternalConfiguration-reference
    externalConfiguration: {}

    tls:
      # if createTruststoreCaSecret is false, the caCerts need to be set
      # with an existing k8s secret resource (name) and the name of the cert inside the
      # k8s secret

      # truststoreCaSecretName: your_custom_ca_secret
      # truststoreCaCert: your_custom_ca_cert

      # if createTruststoreCaSecret is true, set the CA certs below
      createTruststoreCaSecret: true
      caCerts:
        axual_dummy_root_ca.crt: ...
        axual_root_ca.crt: ...

      # if createClientKeypairSecret is false, the clientKeypairSecretName need to be set
      # with an existing k8s secret resource (name). The only need is that the k8s secret need to define
      # two keys with the names of tls.crt and tls.key
      # clientKeypairSecretName: your_custom_keypair_secret

      # if createClientKeypairSecret is true, set the clientCert and clientKey below
      # under the authentication section
      createClientKeypairSecret: true
      authentication:
        # The core.distributor.tls.authentication section can be populated using two options

        # Option (1)
        clientCert: ...
        clientKey: ...

        # Option (2)
        automatedKeystores: true
        serverKeypairSecretName: ...
        clientKeypairSecretName: ...
        truststoreCaSecretName: ...
1 This cert/key pair can be defined in PEM format which will act as the client key pair of the distributor (connect cluster)
2 The combination of client key pair can be generated together with trust store CA certificates automatically when automatedKeystores is set to true. The keys serverKeypairSecretName, clientKeypairSecretName, truststoreCaSecretName just need to be set with the name which is going to be the secret name once generated automatically.