Consuming Using Python Kafka Client

Creating A Python Consumer

When you have completed this step, you will have a consumer application that consumes Avro data from the stream configured in Creating topics. To get some data onto the stream, first follow Create A Producer Application.

Before continuing, please check that you fulfill all the prerequisites.

Building The Application

Start by installing the Confluent Kafka client, as you would with any Python dependency:

pip install confluent-kafka

For a full, executable Kafka client SASL/SSL consumer example, please refer to the examples repository.

Consuming using SASL

In order to create a very basic consumer application that uses SASL, you need to create the configuration for the Kafka consumer.

    # Imports from the confluent-kafka package
    from confluent_kafka import DeserializingConsumer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroDeserializer

    # Resolved stream name
    # ex : <tenant>-<instance>-<environment>-<streamName>
    # If the topic pattern is different in your case, use that pattern to resolve the topic name
    topic = 'demo-local-example-avro-applicationlog'

    # Replace SASL username and password with one generated in Self-Service
    sasl_username = 'username'
    sasl_password = 'password'

    # Schema registry client configuration
    schema_registry_conf = {'url': 'https://platform.local:24000/',
                            'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
                            'basic.auth.user.info': sasl_username+':'+sasl_password
                            }
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    key_deserializer = AvroDeserializer(
        schema_registry_client,
        Application.SCHEMA,
        dict_to_application
    )

    value_deserializer = AvroDeserializer(
        schema_registry_client,
        ApplicationLogEvent.SCHEMA,
        dict_to_application_log_event,
    )

    # Kafka consumer configuration
    configuration = {
        'bootstrap.servers': 'platform.local:9097',
        # Resolved groupId
        # ex: <tenant>-<instance>-<environment>-<applicationName>
        # If the 'group.id' pattern is different in your case, use that pattern to resolve the group.id
        'group.id': 'demo-local-localenv-io.axual.example.proxy.avro.consumer',
        # security configuration
        'sasl.username': sasl_username,
        'sasl.password': sasl_password,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-256',
        'ssl.endpoint.identification.algorithm': 'none',
        'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
        # key and value deserializer
        'key.deserializer': key_deserializer,
        'value.deserializer': value_deserializer,

        'auto.offset.reset': 'earliest',
        'on_commit': on_commit_callback,
        'error_cb': on_error_callback,
        # 'debug': 'all',
        'logger': logger
    }
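The configuration above references a `logger` and the callbacks `on_commit_callback` and `on_error_callback`, and the consumer loop below passes `log_assignment` to `subscribe`. These helpers are defined in the examples repository; a minimal sketch of what they might look like (the log format is an assumption, not a requirement):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('avro-consumer')

def log_assignment(consumer, partitions):
    # Invoked once the group coordinator has assigned partitions to this consumer
    logger.info(f'Consumer assignment: {len(partitions)} partitions:')
    for p in partitions:
        logger.info(f'{p.topic} [{p.partition}] @ {p.offset}')

def on_commit_callback(err, partitions):
    # Invoked after an offset commit attempt; err is None on success
    if err is not None:
        logger.error(f'Failed to commit offsets: {err}: {partitions}')
    else:
        logger.info(f'Committed offsets for: {partitions}')

def on_error_callback(err):
    # Invoked for client-level errors, e.g. broker connection failures
    logger.error(f'Error returned by client: {err}')
```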

When using the Kafka client, you need to provide the fully resolved topic name and group.id in the configuration. You can resolve the topic name and group.id by looking at the patterns.

The default patterns are:

  • Topic Pattern: {tenant}-{instance}-{environment}-{streamName}

  • Group Id Pattern: {tenant}-{instance}-{environment}-{applicationId}

  • Check your essentials-package for the SSL CA file; see also Security

  • Replace sasl_username and sasl_password with the credentials generated while configuring the application in Creating Applications

  • The values of the tenant and instance fields can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For the bootstrap.servers and Schema Registry url, contact support or your technical system administrator.
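Resolving the default patterns above boils down to simple string formatting. A sketch, using the example values from this Getting Started (your tenant, instance, and environment will differ):

```python
# Example values; take the real ones from your on-boarding information
tenant = 'demo'
instance = 'local'
environment = 'example'
stream_name = 'avro-applicationlog'
application_id = 'io.axual.example.proxy.avro.consumer'

# Default patterns: {tenant}-{instance}-{environment}-{streamName|applicationId}
topic = f'{tenant}-{instance}-{environment}-{stream_name}'
group_id = f'{tenant}-{instance}-{environment}-{application_id}'

print(topic)     # demo-local-example-avro-applicationlog
print(group_id)  # demo-local-example-io.axual.example.proxy.avro.consumer
```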

With the above configuration, instantiate a Kafka consumer and start consuming records.

consumer = DeserializingConsumer(configuration)

logger.info(f'Starting kafka avro SASL consumer loop, topic: {topic}. ^C to exit.')
try:
    consumer.subscribe([topic], on_assign=log_assignment)
    while True:
        msg = consumer.poll()

        if msg is None:
            continue

        if msg.error():
            logger.error(f'Error returned by poll: {msg.error()}')
        else:
            logger.info(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key: {str(msg.key())} value: {str(msg.value())}'
            )
            consumer.commit()
except KeyboardInterrupt:
    logger.info('Caught KeyboardInterrupt, stopping.')
finally:
    if consumer is not None:
        logger.info('Committing final offsets and leaving group.')
        consumer.commit()
        consumer.close()

When all of the above steps have been done correctly, start your consumer application. It will produce logging similar to this:

2022-03-09 13:40:58.397|INFO|avro-consumer.py| Starting kafka SASL avro consumer loop, topic: demo-local-localenv-applicationlogevent. ^C to exit.
2022-03-09 13:41:06.876|INFO|avro-consumer.py| Consumer assignment: 1 partitions:
2022-03-09 13:41:06.876|INFO|avro-consumer.py| demo-local-localenv-applicationlogevent [0] @ -1001
2022-03-09 13:41:06.956|DEBUG|connectionpool.py| Starting new HTTPS connection (1): platform.local:24000
2022-03-09 13:41:12.208|DEBUG|connectionpool.py| https://platform.local:24000 "GET /schemas/ids/2 HTTP/1.1" 200 847
2022-03-09 13:41:12.324|DEBUG|connectionpool.py| https://platform.local:24000 "GET /schemas/ids/1 HTTP/1.1" 200 353
2022-03-09 13:41:12.325|INFO|avro-consumer.py| Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 257 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646827911230, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #15
2022-03-09 13:41:12.325|INFO|avro-consumer.py| Committed offsets for: [TopicPartition{topic=demo-local-localenv-applicationlogevent,partition=0,offset=258,error=None}]
2022-03-09 13:41:12.325|ERROR|avro-consumer.py| Failed to commit offsets: KafkaError{code=_NO_OFFSET,val=-168,str="Local: No offset stored"}: [TopicPartition{topic=demo-local-localenv-applicationlogevent,partition=0,offset=-1001,error=None}]
2022-03-09 13:41:12.325|INFO|avro-consumer.py| Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 258 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646827912235, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #16
2022-03-09 13:41:12.326|INFO|avro-consumer.py| Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 259 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646832075849, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #0
2022-03-09 13:41:12.326|INFO|avro-consumer.py| Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 260 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646832083233, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #1
2022-03-09 13:41:12.326|INFO|avro-consumer.py| Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 261 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646832084240, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #2
2022-03-09 13:41:12.327|INFO|avro-consumer.py| Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 262 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646832085244, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #3
2022-03-09 13:41:12.327|INFO|avro-consumer.py| Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 263 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646832086244, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #4
2022-03-09 13:41:12.328|INFO|avro-consumer.py| Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 264 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646832355796, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #0

This is all the coding required to start a basic consumer!

Consuming using Mutual TLS

In order to create a very basic consumer application using mTLS, you need to create the configuration for the Kafka consumer.

    # Imports from the confluent-kafka package
    from confluent_kafka import DeserializingConsumer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroDeserializer

    # Resolved stream name
    # ex : <tenant>-<instance>-<environment>-<streamName>
    # If the topic pattern is different in your case, use that pattern to resolve the topic name
    topic = 'axual-example-local-avro-applicationlog'

    # Schema registry client configuration
    schema_registry_conf = {'url': 'http://localhost:8082'}
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    key_deserializer = AvroDeserializer(
        schema_registry_client,
        Application.SCHEMA,
        dict_to_application
    )

    value_deserializer = AvroDeserializer(
        schema_registry_client,
        ApplicationLogEvent.SCHEMA,
        dict_to_application_log_event,
    )

    # Kafka consumer configuration
    configuration = {
        'bootstrap.servers': 'localhost:8084',
        # Resolved groupId
        # ex: <tenant>-<instance>-<environment>-<applicationName>
        # If the 'group.id' pattern is different in your case, use that pattern to resolve the group.id
        'group.id': 'axual-example-local-io.axual.example.proxy.avro.consumer',
        # SSL configuration
        'security.protocol': 'SSL',
        'ssl.endpoint.identification.algorithm': 'none',
        'ssl.certificate.location': _full_path_of('/resources/client-cert/standalone/standalone.cer'),
        'ssl.key.location': _full_path_of('/resources/client-cert/standalone/standalone-private.key'),
        'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
        # key and value deserializer
        'key.deserializer': key_deserializer,
        'value.deserializer': value_deserializer,

        'auto.offset.reset': 'earliest',
        'on_commit': on_commit_callback,
        'error_cb': on_error_callback,
        # 'debug': 'all',
        'logger': logger
    }

When using the Confluent Kafka client, you need to provide the fully resolved topic name in the configuration. You can resolve the topic name by looking at the pattern.

The default topic pattern is {tenant}-{instance}-{environment}-{streamName}

  • Check your essentials-package for the SSL security files; see also Security

  • The values of the tenant and instance fields can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For the bootstrap.servers and Schema Registry url, contact support or your technical system administrator.
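As before, resolving the topic name is plain string formatting. A sketch using the values from this mTLS example (the tenant/instance/environment split shown here is an assumption; check yours against the on-boarding information):

```python
tenant = 'axual'
instance = 'example'
environment = 'local'
stream_name = 'avro-applicationlog'

# Default pattern: {tenant}-{instance}-{environment}-{streamName}
topic = f'{tenant}-{instance}-{environment}-{stream_name}'
print(topic)  # axual-example-local-avro-applicationlog
```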

With the above configuration, instantiate a Kafka consumer and start consuming records.

consumer = DeserializingConsumer(configuration)

logger.info(f'Starting kafka avro consumer loop, topic: {topic}. ^C to exit.')
try:
    consumer.subscribe([topic], on_assign=log_assignment)
    while True:
        msg = consumer.poll()

        if msg is None:
            continue

        if msg.error():
            logger.error(f'Error returned by poll: {msg.error()}')
        else:
            logger.info(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key: {str(msg.key())} value: {str(msg.value())}'
            )
            consumer.commit()
except KeyboardInterrupt:
    logger.info('Caught KeyboardInterrupt, stopping.')
finally:
    if consumer is not None:
        logger.info('Committing final offsets and leaving group.')
        consumer.commit()
        consumer.close()

When all of the above steps have been done correctly, start your consumer application. It will produce logging similar to this:

2022-03-09 13:45:03.214|INFO|avro-consumer.py| Starting kafka avro consumer loop, topic: axual-example-local-avro-applicationlog. ^C to exit.
2022-03-09 13:45:06.630|INFO|avro-consumer.py| Consumer assignment: 10 partitions:
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [0] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [1] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [2] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [3] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [4] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [5] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [6] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [7] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [8] @ -1001
2022-03-09 13:45:06.630|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [9] @ -1001
2022-03-09 13:45:06.705|DEBUG|connectionpool.py| Starting new HTTP connection (1): localhost:8082
2022-03-09 13:45:06.711|DEBUG|connectionpool.py| http://localhost:8082 "GET /schemas/ids/1 HTTP/1.1" 200 None
2022-03-09 13:45:06.719|DEBUG|connectionpool.py| http://localhost:8082 "GET /schemas/ids/0 HTTP/1.1" 200 None
2022-03-09 13:45:06.719|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 0 offset 7 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646830205965, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #0
2022-03-09 13:45:06.720|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 0 offset 8 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646830207057, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #1
2022-03-09 13:45:06.720|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 0 offset 9 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646830208062, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #2
...

This is all the coding required to start a basic consumer!

Wrapping up

You have concluded the Getting Started section by preparing your stream and applications, requesting access to the stream, and actually producing and consuming some data. If you are going to deploy your application in another environment, it is advisable to enable monitoring.

You can also use the menu on the left to find information about other platform features that might not have been covered in this Getting Started.