Schema Distributor

Enabling Schema Distributor in Helm Charts

Prerequisite: make sure that the following services are up and running before starting Schema Distributor:

  • Distributor

Schema Distributor is enabled by default. When Distributor is enabled, the Strimzi cluster operator deploys Schema Distributor automatically. To disable Schema Distributor, set the global configuration value global.instance.schemaDistributor.enabled to false.

Usage:

instance:
  enabled: true
  schemaDistributor:
    enabled: true

Configuring Schema Distributor

A default values.yaml suitable for a local k8s deployment is provided for Schema Distributor. You can override the configuration below in your own values.yaml:

instance:
  distribution:
    #  If cluster, tenant & instance are not overridden, then they will be picked up from global settings.
    #  clusterOverride:
    #  tenantOverride:
    #  instanceOverride:

    schemaDistributor:
      # Schema distributor name. This will resolve to {tenant}-{instance}-{schemaDistributor.name}-from-{schemaDistributor.config.from}-to-{schemaDistributor.config.to}
      name: schema-distributor

      # SchemaDistributor class
      class: io.axual.distributor.schema.connector.SchemaDistributor
      tasksMax: 1

      # `topics.regex` default is `{{ .Values.global.instance.fullName }}-.*`
      #  topicsRegexOverride:

      # remote cluster topic pattern and resolver config
      remoteTopicPattern: '{tenant}-{instance}-{environment}-{topic}'
      remoteTopicResolver: io.axual.common.resolver.TopicPatternResolver

      # add/override any schema-distributor configuration here
      config:
        # Source and destination cluster. They will be used to resolve the schema distributor name
        from: clusterA
        to: clusterB

        tasks.max: 1
        environment: example
        # converter config
        header.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

        # transformer config
        transforms: SchemaRecordTransformation
        transforms.SchemaRecordTransformation.type: io.axual.distributor.schema.transform.SchemaRecordTransformation

        # remote cluster config for producer
        remote.bootstrap.servers: kafka2-kafka-bootstrap.kafka-cluster-a.svc.cluster.local:9093
        remote.key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        remote.value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        remote.acks: all
        remote.retries: 0
        remote.max.in.flight.requests.per.connection: 1

        # remote cluster security settings
        remote.security.protocol: SSL
        remote.ssl.protocol: TLSv1.2
        remote.ssl.keystore.type: PEM
        remote.ssl.truststore.type: PEM
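        # An empty value disables verification of the broker host name against its certificate; set to https to enable it
        remote.ssl.endpoint.identification.algorithm: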
        # PEM string format certificate-chain
        remote.ssl.keystore.certificate.chain: ...
        # PEM string format private key
        remote.ssl.keystore.key: ...
        # PEM string format CA certificates
        remote.ssl.truststore.certificates: ...

How to configure remote SSL security through a K8S Secret resource?

To supply the remote SSL settings remote.ssl.keystore.certificate.chain, remote.ssl.keystore.key and remote.ssl.truststore.certificates from a K8S Secret resource instead of writing them inline, add the configuration blocks below. It is assumed that a K8S Secret containing the client certificate, private key and CA certificates in PEM format is already available.

Modify your values.yaml and update externalConfiguration in the distributor configuration as follows:

core:
  distributor:
    # You can mount K8S ConfigMaps or Secrets into a distributor pod as environment variables or volumes. Volumes and environment variables are configured in the externalConfiguration property
    # for full documentation visit: https://strimzi.io/docs/operators/latest/configuring.html#type-ExternalConfiguration-reference
    externalConfiguration:
      volumes:
        - name: distribution-secret
          secret:
            secretName: <distribution-secret-name>

Then update the schema distributor configuration as follows:

instance:
  distribution:
    schemaDistributor:
      config:
        # The placeholder structure is directory:PATH:FILE-NAME. DirectoryConfigProvider reads and extracts the credentials from the mounted Secret in schema distributor configurations.
        remote.ssl.keystore.certificate.chain: "${directory:/opt/kafka/external-configuration/distribution-secret:<keystore certificate chain file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_keystore_certificate_chain.crt}"
        remote.ssl.keystore.key: "${directory:/opt/kafka/external-configuration/distribution-secret:<keystore key file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_keystore.key}"
        remote.ssl.truststore.certificates: "${directory:/opt/kafka/external-configuration/distribution-secret:<truststore certificates file name defined in secret>}"
        # example: "${directory:/opt/kafka/external-configuration/distribution-secret:remote_truststore_certificates.crt}"
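
A pre-deployment check for the two values.yaml fragments above, as an added example (not Axual's or Strimzi's code): it verifies that each remote.ssl.* placeholder points into a Secret volume declared under externalConfiguration and names a file present in that Secret, and it flags an empty remote.ssl.endpoint.identification.algorithm. The function name, the Secret name remote-tls, the file ca.crt and the secret_files input (Secret name mapped to its data keys) are assumptions for illustration.

```python
import re

MOUNT_ROOT = "/opt/kafka/external-configuration"  # mount root shown on this page
SECRET_KEYS = ("remote.ssl.keystore.certificate.chain",
               "remote.ssl.keystore.key",
               "remote.ssl.truststore.certificates")
PLACEHOLDER = re.compile(r"^\$\{directory:(?P<path>[^:}]+):(?P<file>[^:}]+)\}$")

def lint_remote_ssl(values, secret_files):
    """values: parsed values.yaml dict; secret_files: {secretName: set of data keys}."""
    findings = []
    volumes = (values.get("core", {}).get("distributor", {})
               .get("externalConfiguration", {}).get("volumes", []))
    # volume name -> Secret name, for volumes backed by a Secret
    mounted = {v["name"]: v["secret"]["secretName"] for v in volumes if "secret" in v}
    config = (values.get("instance", {}).get("distribution", {})
              .get("schemaDistributor", {}).get("config", {}))
    for key in SECRET_KEYS:
        match = PLACEHOLDER.match(str(config.get(key) or ""))
        if not match:  # inline PEM (or nothing) instead of a file reference
            findings.append(f"{key}: not a ${{directory:PATH:FILE}} placeholder")
            continue
        root, _, volume = match["path"].rpartition("/")
        if root != MOUNT_ROOT or volume not in mounted:
            findings.append(f"{key}: {match['path']} is not a mounted Secret volume")
        elif match["file"] not in secret_files.get(mounted[volume], set()):
            findings.append(f"{key}: {match['file']} missing from Secret {mounted[volume]}")
    # Kafka skips broker hostname verification when this setting is empty
    if not config.get("remote.ssl.endpoint.identification.algorithm"):
        findings.append("remote.ssl.endpoint.identification.algorithm: empty, "
                        "broker hostname verification is disabled")
    return findings

# Constructed sample input; the Secret name and ca.crt are made up for illustration.
MOUNT = MOUNT_ROOT + "/distribution-secret"
SAMPLE_VALUES = {
    "core": {"distributor": {"externalConfiguration": {"volumes": [
        {"name": "distribution-secret", "secret": {"secretName": "remote-tls"}}]}}},
    "instance": {"distribution": {"schemaDistributor": {"config": {
        "remote.ssl.keystore.certificate.chain":
            "${directory:%s:remote_keystore_certificate_chain.crt}" % MOUNT,
        "remote.ssl.keystore.key": "<inline PEM private key>",
        "remote.ssl.truststore.certificates": "${directory:%s:ca.crt}" % MOUNT,
        "remote.ssl.endpoint.identification.algorithm": None,
    }}}},
}
SAMPLE_SECRET_FILES = {"remote-tls": {"remote_keystore_certificate_chain.crt", "remote_keystore.key"}}

print(*lint_remote_ssl(SAMPLE_VALUES, SAMPLE_SECRET_FILES), sep="\n")
```

The secret_files mapping can be built from the key names under data in the Secret, since each key becomes one file name in the mounted volume.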