Producing Using the Python Kafka Client

Creating a Producer Application

When you have completed this step, you will have a producer application producing randomly generated data in Avro format to the topic you configured in Creating topics.

Before continuing, please check that you fulfill all the prerequisites.

Building The Application

Start by adding the dependency on the Confluent Kafka client, as you would with any other dependency.

For a fully executable Kafka client SASL/SSL producer example, please refer to the examples repository.
pip install "confluent-kafka[avro]"
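
The avro extra pulls in the Avro serializer and Schema Registry support used below. The snippets on this page additionally assume the following imports; the Application, ApplicationLogEvent and ApplicationLogLevel classes and the *_to_dict helper functions come from the examples repository:

import time
import logging

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer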

Producing using SASL

To create a basic producer application that uses SASL, you first need to create the configuration for the Kafka producer.

    # Resolved topic name
    # ex : <tenant>-<instance>-<environment>-<topicName>
    # If the topic pattern is different in your case, use that pattern to resolve the topic name
    topic = 'demo-local-example-avro-applicationlog'

    # Replace SASL username and password with one generated in Self-Service
    sasl_username = 'username'
    sasl_password = 'password'

    # Schema registry client configuration
    schema_registry_conf = {'url': 'https://platform.local:24000/',
                            'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
                            'basic.auth.user.info': sasl_username+':'+sasl_password
                            }
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    key_serializer = AvroSerializer(
        schema_registry_client,
        Application.SCHEMA,
        application_to_dict
    )

    value_serializer = AvroSerializer(
        schema_registry_client,
        ApplicationLogEvent.SCHEMA,
        application_log_event_to_dict
    )

    # Kafka producer configuration
    configuration = {
        'bootstrap.servers': 'platform.local:9097',

        # Client.ID config
        # An id string to pass to the server when making requests.
        # The purpose of this is to be able to track the source of requests beyond
        # just ip/port by allowing a logical application name to be included in server-side request logging.
        'client.id': 'demo-local-example-io.axual.example.client.avro.producer',

        # security configuration
        'sasl.username': sasl_username,
        'sasl.password': sasl_password,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-256',
        'ssl.endpoint.identification.algorithm': 'none',
        # Producer client CA location so client can trust brokers
        'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
        # key and value serializer
        'key.serializer': key_serializer,
        'value.serializer': value_serializer,
        'acks': 'all',
    }
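
The listing above references two kinds of helpers that are not part of the client library: _full_path_of, which locates resource files, and the *_to_dict callables passed to AvroSerializer. Their real implementations ship with the examples repository; a minimal sketch, assuming the resources directory sits next to the script, could look like this:

import os

def _full_path_of(relative_path):
    # Resolve a path relative to the directory containing this script
    return os.path.dirname(os.path.realpath(__file__)) + relative_path

def application_to_dict(application, ctx):
    # ctx is the SerializationContext supplied by AvroSerializer;
    # return a plain dict matching the Avro schema of the key
    return {'name': application.name,
            'version': application.version,
            'owner': application.owner}

application_log_event_to_dict follows the same pattern for the value schema.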

When using the Confluent Kafka client, you need to provide the fully resolved topic name in the configuration. You can resolve the topic name by applying the topic pattern.

The default topic pattern is {tenant}-{instance}-{environment}-{topicName}
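
For example, with tenant demo, instance local, environment example, and topic name avro-applicationlog (hypothetical values; substitute those from your own on-boarding information), the pattern resolves to the topic used in the snippet above:

tenant, instance, environment, topic_name = 'demo', 'local', 'example', 'avro-applicationlog'
topic = f'{tenant}-{instance}-{environment}-{topic_name}'
# 'demo-local-example-avro-applicationlog'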

  • Check your essentials package for the SSL CA file; see also Security

  • Replace sasl_username and sasl_password with the credentials generated while configuring the application in Creating Applications

  • The values of the tenant and instance fields can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For the bootstrap.servers and Schema Registry url values, contact support or your technical system administrator.

With the above configuration, instantiate a Kafka producer and start sending records.
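
The loop below also references a delivery_callback, which the client invokes asynchronously with the delivery result of each record. The examples repository ships its own version; a minimal sketch matching the log output shown further down could be:

def delivery_callback(err, msg):
    # Invoked from producer.poll() or producer.flush() for every produced record;
    # 'logger' is the module-level logger configured by the example
    if err is not None:
        logger.error(f'Delivery failed: {err}')
    else:
        logger.info(f'Produced record to topic {msg.topic()} '
                    f'partition [{msg.partition()}] @ offset {msg.offset()}')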

producer = SerializingProducer(configuration)

try:
    logger.info(f'Starting to produce to topic: {topic}. ^C to exit.')
    n = 0
    while True:
        key = Application(name='value_log_event_producer',
                          version='0.0.1',
                          owner='Axual')
        value = ApplicationLogEvent(timestamp=int(round(time.time() * 1000)),
                                    application=key,
                                    context={'Some key': 'Some Value'},
                                    level=ApplicationLogLevel.INFO,
                                    message=f'Message #{n}')
        producer.poll(0)  # serve queued delivery callbacks
        producer.produce(topic=topic, value=value, key=key, on_delivery=delivery_callback)

        time.sleep(1.)
        n += 1
except KeyboardInterrupt:
    logger.info('Caught KeyboardInterrupt, stopping.')
finally:
    if producer is not None:
        logger.info('Flushing producer.')
        producer.flush()
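
Note that producer.produce() is asynchronous: the call only enqueues the record. producer.poll(0) serves the delivery callbacks of records acknowledged in the meantime, and producer.flush() in the finally block blocks until every outstanding record has been delivered.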

When all of the above steps have been done correctly, start your producer app. It will produce log output that looks like this:

2022-03-09 13:21:15.848|INFO|avro-producer.py| Starting kafka avro SASL producer to produce to topic: demo-local-localenv-applicationlogevent. ^C to exit.
2022-03-09 13:21:15.860|DEBUG|connectionpool.py| Starting new HTTPS connection (1): platform.local:24000
2022-03-09 13:21:22.112|DEBUG|connectionpool.py| https://platform.local:24000 "POST /subjects/demo-local-localenv-applicationlogevent-key/versions HTTP/1.1" 200 8
2022-03-09 13:21:22.226|DEBUG|connectionpool.py| https://platform.local:24000 "POST /subjects/demo-local-localenv-applicationlogevent-value/versions HTTP/1.1" 200 8
2022-03-09 13:21:23.239|INFO|avro-producer.py| Produced record to topic demo-local-localenv-applicationlogevent partition [0] @ offset 259
2022-03-09 13:21:24.239|INFO|avro-producer.py| Produced record to topic demo-local-localenv-applicationlogevent partition [0] @ offset 260
2022-03-09 13:21:25.243|INFO|avro-producer.py| Produced record to topic demo-local-localenv-applicationlogevent partition [0] @ offset 261
2022-03-09 13:21:26.244|INFO|avro-producer.py| Produced record to topic demo-local-localenv-applicationlogevent partition [0] @ offset 262

This is all the code required to successfully produce records.

Producing using Mutual TLS

To create a basic producer application using mTLS, you first need to create the configuration for the Kafka producer.

    # Resolved topic name
    # ex : <tenant>-<instance>-<environment>-<topicName>
    # If the topic pattern is different in your case, use that pattern to resolve the topic name
    topic = 'axual-example-local-avro-applicationlog'

    # Schema registry client config
    schema_registry_conf = {'url': 'http://localhost:8082'}
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    key_serializer = AvroSerializer(
        schema_registry_client,
        Application.SCHEMA,
        application_to_dict
    )

    value_serializer = AvroSerializer(
        schema_registry_client,
        ApplicationLogEvent.SCHEMA,
        application_log_event_to_dict
    )

    # Kafka producer configuration
    configuration = {
        'bootstrap.servers': 'localhost:8084',

        # Client.ID config
        # An id string to pass to the server when making requests.
        # The purpose of this is to be able to track the source of requests beyond
        # just ip/port by allowing a logical application name to be included in server-side request logging.
        'client.id': 'demo-local-example-io.axual.example.client.avro.producer',

        # SSL configuration
        'security.protocol': 'SSL',
        'ssl.endpoint.identification.algorithm': 'none',
        'ssl.certificate.location': _full_path_of('/resources/client-cert/standalone/standalone.cer'),
        'ssl.key.location': _full_path_of('/resources/client-cert/standalone/standalone-private.key'),
        'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
        # key and value serializer
        'key.serializer': key_serializer,
        'value.serializer': value_serializer,
        'acks': 'all',
        # 'debug': 'all',
        'logger': logger
    }
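
Unlike the SASL example, this configuration passes a logger to the client (and shows a commented-out 'debug': 'all' entry that enables verbose protocol traces). A minimal setup using Python's standard logging module, approximating the log format shown below, could be:

import logging

logging.basicConfig(
    format='%(asctime)s|%(levelname)s|%(filename)s| %(message)s',
    level=logging.DEBUG)
logger = logging.getLogger('avro-producer')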

When using the Confluent Kafka client, you need to provide the fully resolved topic name in the configuration. You can resolve the topic name by applying the topic pattern.

The default topic pattern is {tenant}-{instance}-{environment}-{topicName}

  • Check your essentials package for the SSL security files; see also Security

  • The values of the tenant and instance fields can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For the bootstrap.servers and Schema Registry url values, contact support or your technical system administrator.

With the above configuration, instantiate a Kafka producer and start sending records.

producer = SerializingProducer(configuration)

try:
    logger.info(f'Starting Kafka Avro producer, producing to topic: {topic}. ^C to exit.')
    n = 0
    while True:
        key = Application(name='value_log_event_producer',
                          version='0.0.1',
                          owner='Axual')
        value = ApplicationLogEvent(timestamp=int(round(time.time() * 1000)),
                                    application=key,
                                    context={'Some key': 'Some Value'},
                                    level=ApplicationLogLevel.INFO,
                                    message=f'Message #{n}')
        producer.poll(0)
        producer.produce(topic=topic, value=value, key=key, on_delivery=delivery_callback)

        time.sleep(1.)
        n += 1
except KeyboardInterrupt:
    logger.info('Caught KeyboardInterrupt, stopping.')
finally:
    if producer is not None:
        logger.info('Flushing producer.')
        producer.flush()

When all of the above steps have been done correctly, start your producer app. It will produce log output that looks like this:

2022-03-09 13:25:13.223|INFO|avro-producer.py| Starting kafka avro producer to produce to topic: axual-example-local-avro-applicationlog. ^C to exit.
2022-03-09 13:25:13.238|DEBUG|connectionpool.py| Starting new HTTP connection (1): localhost:8082
2022-03-09 13:25:13.296|DEBUG|connectionpool.py| http://localhost:8082 "POST /subjects/axual-example-local-avro-applicationlog-key/versions HTTP/1.1" 200 None
2022-03-09 13:25:13.320|DEBUG|connectionpool.py| http://localhost:8082 "POST /subjects/axual-example-local-avro-applicationlog-value/versions HTTP/1.1" 200 None
2022-03-09 13:25:14.325|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 16
2022-03-09 13:25:15.329|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 17
2022-03-09 13:25:16.332|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 18
2022-03-09 13:25:17.333|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 19
2022-03-09 13:25:18.334|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 20

This is all the code required to successfully produce records.

Next Step: Consuming Data Using A Python Consumer Application

In the next step you will create a Python consumer application to consume the data you just produced.