Producing Using Python Axual Client
Creating Python Producer
When you have completed this step, you will have a producer application that produces randomly generated data in Avro format to the stream you configured in Creating topics.
The Axual client does not support SASL authentication. If you want to use SASL, please refer to Kafka client producer using SASL.
Before continuing, please check you fulfill all the prerequisites.
Building The Application
Install the latest axual-client-python
For a fully executable Axual client Avro producer example, please refer to the examples repository.
pip install axual-client-python
To create a very basic producer application, first create the configuration for the AxualClient and for the Producer that will be used.
application_id = 'io.axual.example.client.avro.producer'
topic = 'avro-applicationlog'
key_serializer = AvroSerializer(
    schema_str=Application.SCHEMA,
    to_dict=application_to_dict
)
value_serializer = AvroSerializer(
    schema_str=ApplicationLogEvent.SCHEMA,
    to_dict=application_log_event_to_dict
)
configuration = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': 'http://localhost:8081/',
    'tenant': 'axual',
    'environment': 'local',
    # SSL configuration
    'ssl.certificate.location': _full_path_of('/client-cert/standalone/standalone.cer'),
    'ssl.key.location': _full_path_of('/client-cert/standalone/standalone-private.key'),
    'ssl.ca.location': _full_path_of('/client-cert/standalone/standalone-caroot.cer'),
    # Producer configuration
    'key.serializer': key_serializer,
    'value.serializer': value_serializer,
    'acks': 'all',
}
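The serializers above receive `to_dict` callbacks that are not shown in this excerpt. The following is a minimal sketch of what `application_to_dict` could look like, not the version from the examples repository. It assumes the serializer calls the function with the object and a context argument, as confluent-kafka style serializers do, and that the Avro schema uses the field names `name`, `version` and `owner` seen in the constructor call later on this page. The `Application` dataclass here is a hypothetical stand-in for the generated record class.

```python
from dataclasses import dataclass


@dataclass
class Application:
    """Hypothetical stand-in for the example's Application record class."""
    name: str
    version: str
    owner: str


def application_to_dict(application, ctx=None):
    """Convert an Application into a plain dict that an Avro serializer can encode.

    The second argument mirrors the (object, context) callback convention
    assumed above; it is not used in this sketch.
    """
    return {
        'name': application.name,
        'version': application.version,
        'owner': application.owner,
    }
```

A matching `application_log_event_to_dict` would follow the same pattern, calling `application_to_dict` for the nested `application` field.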
Check your essentials-package for the truststore and keystore files; see also Security.
With this configuration, we can instantiate a Producer and use it to start sending records.
producer = SerializingProducer(configuration)
try:
    logger.info(f'Starting to produce to topic: {topic}. ^C to exit.')
    n = 0
    while True:
        key = Application(name='value_log_event_producer',
                          version='0.0.1',
                          owner='Axual')
        value = ApplicationLogEvent(timestamp=int(round(time.time() * 1000)),
                                    application=key,
                                    context={'Some key': 'Some Value'},
                                    level=ApplicationLogLevel.INFO,
                                    message=f'Message #{n}')
        # Serve delivery callbacks for records sent earlier
        producer.poll(0)
        producer.produce(topic=topic, value=value, key=key, on_delivery=delivery_callback)
        time.sleep(1.)
        n += 1
except KeyboardInterrupt:
    logger.info('Caught KeyboardInterrupt, stopping.')
finally:
    if producer is not None:
        # Wait for any outstanding records to be delivered before exiting
        logger.info('Flushing producer.')
        producer.flush()
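The loop above passes `delivery_callback` to `produce`, but the callback itself is not shown here. A minimal sketch follows, assuming the producer invokes the callback with an error and a message object in the style of confluent-kafka, where the message exposes `topic()`, `partition()` and `offset()`. The `format_delivery` helper is hypothetical and only exists to keep the message formatting easy to test.

```python
import logging

logger = logging.getLogger(__name__)


def format_delivery(err, msg):
    """Build the log line for a delivery report."""
    if err is not None:
        return f'Failed to deliver record: {err}'
    return (f'Produced record to topic {msg.topic()} '
            f'partition [{msg.partition()}] @ offset {msg.offset()}')


def delivery_callback(err, msg):
    """Log the outcome of a produce call (assumed signature: err, msg)."""
    if err is not None:
        logger.error(format_delivery(err, msg))
    else:
        logger.info(format_delivery(err, msg))
```

Because callbacks are only served when the producer polls or flushes, a delivery report for a record typically appears on a later loop iteration or during the final flush.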
Once all of the above steps have been completed, start your producer application. It will log output similar to this:
2022-03-04 15:26:57.000|INFO|avro-producer.py| Starting to produce to topic: avro-applicationlog. ^C to exit.
2022-03-04 15:26:57.001|INFO|discovery.py| Waiting for Discovery Result.
2022-03-04 15:27:09.092|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [4] @ offset 12
2022-03-04 15:27:09.093|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [4] @ offset 13
2022-03-04 15:27:10.096|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [4] @ offset 14
2022-03-04 15:27:11.101|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [4] @ offset 15
2022-03-04 15:27:12.103|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [4] @ offset 16
2022-03-04 15:27:13.107|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [4] @ offset 17
2022-03-04 15:27:14.110|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [4] @ offset 18
This is all the code required to produce records successfully.
Next Step: Consuming Data Using A Python Consumer Application
In the next step you will create a Python consumer application to use the data you just produced.
Proceed to Consuming Data (Python Axual Client)