Consuming Using Java Kafka Client

Creating A Consumer Application

When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in step 2 in Avro format. To get some data onto the stream, follow step 5: Create A Producer Application.

Before continuing, please check that you fulfill all the prerequisites.

Building The Application

Start by including the dependencies on the Kafka client and the Avro serializer library, the same as for the producer application.

For a full executable Kafka client SASL/SSL consumer example, please refer to the examples repository.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.1</version>
</dependency>

Consuming using SASL

To create a very basic consumer application that uses SASL, you need to create the configuration for the Kafka Consumer.

private static final String STREAM = "demo-local-localenv-applicationlogevent";
private static final String GROUP_ID = "demo-local-localenv-JAVA_CONSUMER";

Map<String, Object> config = new HashMap<>();

// Update the bootstrap servers to use SASL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "platform.local:9097");
// Security protocol used to communicate with brokers (SASL over TLS)
config.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// SASL mechanism used for client connections
config.put(SASL_MECHANISM, "SCRAM-SHA-256");
config.put(SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD));
config.put(GROUP_ID_CONFIG, GROUP_ID);
config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(MAX_POLL_RECORDS_CONFIG, 1);
config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");

// Consumer client truststore location so client can trust brokers
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.truststore.jks"));
// truststore password
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("notsecret"));

// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "https://platform.local:24000/");
// This will extract user credential from kafka client
config.put(BASIC_AUTH_CREDENTIALS_SOURCE, "SASL_INHERIT");
config.put("schema.registry." + SSL_TRUSTSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.truststore.jks"));
config.put("schema.registry." + SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");

config.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
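
The JAAS line in the configuration above is built with String.format, which produces an invalid value if the username or password contains a double quote or a backslash. The following is a minimal sketch of a helper that escapes those characters first. The class name JaasConfigBuilder and its methods are hypothetical and not part of the Kafka client, and the escaping rule assumes the JAAS parser honours backslash escapes inside quoted values.

```java
public final class JaasConfigBuilder {

    private JaasConfigBuilder() {
    }

    // Escapes backslashes and double quotes so the value can sit inside a quoted JAAS option.
    public static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds a sasl.jaas.config value for SCRAM, mirroring the format string used above.
    public static String scram(String username, String password) {
        return String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
                escape(username), escape(password));
    }

    public static void main(String[] args) {
        // Hypothetical environment variable name; falls back to a placeholder for a dry run.
        String username = System.getenv().getOrDefault("SASL_USERNAME", "example-user");
        // The real password is deliberately not printed.
        System.out.println(scram(username, "********"));
    }
}
```

The result of JaasConfigBuilder.scram(SASL_USERNAME, SASL_PASSWORD) could then be passed as the SASL_JAAS_CONFIG value in place of the inline String.format call.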

When using the Kafka client, you need to provide the fully resolved stream name and group.id in the configuration. You can resolve both by filling in the naming patterns.

The default patterns are:

  • Topic Pattern: {tenant}-{instance}-{environment}-{streamName}

  • Group Id Pattern: {tenant}-{instance}-{environment}-{applicationId}

  • Check your care package for the truststore file, see also Step 3

  • Replace SASL_USERNAME and SASL_PASSWORD with credentials generated while configuring the application in Step 4

  • The values of the tenant and instance fields can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For BOOTSTRAP_SERVERS_CONFIG and SCHEMA_REGISTRY_URL_CONFIG config values, contact support or your technical system administrator.
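
As an illustration of how the default patterns resolve, the sketch below fills in the placeholders with plain string replacement. The class NamePatternResolver is hypothetical and not part of any client library. The field values are read off the STREAM and GROUP_ID constants in the SASL example above, so they are assumptions about that local setup; substitute the values from your own on-boarding information.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class NamePatternResolver {

    private NamePatternResolver() {
    }

    // Replaces every {placeholder} in the pattern with its value from the map.
    public static String resolve(String pattern, Map<String, String> values) {
        String resolved = pattern;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            resolved = resolved.replace("{" + entry.getKey() + "}", entry.getValue());
        }
        // Fail fast if a placeholder had no value, rather than subscribing to a wrong name.
        if (resolved.contains("{")) {
            throw new IllegalArgumentException("Unresolved placeholder in: " + resolved);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("tenant", "demo");
        values.put("instance", "local");
        values.put("environment", "localenv");
        values.put("streamName", "applicationlogevent");
        values.put("applicationId", "JAVA_CONSUMER");

        // prints demo-local-localenv-applicationlogevent
        System.out.println(resolve("{tenant}-{instance}-{environment}-{streamName}", values));
        // prints demo-local-localenv-JAVA_CONSUMER
        System.out.println(resolve("{tenant}-{instance}-{environment}-{applicationId}", values));
    }
}
```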

With the above configuration, instantiate a Kafka Consumer and start consuming records.

try (KafkaConsumer<Application, ApplicationLogEvent> consumer = new KafkaConsumer<>(config)) {
    consumer.subscribe(Collections.singletonList(STREAM));
    while (true) {
        ConsumerRecords<Application, ApplicationLogEvent> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<Application, ApplicationLogEvent> record : records) {
            LOG.info("Received message on topic {} partition {} offset {} key {} value {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}
When all of the above steps have been done correctly, start your consumer app. With the logging level set to INFO, your app should log:
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroConsumerApp - Creating consumer config map
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroConsumerApp - Creating kafka consumer
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: ...

[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646656149260
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Subscribed to topic(s): demo-local-localenv-applicationlogevent
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Cluster ID: T8kKA-6FRwOMBDq1q1HoZQ
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Discovered group coordinator 192.168.99.100:9097 (id: 2147483646 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Request joining group due to: need to re-join with the given member-id
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Successfully joined group with generation Generation{generationId=14, memberId='consumer-demo-local-localenv-JAVA_CONSUMER-1-1783dae9-915d-4b84-b8ab-b05fc02b0f5e', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Finished assignment for group at generation 14: {consumer-demo-local-localenv-JAVA_CONSUMER-1-1783dae9-915d-4b84-b8ab-b05fc02b0f5e=Assignment(partitions=[demo-local-localenv-applicationlogevent-0])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Successfully synced group in generation Generation{generationId=14, memberId='consumer-demo-local-localenv-JAVA_CONSUMER-1-1783dae9-915d-4b84-b8ab-b05fc02b0f5e', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Notifying assignor about the new Assignment(partitions=[demo-local-localenv-applicationlogevent-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Adding newly assigned partitions: demo-local-localenv-applicationlogevent-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Setting offset for partition demo-local-localenv-applicationlogevent-0 to the committed offset FetchPosition{offset=200, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.99.100:9097 (id: 1 rack: rack-1)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Fetch position FetchPosition{offset=200, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.99.100:9097 (id: 1 rack: rack-1)], epoch=0}} is out of range for partition demo-local-localenv-applicationlogevent-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Resetting offset for partition demo-local-localenv-applicationlogevent-0 to position FetchPosition{offset=210, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.99.100:9097 (id: 1 rack: rack-1)], epoch=0}}.
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroConsumerApp - Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 210 key {"name": "applicationName", "version": null, "owner": null} value {"timestamp": 1646656165436, "source": {"name": "applicationName", "version": null, "owner": null}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Message"}

This is all the coding required to start a basic consumer!

Consuming using Mutual TLS

To create a very basic consumer application using mTLS, you need to create the configuration for the Kafka Consumer.

Map<String, Object> config = new HashMap<>();

// Update the bootstrap servers to use the SSL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:8084");
config.put(GROUP_ID_CONFIG, GROUP_ID);
config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(MAX_POLL_RECORDS_CONFIG, 1);
config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");

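// TLS with client certificate authentication (mutual TLS)
config.put(SECURITY_PROTOCOL_CONFIG, "SSL");
// An empty value disables server hostname verification; avoid this outside local testing
config.put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");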

// SSL Configuration
LOG.info("Using JKS type keystore and truststore.");
config.put(SSL_KEYSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.keystore.jks"));
config.put(SSL_KEYSTORE_PASSWORD_CONFIG, "notsecret");
config.put(SSL_KEY_PASSWORD_CONFIG, "notsecret");
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.truststore.jks"));
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");

// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");

// key and value deserializer
config.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
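
The configuration above calls getResourceFilePath, which is not shown in this section. A minimal sketch of what such a helper might look like follows, assuming it resolves a classpath resource (for example a file under src/main/resources) to an absolute path on disk. The class name ResourcePaths is hypothetical, and the version in the examples repository may differ.

```java
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;

public final class ResourcePaths {

    private ResourcePaths() {
    }

    // Resolves a classpath resource to an absolute file system path.
    // Only works when the resource is a plain file on disk, not an entry inside a JAR.
    public static String getResourceFilePath(String resource) {
        URL url = ResourcePaths.class.getClassLoader().getResource(resource);
        if (url == null) {
            throw new IllegalArgumentException("Resource not found on classpath: " + resource);
        }
        try {
            return Paths.get(url.toURI()).toAbsolutePath().toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid resource location: " + url, e);
        }
    }
}
```

Failing with a clear message when the resource is missing makes a misplaced keystore or truststore easier to spot than a later SSL handshake error.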

When using the Kafka client, you need to provide the fully resolved stream name and group.id in the configuration. You can resolve both by filling in the naming patterns.

The default patterns are:

  • Topic Pattern: {tenant}-{instance}-{environment}-{streamName}

  • Group Id Pattern: {tenant}-{instance}-{environment}-{applicationId}

  • Check your care package for the truststore file, see also Step 3

  • The values of the tenant and instance fields can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For BOOTSTRAP_SERVERS_CONFIG and SCHEMA_REGISTRY_URL_CONFIG config values, contact support or your technical system administrator.
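
Before starting the consumer, it can help to confirm that the keystore and truststore from the care package open with the configured password. The sketch below uses only the JDK KeyStore API and assumes JKS files, as in the configuration above. The class name KeystoreCheck is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

public final class KeystoreCheck {

    private KeystoreCheck() {
    }

    // Loads a JKS store with the given password and returns how many entries it holds.
    // Throws IOException when the file is missing or cannot be read as a keystore.
    public static int countEntries(Path store, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(store)) {
            keyStore.load(in, password);
        }
        return keyStore.size();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: java KeystoreCheck <path-to-jks> <password>");
            return;
        }
        Path store = Paths.get(args[0]);
        int entries = countEntries(store, args[1].toCharArray());
        System.out.println(store + " holds " + entries + " entries");
    }
}
```

A truststore that loads but reports zero entries would not allow the client to trust any broker, so the entry count is worth a glance as well.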

With the above configuration, instantiate a Kafka Consumer and start consuming records.

try (KafkaConsumer<Application, ApplicationLogEvent> consumer = new KafkaConsumer<>(config)) {
    consumer.subscribe(Collections.singletonList(STREAM));
    while (true) {
        ConsumerRecords<Application, ApplicationLogEvent> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<Application, ApplicationLogEvent> record : records) {
            LOG.info("Received message on topic {} partition {} offset {} key {} value {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}
When all of the above steps have been done correctly, start your consumer app. With the logging level set to INFO, your app should log:
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroConsumerApp - Creating consumer config map
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroConsumerApp - Using JKS type keystore and truststore.
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroConsumerApp - Creating kafka consumer
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: ...

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646654136174
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Subscribed to topic(s): axual-example-local-avro-applicationlog
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-0 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-5 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-2 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-8 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-9 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-4 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-1 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-6 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-7 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-3 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Cluster ID: 8rd8-LboS2WQEqxL5dBpAQ
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Discovered group coordinator localhost:8084 (id: 2147483646 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Request joining group due to: need to re-join with the given member-id
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1-a5b3329f-55c6-4fb2-9658-b26ef574e649', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Finished assignment for group at generation 1: {consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1-a5b3329f-55c6-4fb2-9658-b26ef574e649=Assignment(partitions=[axual-example-local-avro-applicationlog-0, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-9])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1-a5b3329f-55c6-4fb2-9658-b26ef574e649', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Notifying assignor about the new Assignment(partitions=[axual-example-local-avro-applicationlog-0, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-9])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Adding newly assigned partitions: axual-example-local-avro-applicationlog-9, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-9
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-8
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-7
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-6
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-5
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-4
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-3
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-0
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-9 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-8 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-7 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-6 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-5 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-4 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroConsumerApp - Received message on topic axual-example-local-avro-applicationlog partition 6 offset 0 key {"name": "axual speaks spring", "version": "1.0.0", "owner": "none"} value {"timestamp": 1646652376145, "source": {"name": "Axual Consumer", "version": "1.0.0", "owner": "none"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Sending a message!"}

This is all the coding required to start a basic consumer!

Wrapping up

You have concluded the Getting Started section by preparing your stream and applications, requesting access to the stream, and producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.

You can also use the menu on the left to find information about other platform features that might not have been covered in this Getting Started.