Producing Using Java Kafka Client

Creating A Producer Application

When you have completed this step, you will have set up a producer application that is producing some randomly generated data in Avro format to the stream you have configured in Creating streams.

Before continuing, please check you fulfill all the Prerequisites.

Building The Application

Start by including the dependency on the Kafka client and Avro serializer library as you would do with any dependency.

For full executable Kafka client SASL/SSL producer example, please refer examples repository.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.1</version>
</dependency>

Producing using SASL

In order to create a very basic producer application that use SASL, you need to create the configuration for the Kafka Producer.

private static final String STREAM = "demo-local-localenv-applicationlogevent";

Map<String, Object> config = new HashMap<>();

// Update the bootstrap servers to use SASL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "platform.local:9097");
// SASL protocol used to communicate with brokers
config.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// SASL mechanism used for client connections
config.put(SASL_MECHANISM, "SCRAM-SHA-256");
config.put(SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD));

// Producer client truststore location so client can trust brokers
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
// truststore password
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("notsecret"));

// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "https://platform.local:24000/");
config.put(AUTO_REGISTER_SCHEMAS, false);
// This will extract user credential from kafka client
config.put(BASIC_AUTH_CREDENTIALS_SOURCE, "SASL_INHERIT");
config.put("schema.registry." + SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
config.put("schema.registry." + SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");

// Key and value serializer
config.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

When using kafka client, you need to provide the fully resolved stream name to the configuration. You can resolve the stream name by looking into the pattern.

The default topic pattern is {tenant}-{instance}-{environment}-{streamName}

  • Check your care package for the truststore file, see also Security

  • Replace SASL_USERNAME and SASL_PASSWORD with credentials generated while configuring the application in Creating Applications

  • Values of fields tenant, instance can be found in the on-boarding information, environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For BOOTSTRAP_SERVERS_CONFIG and SCHEMA_REGISTRY_URL_CONFIG config values, contact support or your technical system administrator.

With above configurations, instantiate a Kafka Producer and start sending records.

try (final Producer<Application, ApplicationLogEvent> producer = new KafkaProducer<>(config)) {

    futures = IntStream.range(0, 1)
            .mapToObj(i ->
                    producer.send(
                            new ProducerRecord<Application, ApplicationLogEvent>(STREAM, key, value))
            ).collect(Collectors.toList());

    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }

            try {
                RecordMetadata producedMessage = future.get();
                LOG.info("Produced message to partition {} offset {}", producedMessage.partition(), producedMessage.offset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
When all of the above steps have been done correctly, start your producer app. With the logging level to INFO, Your app should log
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Creating producer config map
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Creating kafka producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ...

[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646396311784
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: T8kKA-6FRwOMBDq1q1HoZQ
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Produced message to partition 0 offset 1
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroProducerApp - Done!
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

This is all the coding required to make a successful produce happen.

Producing using Mutual TLS

In order to create a very basic producer application using mTLS, you need create the configuration for the Kafka Producer.

Map<String, Object> config = new HashMap<>();

        // Update the bootstrap servers to use SSL port
        config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:8084");

        config.put(SECURITY_PROTOCOL_CONFIG, "SSL");
        config.put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // SSL Configuration
        config.put(SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/axual.client.keystore.jks");
        LOG.info("Using JKS type keystore and truststore.");
        config.put(SSL_KEYSTORE_PASSWORD_CONFIG, "notsecret");
        config.put(SSL_KEY_PASSWORD_CONFIG, "notsecret");
        config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/axual.client.truststore.jks");
        config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");

        // Schema Registry config
        config.put(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        config.put(AUTO_REGISTER_SCHEMAS, false);

        // Key and value serializer
        config.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        config.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

When using kafka client, you need to provide the fully resolved stream name to the configuration. You can resolve the stream name by looking into the pattern.

The default topic pattern is {tenant}-{instance}-{environment}-{streamName}

  • Check your care package for the truststore file, see also Security

  • Values of fields tenant, instance can be found in the on-boarding information, environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For BOOTSTRAP_SERVERS_CONFIG and SCHEMA_REGISTRY_URL_CONFIG config values, contact support or your technical system administrator.

With above configurations, instantiate a Kafka Producer and start sending records.

try (final Producer<Application, ApplicationLogEvent> producer = new KafkaProducer<>(config)) {

    futures = IntStream.range(0, 1)
            .mapToObj(i ->
                    producer.send(
                            new ProducerRecord<Application, ApplicationLogEvent>(STREAM, key, value))
            ).collect(Collectors.toList());

    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }

            try {
                RecordMetadata producedMessage = future.get();
                LOG.info("Produced message to partition {} offset {}", producedMessage.partition(), producedMessage.offset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
When all of the above steps have been done correctly, start your producer app. With the logging level to INFO, Your app should log
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Creating producer config map
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Using JKS type keystore and truststore.
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Creating kafka producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: ...

[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...

[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646394569975
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-0 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-5 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-2 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-8 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-9 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-4 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-1 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-6 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-7 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-3 to 0 since the associated topicId changed from null to D_ayGObHTJCJiCd40B0kHA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: CSq54uNjQNOwk8y6pVnP4Q
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Produced message to partition 5 offset 0
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroProducerApp - Done!
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

This is all the coding required to make a successful produce happen.

Next Step: Consuming Data Using A Java Consumer Application

In the next step you will create a Java consumer application to use the data you just produced.