Producing Using Python Kafka Client
Creating Producer Application
When you have completed this step, you will have set up a producer application that is producing some randomly generated data in Avro format to the topic you have configured in Creating topics.
Before continuing, please check you fulfill all the prerequisites.
Building The Application
Start by including the dependency on the Confluent Kafka client as you would do with any dependency.
For full executable Kafka client SASL/SSL producer example, please refer examples repository. |
pip install confluent-kafka
Producing using SASL
In order to create a very basic producer application that use SASL, you need to create the configuration for the Kafka Producer
.
# Resolved stream name
# ex : <tenant>-<instance>-<environment>-<streamName>
# If the topic pattern in different in your case, use that pattern to resolve topic name
topic = 'demo-local-example-avro-applicationlog'
# Replace SASL username and password with one generated in Self-Service
sasl_username = 'username'
sasl_password = 'password'
# Schema registry client configuration
schema_registry_conf = {'url': 'https://platform.local:24000/',
'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
'basic.auth.user.info': sasl_username+':'+sasl_password
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
key_serializer = AvroSerializer(
schema_registry_client,
Application.SCHEMA,
application_to_dict
)
value_serializer = AvroSerializer(
schema_registry_client,
ApplicationLogEvent.SCHEMA,
application_log_event_to_dict
)
# Kafka producer configuration
configuration = {
'bootstrap.servers': 'platform.local:9097',
# security configuration
'sasl.username': sasl_username,
'sasl.password': sasl_password,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'SCRAM-SHA-256',
'ssl.endpoint.identification.algorithm': 'none',
# Producer client CA location so client can trust brokers
'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
# key and value serializer
'key.serializer': key_serializer,
'value.serializer': value_serializer,
'acks': 'all',
}
When using confluent kafka client, you need to provide the fully resolved The default topic pattern is |
|
With above configurations, instantiate a Kafka Producer
and start sending records.
producer = SerializingProducer(configuration)
try:
logger.info(f'Starting to produce to topic: {topic}. ^C to exit.')
n = 0
while True:
key = Application(name='value_log_event_producer',
version='0.0.1',
owner='Axual')
value = ApplicationLogEvent(timestamp=int(round(time.time() * 1000)),
application=key,
context={'Some key': 'Some Value'},
level=ApplicationLogLevel.INFO,
message=f'Message #{n}')
producer.poll(0)
producer.produce(topic=topic, value=value, key=key, on_delivery=delivery_callback)
time.sleep(1.)
n += 1
except KeyboardInterrupt:
logger.info('Caught KeyboardInterrupt, stopping.')
finally:
if producer is not None:
logger.info('Flushing producer.')
producer.flush()
When all of the above steps have been done correctly, start your producer app. Your app will produce logging that will look like this:
2022-03-09 13:21:15.848|INFO|avro-producer.py| Starting kafka avro SASL producer to produce to topic: demo-local-localenv-applicationlogevent. ^C to exit. 2022-03-09 13:21:15.860|DEBUG|connectionpool.py| Starting new HTTPS connection (1): platform.local:24000 2022-03-09 13:21:22.112|DEBUG|connectionpool.py| https://platform.local:24000 "POST /subjects/demo-local-localenv-applicationlogevent-key/versions HTTP/1.1" 200 8 2022-03-09 13:21:22.226|DEBUG|connectionpool.py| https://platform.local:24000 "POST /subjects/demo-local-localenv-applicationlogevent-value/versions HTTP/1.1" 200 8 2022-03-09 13:21:23.239|INFO|avro-producer.py| Produced record to topic demo-local-localenv-applicationlogevent partition [0] @ offset 259 2022-03-09 13:21:24.239|INFO|avro-producer.py| Produced record to topic demo-local-localenv-applicationlogevent partition [0] @ offset 260 2022-03-09 13:21:25.243|INFO|avro-producer.py| Produced record to topic demo-local-localenv-applicationlogevent partition [0] @ offset 261 2022-03-09 13:21:26.244|INFO|avro-producer.py| Produced record to topic demo-local-localenv-applicationlogevent partition [0] @ offset 262
This is all the coding required to make a successful produce happen.
Producing using Mutual TLS
In order to create a very basic producer application using mTLS, you need to create the configuration for the Kafka Producer
.
# Resolved stream name
# ex : <tenant>-<instance>-<environment>-<streamName>
# If the topic pattern in different in your case, use that pattern to resolve topic name
topic = 'axual-example-local-avro-applicationlog'
# Schema registry client config
schema_registry_conf = {'url': 'http://localhost:8082'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
key_serializer = AvroSerializer(
schema_registry_client,
Application.SCHEMA,
application_to_dict
)
value_serializer = AvroSerializer(
schema_registry_client,
ApplicationLogEvent.SCHEMA,
application_log_event_to_dict
)
# Kafka producer configuration
configuration = {
'bootstrap.servers': 'localhost:8084',
# SSL configuration
'security.protocol': 'SSL',
'ssl.endpoint.identification.algorithm': 'none',
'ssl.certificate.location': _full_path_of('/resources/client-cert/standalone/standalone.cer'),
'ssl.key.location': _full_path_of('/resources/client-cert/standalone/standalone-private.key'),
'ssl.ca.location': _full_path_of('/resources/client-cert/standalone/standalone-caroot.cer'),
# key and value serializer
'key.serializer': key_serializer,
'value.serializer': value_serializer,
'acks': 'all',
# 'debug': 'all',
'logger': logger
}
When using confluent kafka client, you need to provide the fully resolved The default topic pattern is |
|
With above configurations, instantiate a Kafka Producer
and start sending records.
producer = SerializingProducer(configuration)
try:
logger.info(f'Starting to kafka avro produce to topic: {topic}. ^C to exit.')
n = 0
while True:
key = Application(name='value_log_event_producer',
version='0.0.1',
owner='Axual')
value = ApplicationLogEvent(timestamp=int(round(time.time() * 1000)),
application=key,
context={'Some key': 'Some Value'},
level=ApplicationLogLevel.INFO,
message=f'Message #{n}')
producer.poll(0)
producer.produce(topic=topic, value=value, key=key, on_delivery=delivery_callback)
time.sleep(1.)
n += 1
except KeyboardInterrupt:
logger.info('Caught KeyboardInterrupt, stopping.')
finally:
if producer is not None:
logger.info('Flushing producer.')
producer.flush()
When all of the above steps have been done correctly, start your producer app. Your app will produce logging that will look like this:
2022-03-09 13:25:13.223|INFO|avro-producer.py| Starting kafka avro producer to produce to topic: axual-example-local-avro-applicationlog. ^C to exit. 2022-03-09 13:25:13.238|DEBUG|connectionpool.py| Starting new HTTP connection (1): localhost:8082 2022-03-09 13:25:13.296|DEBUG|connectionpool.py| http://localhost:8082 "POST /subjects/axual-example-local-avro-applicationlog-key/versions HTTP/1.1" 200 None 2022-03-09 13:25:13.320|DEBUG|connectionpool.py| http://localhost:8082 "POST /subjects/axual-example-local-avro-applicationlog-value/versions HTTP/1.1" 200 None 2022-03-09 13:25:14.325|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 16 2022-03-09 13:25:15.329|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 17 2022-03-09 13:25:16.332|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 18 2022-03-09 13:25:17.333|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 19 2022-03-09 13:25:18.334|INFO|avro-producer.py| Produced record to topic axual-example-local-avro-applicationlog partition [0] @ offset 20
This is all the coding required to make a successful produce happen.
Next Step: Consuming Data Using A Python Consumer Application
In the next step you will create a Java consumer application to use the data you just produced.
Proceed to Consuming Data (Python Kafka Client)