Consuming Using Python Axual Client
Creating Python Consumer
When you have completed this step, you will have a consumer application that consumes Avro-formatted data from the stream you configured in Step 2. To get some data onto the stream, follow Step 5: Create A Producer Application.
The Axual client doesn’t support SASL authentication. If you want to use SASL, please refer to Kafka client consume using SASL.
Before continuing, please check that you fulfill all the prerequisites.
Building The Application
Install the latest axual-client-python
pip install axual-client-python
To create a basic consumer application, first create the configuration for the AxualClient
and for the Consumer that will be used.
For a full executable Axual client Avro consumer example, please refer to the examples repository.
application_id = 'io.axual.example.client.avro.consumer'
topic = 'avro-applicationlog'

key_deserializer = AvroDeserializer(
    # Optional parameters
    schema_str=Application.SCHEMA,
    from_dict=dict_to_application
)

value_deserializer = AvroDeserializer(
    # Optional parameters
    schema_str=ApplicationLogEvent.SCHEMA,
    from_dict=dict_to_application_log_event,
)

configuration = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': 'http://localhost:8081/',
    'tenant': 'axual',
    'environment': 'local',
    # SSL configuration
    'ssl.certificate.location': _full_path_of('/client-cert/standalone/standalone.cer'),
    'ssl.key.location': _full_path_of('/client-cert/standalone/standalone-private.key'),
    'ssl.ca.location': _full_path_of('/client-cert/standalone/standalone-caroot.cer'),
    # Consumer configuration
    'key.deserializer': key_deserializer,
    'value.deserializer': value_deserializer,
    'auto.offset.reset': 'earliest',
    'on_commit': on_commit_callback,
    'error_cb': on_error_callback,
    # 'debug': 'all',
    'logger': logger
}
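The configuration above refers to several names it does not define (`dict_to_application`, `on_commit_callback`, `on_error_callback`), and the consumer loop later passes `log_assignment` to `subscribe`. The block below is a minimal sketch of what they could look like, not the code from the examples repository. It assumes the Axual client follows the usual confluent-kafka callback signatures: `from_dict(obj, ctx)`, `on_commit(error, partitions)`, `error_cb(error)` and `on_assign(consumer, partitions)`. The `Application` class is a hypothetical value object whose fields are taken from the sample log output; its `SCHEMA` attribute is left out here.

```python
import logging

logger = logging.getLogger('avro-consumer')


class Application:
    """Hypothetical value object for the record key (SCHEMA omitted)."""

    def __init__(self, name, version, owner):
        self.name = name
        self.version = version
        self.owner = owner

    def __str__(self):
        return f'name:{self.name}, version:{self.version}, owner:{self.owner}'


def dict_to_application(obj, ctx):
    """Turn the decoded Avro dict into an Application; ctx is unused here."""
    if obj is None:
        return None
    return Application(obj['name'], obj['version'], obj.get('owner'))


def on_commit_callback(error, partitions):
    """Log the outcome of an offset commit."""
    if error is not None:
        logger.error(f'Commit failed: {error}')
    else:
        logger.info(f'Committed offsets for {len(partitions)} partition(s)')


def on_error_callback(error):
    """Log errors reported by the client itself."""
    logger.error(f'Client error: {error}')


def log_assignment(consumer, partitions):
    """Log every partition assigned to this consumer."""
    logger.info(f'Consumer assignment: {len(partitions)} partitions:')
    for p in partitions:
        logger.info(f'{p.topic} [{p.partition}] @ {p.offset}')
```

A `dict_to_application_log_event` function for the value would follow the same pattern as `dict_to_application`.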
Check your care package for the truststore and keystore files; see also Step 3.
With these configurations, we can instantiate a Consumer
and start consuming records.
consumer = DeserializingConsumer(configuration)

logger.info(f'Starting consumer loop, topic: {topic}. ^C to exit.')
try:
    consumer.subscribe([topic], on_assign=log_assignment)
    while True:
        msg = consumer.poll()

        if msg is None:
            continue

        if msg.error():
            logger.error(f'Error returned by poll: {msg.error()}')
        else:
            logger.info(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key: {str(msg.key())} value: {str(msg.value())}'
            )
            consumer.commit()
except KeyboardInterrupt:
    logger.info('Caught KeyboardInterrupt, stopping.')
finally:
    if consumer is not None:
        logger.info('Committing final offsets and leaving group.')
        consumer.commit()
        consumer.close()
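The loop above can also be wrapped in a function that polls with a bounded timeout, so that the process regains control regularly instead of blocking inside `poll()`. The following is a sketch under assumptions, not the Axual example code: `consume`, `handle`, `poll_timeout` and `max_messages` are hypothetical names, and it assumes the consumer's `poll` accepts a timeout in seconds as the confluent-kafka consumer does. The final commit is left out because every record is committed right after it is handled.

```python
import logging

logger = logging.getLogger('avro-consumer')


def consume(consumer, handle, poll_timeout=1.0, max_messages=None):
    """Poll until interrupted, or until max_messages records were handled.

    consumer: any object offering poll(timeout), commit() and close().
    handle:   callable invoked with each successfully received message.
    """
    handled = 0
    try:
        while max_messages is None or handled < max_messages:
            msg = consumer.poll(poll_timeout)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                logger.error(f'Error returned by poll: {msg.error()}')
                continue
            handle(msg)
            consumer.commit()  # commit only after the record was handled
            handled += 1
    except KeyboardInterrupt:
        logger.info('Caught KeyboardInterrupt, stopping.')
    finally:
        consumer.close()  # leave the consumer group cleanly
    return handled
```

Because the function only relies on `poll`, `commit` and `close`, it can be exercised in a unit test with a stub consumer, with `max_messages` ending the loop without a keyboard interrupt.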
When all of the above steps have been done correctly, start your consumer app. It will produce logging that looks like this:
2022-03-08 18:00:09.464|INFO|avro-consumer.py| Starting consumer loop, topic: avro-applicationlog. ^C to exit.
2022-03-08 18:00:09.464|INFO|discovery.py| Waiting for Discovery Result.
2022-03-08 18:00:53.164|INFO|avro-consumer.py| Consumer assignment: 10 partitions:
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [0] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [1] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [2] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [3] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [4] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [5] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [6] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [7] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [8] @ -1001
2022-03-08 18:00:53.164|INFO|avro-consumer.py| axual-example-local-avro-applicationlog [9] @ -1001
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 0 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762417651, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #0
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 1 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762428745, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #1
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 2 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762429749, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #2
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 3 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762430752, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #3
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 4 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762431755, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #4
2022-03-08 18:00:53.838|INFO|avro-consumer.py| Received message on topic axual-example-local-avro-applicationlog partition 4 offset 5 key: name:value_log_event_producer, version:0.0.1, owner:Axual value: timestamp:1646762432759, source:name:value_log_event_producer, version:0.0.1, owner:Axual, context:{'Some key': 'Some Value'}, level:INFO, message:Message #5
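The log lines above follow a `timestamp|LEVEL|file| message` layout. The `logger` object passed in the configuration is not defined in this section; the sketch below shows one way to build a standard-library logger that produces the same layout. `build_logger` is a hypothetical helper, and the format string is inferred from the sample output rather than taken from the example code.

```python
import logging
import sys


def build_logger(name='avro-consumer', level=logging.INFO, stream=sys.stdout):
    """Create a logger that writes 'timestamp|LEVEL|file| message' lines."""
    formatter = logging.Formatter(
        fmt='%(asctime)s.%(msecs)03d|%(levelname)s|%(filename)s| %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    handler = logging.StreamHandler(stream)
    handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.handlers = [handler]  # replace handlers to avoid duplicate lines
    logger.propagate = False     # keep the root logger from printing again
    return logger
```

With `logger = build_logger()` defined before the configuration dictionary, the `'logger': logger` entry and the `logger.info(...)` calls in the consumer loop share the same output format.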
This is all the coding required to make a successful consumer.
Wrapping up
You have concluded the Getting Started section by preparing your stream and applications, requesting access to the stream, and producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.
Proceed to Step 7: Enabling Monitoring
You can also use the menu on the left to find information about other platform features that might not have been touched on in this Getting Started.