Consuming Using Python Axual Client

Creating Python Consumer

When you have completed this step, you will have a consumer application that is consuming from a stream you have configured in step 2 in Avro format. To get some data onto the stream, follow step 5:Create A Producer Application.

Axual client doesn’t support SASL authentication. If you want to use SASL, please refer Kafka client consume using SASL

Before continuing, please check you fulfill all the prerequisites.

Building The Application

Install the latest axual-client-python

pip install axual-client-python

In order to create a very basic consumer application, create the configuration for the AxualClient and also for the Consumer that will be used.

For full executable Axual client Avro consumer example, please refer examples repository.
    application_id = 'io.axual.example.client.avro.consumer'
    topic = 'avro-applicationlog'

    key_deserializer = AvroDeserializer(
        # Optional parameters
        schema_str=Application.SCHEMA,
        from_dict=dict_to_application
    )

    value_deserializer = AvroDeserializer(
        # Optional parameters
        schema_str=ApplicationLogEvent.SCHEMA,
        from_dict=dict_to_application_log_event,
    )

    configuration = {
        # Axual configuration
        'application_id': application_id,
        'endpoint': 'http://localhost:8081/',
        'tenant': 'axual',
        'environment': 'local',
        # SSL configuration
        'ssl.certificate.location': _full_path_of('/client-cert/standalone/standalone.cer'),
        'ssl.key.location': _full_path_of('/client-cert/standalone/standalone-private.key'),
        'ssl.ca.location': _full_path_of('/client-cert/standalone/standalone-caroot.cer'),
        # Consumer configuration
        'key.deserializer': key_deserializer,
        'value.deserializer': value_deserializer,

        'auto.offset.reset': 'earliest',
        'on_commit': on_commit_callback,
        'error_cb': on_error_callback,
        # 'debug': 'all',
        'logger': logger
    }
Check your care package for the truststore and keystore files, see also Step 3

With these configurations, we can instantiate a Consumer and start consuming records.

consumer = DeserializingConsumer(configuration)

logger.info(f'Starting consumer loop, topic: {topic}. ^C to exit.')
try:
    consumer.subscribe([topic], on_assign=log_assignment)
    while True:
        msg = consumer.poll()

        if msg is None:
            continue

        if msg.error():
            logger.error(f'Error returned by poll: {msg.error()}')
        else:
            logger.info(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key: {str(msg.key())} value: {str(msg.value())}'
            )
            consumer.commit()
except KeyboardInterrupt:
    logger.info('Caught KeyboardInterrupt, stopping.')
finally:
    if consumer is not None:
        logger.info('Committing final offsets and leaving group.')
        consumer.commit()
        consumer.close()

When all of the above steps have been done correctly, start your consumer app and this will produce logging that will look like this:

2022-03-08 18:00:09.464|INFO|avro-consumer.py| Starting consumer loop, topic: avro-applicationlog. ^C to exit.
2022-03-08 18:00:09.464|INFO|discovery.py| Waiting for Discovery Result.
2022-03-08 18:00:53.164|INFO|avro-consumer.py| Consumer assignment: 10 partitions:
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [0] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [1] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [2] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [3] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [4] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [5] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [6] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [7] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [8] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [9] @ -1001
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 0 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762417651, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #0
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 1 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762428745, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #1
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 2 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762429749, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #2
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 3 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762430752, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #3
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 4 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762431755, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #4
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 5 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762432759, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #5

This is all the coding required to make a successful consumer.

Wrapping up

You have concluded the getting started section by preparing your stream & applications, requesting access to the stream and actually producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.

You can also use the menu on the left to find information about other platform features, that might not have been touched in this Getting Started.