Producing Using Java Kafka Client
Creating A Producer Application
When you have completed this step, you will have set up a producer application that is producing some randomly generated data in Avro format to the stream you have configured in step 2.
Before continuing, please check that you fulfill all the prerequisites.
Building The Application
Start by including the dependencies on the Kafka client and the Avro serializer library, as you would with any other dependency.
For a fully executable Kafka client SASL/SSL producer example, please refer to the examples repository.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.1</version>
</dependency>
Producing using SASL
In order to create a very basic producer application that uses SASL, you need to create the configuration for the Kafka Producer.
// The fully resolved topic name to produce to
private static final String STREAM = "demo-local-localenv-applicationlogevent";
// The config keys below are statically imported from the Kafka client and Confluent serializer config classes
Map<String, Object> config = new HashMap<>();
// Update the bootstrap servers to use SASL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "platform.local:9097");
// SASL protocol used to communicate with brokers
config.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// SASL mechanism used for client connections
config.put(SASL_MECHANISM, "SCRAM-SHA-256");
config.put(SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD));
// Producer client truststore location so client can trust brokers
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
// truststore password
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("notsecret"));
// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "https://platform.local:24000/");
config.put(AUTO_REGISTER_SCHEMAS, false);
// This will extract user credential from kafka client
config.put(BASIC_AUTH_CREDENTIALS_SOURCE, "SASL_INHERIT");
config.put("schema.registry." + SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
config.put("schema.registry." + SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");
// Key and value serializer
config.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
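The `SASL_JAAS_CONFIG` entry above packs the SCRAM credentials into a single JAAS configuration string via `String.format`. A minimal stand-alone sketch of that formatting (the username and password values here are placeholders, not real credentials):

```java
public class JaasConfigDemo {
    // Builds the JAAS config string the same way the producer config above does.
    static String jaasConfig(String username, String password) {
        return String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
                username, password);
    }

    public static void main(String[] args) {
        System.out.println(jaasConfig("demo-user", "demo-pass"));
        // org.apache.kafka.common.security.scram.ScramLoginModule required username="demo-user" password="demo-pass";
    }
}
```

Note the trailing semicolon inside the string: it is part of the JAAS syntax and the login module name must be fully qualified, so omitting either causes an authentication setup failure.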
When using the Kafka client, you need to provide the fully resolved topic name, which follows the default topic pattern.
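The `STREAM` constant shown earlier appears to be composed of several dash-separated parts (this decomposition is an assumption inferred from the example value, not a documented guarantee). A sketch of how such a fully resolved name could be assembled:

```java
public class TopicNameDemo {
    // Hypothetical helper: joins the parts of a fully resolved topic name.
    // The tenant/instance/environment/name split is an assumption for illustration.
    static String resolve(String tenant, String instance, String environment, String name) {
        return String.join("-", tenant, instance, environment, name);
    }

    public static void main(String[] args) {
        System.out.println(resolve("demo", "local", "localenv", "applicationlogevent"));
        // demo-local-localenv-applicationlogevent
    }
}
```

Check your own environment's topic pattern to confirm which parts apply and in which order.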
With the above configuration, instantiate a Kafka Producer and start sending records.
try (final Producer<Application, ApplicationLogEvent> producer = new KafkaProducer<>(config)) {
    // Collect the Futures returned by the asynchronous send calls
    List<Future<RecordMetadata>> futures = IntStream.range(0, 1)
            .mapToObj(i ->
                    producer.send(
                            new ProducerRecord<>(STREAM, key, value))
            ).collect(Collectors.toList());
    // Poll the futures until every send has completed
    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }
            try {
                RecordMetadata producedMessage = future.get();
                LOG.info("Produced message to partition {} offset {}", producedMessage.partition(), producedMessage.offset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
}
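The polling loop above can be sketched stand-alone with JDK futures, without a running Kafka cluster. This is only an illustration of the `removeIf` pattern: completed futures are drained from the list, pending ones are retried on the next pass (`FuturePollingDemo` and its names are ours, not part of the Kafka API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FuturePollingDemo {
    // Drains a list of futures the same way the producer loop above does.
    public static List<String> drain(List<Future<String>> futures) throws InterruptedException {
        List<String> results = new ArrayList<>();
        do {
            futures.removeIf(future -> {
                if (!future.isDone()) {
                    return false;       // not finished yet: keep it for the next pass
                }
                try {
                    results.add(future.get());   // completed: collect its result
                } catch (InterruptedException | ExecutionException e) {
                    results.add("error: " + e.getMessage());
                }
                return true;            // remove the completed future from the list
            });
            Thread.sleep(100);          // back off before the next poll pass
        } while (!futures.isEmpty());
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Future<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture("record-0"));
        futures.add(CompletableFuture.completedFuture("record-1"));
        System.out.println(drain(futures));  // [record-0, record-1]
    }
}
```

In production code you would typically prefer a send callback or `Future.get` with a timeout over busy polling; the loop is kept here because it mirrors the example above.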
When all of the above steps have been completed correctly, start your producer app. With the logging level set to INFO, your app should log:
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Creating producer config map
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Creating kafka producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646396311784
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: T8kKA-6FRwOMBDq1q1HoZQ
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Produced message to partition 0 offset 1
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Done!
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
This is all the code required to produce successfully.
Producing using Mutual TLS
In order to create a very basic producer application using mTLS, you need to create the configuration for the Kafka Producer.
// The config keys below are statically imported from the Kafka client and Confluent serializer config classes
Map<String, Object> config = new HashMap<>();
// Update the bootstrap servers to use SSL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:8084");
config.put(SECURITY_PROTOCOL_CONFIG, "SSL");
config.put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// SSL Configuration
config.put(SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/axual.client.keystore.jks");
LOG.info("Using JKS type keystore and truststore.");
config.put(SSL_KEYSTORE_PASSWORD_CONFIG, "notsecret");
config.put(SSL_KEY_PASSWORD_CONFIG, "notsecret");
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");
// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
config.put(AUTO_REGISTER_SCHEMAS, false);
// Key and value serializer
config.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
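A common cause of mTLS handshake failures is a wrong keystore path or password. Before starting the app, you can sanity-check both with the JDK `KeyStore` API. This sketch creates an empty temporary JKS file as a stand-in for `axual.client.keystore.jks`, so it runs without a real client certificate (`KeystoreCheckDemo` and its helper are ours, for illustration only):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.security.KeyStore;

public class KeystoreCheckDemo {
    // Loads a JKS keystore; throws if the path or password is wrong.
    static int countEntries(String path, char[] password) throws Exception {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream(path)) {
            keyStore.load(in, password);
        }
        return keyStore.size();
    }

    public static void main(String[] args) throws Exception {
        // Create an empty keystore as a stand-in for axual.client.keystore.jks
        File tmp = File.createTempFile("demo", ".jks");
        tmp.deleteOnExit();
        KeyStore empty = KeyStore.getInstance("JKS");
        empty.load(null, null);
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            empty.store(out, "notsecret".toCharArray());
        }
        System.out.println(countEntries(tmp.getPath(), "notsecret".toCharArray())); // 0
    }
}
```

Run the same check against your real keystore and truststore paths; a real client keystore should contain at least one private-key entry.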
When using the Kafka client, you need to provide the fully resolved topic name, which follows the default topic pattern.
With the above configuration, instantiate a Kafka Producer and start sending records.
try (final Producer<Application, ApplicationLogEvent> producer = new KafkaProducer<>(config)) {
    // Collect the Futures returned by the asynchronous send calls
    List<Future<RecordMetadata>> futures = IntStream.range(0, 1)
            .mapToObj(i ->
                    producer.send(
                            new ProducerRecord<>(STREAM, key, value))
            ).collect(Collectors.toList());
    // Poll the futures until every send has completed
    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }
            try {
                RecordMetadata producedMessage = future.get();
                LOG.info("Produced message to partition {} offset {}", producedMessage.partition(), producedMessage.offset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
}
When all of the above steps have been completed correctly, start your producer app. With the logging level set to INFO, your app should log:
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Creating producer config map
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Using JKS type keystore and truststore.
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Creating kafka producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646394569975
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-0 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-5 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-2 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-8 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-9 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-4 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-1 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-6 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-7 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-3 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: CSq54uNjQNOwk8y6pVnP4Q
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Produced message to partition 5 offset 0
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Done!
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
This is all the code required to produce successfully.
Next Step: Consuming Data Using A Java Consumer Application
In the next step you will create a Java consumer application to consume the data you just produced.
Proceed to Step 6: Consuming Data (Java Kafka Client)