Producing Using Java Kafka Client
Creating A Producer Application
When you have completed this step, you will have set up a producer application that is producing some randomly generated data in Avro format to the stream you have configured in Creating streams.
Before continuing, please check you fulfill all the Prerequisites.
Building The Application
Start by including the dependency on the Kafka client and Avro serializer library as you would do with any dependency.
For a full executable Kafka client SASL/SSL producer example, please refer to the examples repository.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.1</version>
</dependency>
Producing using SASL
To create a very basic producer application that uses SASL, you first need to create the configuration for the Kafka Producer.
private static final String STREAM = "demo-local-localenv-applicationlogevent";
Map<String, Object> config = new HashMap<>();
// Update the bootstrap servers to use SASL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "platform.local:9097");
// SASL protocol used to communicate with brokers
config.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// SASL mechanism used for client connections
config.put(SASL_MECHANISM, "SCRAM-SHA-256");
config.put(SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD));
// Producer client truststore location so client can trust brokers
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
// truststore password
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("notsecret"));
// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "https://platform.local:24000/");
config.put(AUTO_REGISTER_SCHEMAS, false);
// Reuse the Kafka client's SASL credentials for Schema Registry basic authentication
config.put(BASIC_AUTH_CREDENTIALS_SOURCE, "SASL_INHERIT");
config.put("schema.registry." + SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
config.put("schema.registry." + SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");
// Key and value serializer
config.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
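The JAAS line in the configuration above is assembled with String.format, which produces an invalid value if the username or password contains a double quote or a backslash. The following is a minimal sketch of a helper that builds the same string with escaping; the class name, method names, and the SASL_USERNAME / SASL_PASSWORD environment variable names are assumptions made for illustration, not part of the example repository.

```java
final class ScramJaasConfigSketch {

    // Login module name taken from the configuration shown above
    private static final String LOGIN_MODULE =
            "org.apache.kafka.common.security.scram.ScramLoginModule";

    // Escape backslashes and double quotes so the value stays inside its quoted JAAS option.
    // Assumption: the JAAS parser honours backslash escapes inside quoted strings.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Build the value for the sasl.jaas.config property
    static String scramJaasConfig(String username, String password) {
        return String.format("%s required username=\"%s\" password=\"%s\";",
                LOGIN_MODULE, escape(username), escape(password));
    }

    public static void main(String[] args) {
        // Hypothetical environment variable names; the fallbacks are placeholders only
        String username = System.getenv().getOrDefault("SASL_USERNAME", "demo-user");
        String password = System.getenv().getOrDefault("SASL_PASSWORD", "demo-password");
        String jaas = scramJaasConfig(username, password);
        // Avoid printing the password itself
        System.out.println("JAAS config built, length " + jaas.length());
    }
}
```

The returned string can be passed as the SASL_JAAS_CONFIG value in place of the inline String.format call. Reading the credentials from the environment is one way to keep them out of source code.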
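When using the Kafka client, you need to provide the fully resolved topic name, built according to the topic pattern configured for your installation (the STREAM constant used in these examples is already resolved).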
With the above configuration, instantiate a Kafka Producer and start sending records.
try (final Producer<Application, ApplicationLogEvent> producer = new KafkaProducer<>(config)) {
    // Send one record; each send() returns a Future that completes once the send has succeeded or failed
    List<Future<RecordMetadata>> futures = IntStream.range(0, 1)
            .mapToObj(i ->
                    producer.send(
                            new ProducerRecord<Application, ApplicationLogEvent>(STREAM, key, value)))
            .collect(Collectors.toList());

    // Poll until every send has completed, logging the outcome of each one
    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }
            try {
                RecordMetadata producedMessage = future.get();
                LOG.info("Produced message to partition {} offset {}", producedMessage.partition(), producedMessage.offset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
}
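The polling pattern used above (remove each future once it is done, sleep, repeat until the list is empty) can be tried in isolation without a broker. The following is a minimal sketch that uses plain java.util.concurrent futures as stand-ins for the results of producer.send; the class name FutureDrainSketch and the method drain are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FutureDrainSketch {

    // Polls the list until every future is done and removed.
    // Returns the number of futures that completed without an exception.
    static <T> int drain(List<Future<T>> pending, long pollMillis) {
        int succeeded = 0;
        while (!pending.isEmpty()) {
            Iterator<Future<T>> it = pending.iterator();
            while (it.hasNext()) {
                Future<T> future = it.next();
                if (!future.isDone()) {
                    continue; // not finished yet, check again on the next pass
                }
                try {
                    future.get(); // returns immediately because the future is done
                    succeeded++;
                } catch (ExecutionException e) {
                    System.err.println("Task failed: " + e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return succeeded;
                }
                it.remove();
            }
            if (!pending.isEmpty()) {
                try {
                    Thread.sleep(pollMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and stop waiting
                    return succeeded;
                }
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<Integer>> pending = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int n = i;
            Callable<Integer> task = () -> n * n; // stand-in for a send result
            pending.add(pool.submit(task));
        }
        System.out.println("Completed: " + drain(pending, 100));
        pool.shutdown();
    }
}
```

Polling keeps the calling thread free to do other work between passes. If nothing else needs to happen in the meantime, calling get() on each future in turn is a simpler alternative.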
When all of the above steps have been done correctly, start your producer app. With the logging level set to INFO, your app should log:
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Creating producer config map
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Creating kafka producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646396311784
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: T8kKA-6FRwOMBDq1q1HoZQ
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Produced message to partition 0 offset 1
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Done!
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
This is all the code required to produce a record successfully.
Producing using Mutual TLS
To create a very basic producer application using mTLS, you first need to create the configuration for the Kafka Producer.
Map<String, Object> config = new HashMap<>();
// Update the bootstrap servers to use SSL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:8084");
config.put(SECURITY_PROTOCOL_CONFIG, "SSL");
// An empty algorithm disables server hostname verification; use this only for local testing
config.put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// SSL Configuration
config.put(SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/axual.client.keystore.jks");
LOG.info("Using JKS type keystore and truststore.");
config.put(SSL_KEYSTORE_PASSWORD_CONFIG, "notsecret");
config.put(SSL_KEY_PASSWORD_CONFIG, "notsecret");
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");
// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
config.put(AUTO_REGISTER_SCHEMAS, false);
// Key and value serializer
config.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
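A wrong keystore path or password usually only surfaces when the producer is created. As a sanity check before building the configuration, the stores can be opened with the JDK's own KeyStore API. The following is a minimal sketch under the assumption that both files are JKS stores, as in the configuration above; the class name KeystoreCheckSketch and the method canOpen are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

final class KeystoreCheckSketch {

    // Returns true if the file exists and can be loaded as a JKS store with the given password
    static boolean canOpen(Path location, char[] password) {
        try (InputStream in = Files.newInputStream(location)) {
            KeyStore store = KeyStore.getInstance("JKS");
            store.load(in, password);
            return true;
        } catch (IOException | GeneralSecurityException e) {
            // Missing file, wrong password, or unsupported format
            System.err.println("Cannot open " + location + ": " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // Paths and password are the placeholder values from the configuration above
        Path keystore = Path.of("/path/to/axual.client.keystore.jks");
        Path truststore = Path.of("/path/to/axual.client.truststore.jks");
        char[] password = "notsecret".toCharArray();
        System.out.println("keystore readable:   " + canOpen(keystore, password));
        System.out.println("truststore readable: " + canOpen(truststore, password));
    }
}
```

This only confirms that the files can be read with the supplied password. It does not verify that the broker trusts the client certificate.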
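When using the Kafka client, you need to provide the fully resolved topic name, built according to the topic pattern configured for your installation (the STREAM constant used in these examples is already resolved).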
With the above configuration, instantiate a Kafka Producer and start sending records.
try (final Producer<Application, ApplicationLogEvent> producer = new KafkaProducer<>(config)) {
    // Send one record; each send() returns a Future that completes once the send has succeeded or failed
    List<Future<RecordMetadata>> futures = IntStream.range(0, 1)
            .mapToObj(i ->
                    producer.send(
                            new ProducerRecord<Application, ApplicationLogEvent>(STREAM, key, value)))
            .collect(Collectors.toList());

    // Poll until every send has completed, logging the outcome of each one
    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }
            try {
                RecordMetadata producedMessage = future.get();
                LOG.info("Produced message to partition {} offset {}", producedMessage.partition(), producedMessage.offset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
}
When all of the above steps have been done correctly, start your producer app. With the logging level set to INFO, your app should log:
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Creating producer config map
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Using JKS type keystore and truststore.
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Creating kafka producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646394569975
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-0 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-5 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-2 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-8 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-9 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-4 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-1 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-6 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-7 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-3 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: CSq54uNjQNOwk8y6pVnP4Q
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Produced message to partition 5 offset 0
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Done!
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
This is all the code required to produce a record successfully.
Next Step: Consuming Data Using A Java Consumer Application
In the next step you will create a Java consumer application to use the data you just produced.
Proceed to Consuming Data (Java Kafka Client)