Message Distributor
Enabling Message Distributor in Helm Charts
Prerequisite: Make sure that the following services are up and running before starting message distributor:
- Distributor
- Discovery API (with remote Kafka clusters configured)
Message Distributor is enabled by default.
When you enable Distributor, Message Distributor is deployed automatically by the Strimzi cluster operator.
To disable Message Distributor, set the global configuration global.instance.messageDistributor.enabled to false.
Usage:
instance:
  enabled: true
  messageDistributor:
    enabled: true
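As an illustration, disabling Message Distributor might look like the sketch below. It assumes that the dotted path global.instance.messageDistributor.enabled maps directly onto nested keys under a top-level global key in your values.yaml; verify the nesting against your own chart before relying on it.

```yaml
# Sketch of a values.yaml override; the nesting is inferred from the dotted key path.
global:
  instance:
    messageDistributor:
      # false disables Message Distributor, as described above
      enabled: false
```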
Configuring Message Distributor
The default values.yaml provided for the Message Distributor is suitable for a local k8s deployment.
You can override the Message Distributor configurations below in your values.yaml as follows:
instance:
  distribution:
    # If cluster, tenant & instance are not overridden, then they will be picked up from global settings.
    # clusterOverride:
    # tenantOverride:
    # instanceOverride:
    messageDistributor:
      # Message distributor name. This will resolve to {tenant}-{instance}-{distribution.name}-level-{distribution.config.level}
      name: message-distributor
      # MessageDistributor class
      class: io.axual.distributor.connector.MessageDistributor
      # Number of tasks handling message distribution (consumer threads)
      tasksMax: 1
      # `topics.regex` default is `{{ .Values.global.instance.fullName }}-.*`
      # topicsRegexOverride:
      # add/override any message distributor configuration here
      config:
        # cluster level
        level: 1
        environment: example
        # discovery-api url
        endpoint: https://axual-local-discovery-api:29000
        # converter config
        header.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        # transformer config
        transforms: CopyFlagFilter,CopyFlagUpdate
        transforms.CopyFlagUpdate.level: 1
        transforms.CopyFlagFilter.level: 1
        transforms.CopyFlagUpdate.type: io.axual.distributor.transform.CopyFlagUpdate
        transforms.CopyFlagFilter.type: io.axual.distributor.transform.CopyFlagFilter
        # remote cluster config for producer
        remote.acks: all
        remote.value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        remote.key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        remote.max.in.flight.requests.per.connection: 1
        remote.retries: 0
        # remote cluster security settings
        remote.security.protocol: SSL
        remote.ssl.protocol: TLSv1.2
        remote.ssl.keystore.type: PEM
        remote.ssl.truststore.type: PEM
        remote.ssl.endpoint.identification.algorithm:
        # PEM string format certificate-chain
        remote.ssl.keystore.certificate.chain: ...
        # PEM string format private key
        remote.ssl.keystore.key: ...
        # PEM string format CA certificates
        remote.ssl.truststore.certificates: ...
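Because Helm merges your values.yaml over the chart defaults, an override normally needs only the keys you want to change. The sketch below is illustrative: the task count and the regex are hypothetical values, not recommendations, and the tenant and instance names in the regex are placeholders.

```yaml
# Illustrative override only; the values below are hypothetical, not chart defaults.
instance:
  distribution:
    messageDistributor:
      # Raise the number of tasks (consumer threads) handling distribution
      tasksMax: 3
      # Placeholder regex that replaces the default {{ .Values.global.instance.fullName }}-.*
      topicsRegexOverride: "exampletenant-exampleinstance-.*"
```

Following the naming pattern given in the comments above, a tenant named exampletenant and an instance named exampleinstance with the default name and level 1 would resolve to exampletenant-exampleinstance-message-distributor-level-1.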
How to configure remote SSL security through K8S Secret resource?
If you want to use a K8S Secret resource to supply the remote SSL settings remote.ssl.keystore.certificate.chain, remote.ssl.keystore.key and remote.ssl.truststore.certificates, apply the configuration changes below.
It is assumed that a K8S Secret containing the client certificate, private key and CA certificates in PEM format is already available.
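For reference, such a Secret might look like the sketch below. The Secret name and the three key names are placeholders (the key names mirror the example file names used in the configuration comments later in this section), and the PEM bodies are elided. When a Secret is mounted as a volume, each key becomes a file with that name, and the Secret must live in the same namespace as the pod that mounts it.

```yaml
# Hypothetical Secret manifest; the name and key names are placeholders.
apiVersion: v1
kind: Secret
metadata:
  name: distribution-secret   # referenced as <distribution-secret-name>
type: Opaque
stringData:
  # Client certificate chain in PEM format
  remote_keystore_certificate_chain.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  # Client private key in PEM format
  remote_keystore.key: |
    -----BEGIN PRIVATE KEY-----
    ...
    -----END PRIVATE KEY-----
  # CA certificates in PEM format
  remote_truststore_certificates.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
```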
Modify your values.yaml and update externalConfiguration in the distributor configuration as follows:
core:
  distributor:
    # You can mount K8S ConfigMaps or Secrets into a distributor pod as environment variables or volumes. Volumes and environment variables are configured in the externalConfiguration property
    # for full documentation visit: https://strimzi.io/docs/operators/latest/configuring.html#type-ExternalConfiguration-reference
    externalConfiguration:
      volumes:
        - name: distribution-secret
          secret:
            secretName: <distribution-secret-name>
Then update the following Message Distributor configuration:
instance:
  distribution:
    messageDistributor:
      config:
        # The placeholder structure is directory:PATH:FILE-NAME. DirectoryConfigProvider reads and extracts the credentials from the mounted Secret in message distributor configurations.
        remote.ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/distribution-secret:<keystore certificate chain file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_keystore_certificate_chain.crt}"
        remote.ssl.keystore.key: "${directory:/opt/kafka/external-configuration/distribution-secret:<keystore key file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_keystore.key}"
        remote.ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/distribution-secret:<truststore certificates file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_truststore_certificates.crt}"