Consuming Using Java Kafka Client
Creating A Consumer Application
When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in Creating streams in Avro format. To get some data onto the stream, follow Create A Producer Application.
Before continuing, please check that you fulfill all the prerequisites.
Building The Application
Start by including the dependencies on the Kafka client and the Avro serializer library, just as we did for the producer application.
For a full, executable Kafka client SASL/SSL consumer example, please refer to the examples repository.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.1</version>
</dependency>
Consuming using SASL
In order to create a very basic consumer application that uses SASL, you need to create the configuration for the Kafka Consumer.
private static final String STREAM = "demo-local-localenv-applicationlogevent";
private static final String GROUP_ID = "demo-local-localenv-JAVA_CONSUMER";
Map<String, Object> config = new HashMap<>();
// Update the bootstrap servers to use SASL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "platform.local:9097");
// SASL protocol used to communicate with brokers
config.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// SASL mechanism used for client connections
config.put(SASL_MECHANISM, "SCRAM-SHA-256");
config.put(SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD));
config.put(GROUP_ID_CONFIG, GROUP_ID);
config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(MAX_POLL_RECORDS_CONFIG, 1);
config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
// Consumer client truststore location so client can trust brokers
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.truststore.jks"));
// truststore password
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("notsecret"));
// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "https://platform.local:24000/");
// Reuse (inherit) the SASL credentials configured above for Schema Registry basic auth
config.put(BASIC_AUTH_CREDENTIALS_SOURCE, "SASL_INHERIT");
config.put("schema.registry." + SSL_TRUSTSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.truststore.jks"));
config.put("schema.registry." + SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");
config.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
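Note: with the configuration above, the KafkaAvroDeserializer returns GenericRecord instances. If you want the records deserialized into the generated Application and ApplicationLogEvent classes, you can additionally enable the specific Avro reader. This is an optional addition, not part of the example above, and assumes the generated classes are on the classpath.
// Optional: deserialize into the generated SpecificRecord classes instead of GenericRecord.
// SPECIFIC_AVRO_READER_CONFIG comes from io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.
config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);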
When using the Kafka client directly, you need to provide the fully resolved topic name and consumer group id, as shown in the STREAM and GROUP_ID values above. The default resolution pattern prefixes the stream name and application id with the tenant, instance and environment (e.g. demo-local-localenv-applicationlogevent and demo-local-localenv-JAVA_CONSUMER).
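As an illustration only, a minimal sketch of a hypothetical helper that builds such names, assuming the tenant-instance-environment-name convention suggested by the example values (check the patterns configured for your deployment):
// Hypothetical helper: joins the parts with '-' as in the example values above.
static String resolve(String tenant, String instance, String environment, String name) {
    return String.join("-", tenant, instance, environment, name);
}

// resolve("demo", "local", "localenv", "applicationlogevent") -> "demo-local-localenv-applicationlogevent"
// resolve("demo", "local", "localenv", "JAVA_CONSUMER")       -> "demo-local-localenv-JAVA_CONSUMER"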
With the above configuration, instantiate a Kafka Consumer and start consuming records.
try (KafkaConsumer<Application, ApplicationLogEvent> consumer = new KafkaConsumer<>(config)) {
    consumer.subscribe(Collections.singletonList(STREAM));
    while (true) {
        ConsumerRecords<Application, ApplicationLogEvent> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<Application, ApplicationLogEvent> record : records) {
            LOG.info("Received message on topic {} partition {} offset {} key {} value {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}
- When all of the above steps have been done correctly, start your consumer app. With the logging level set to INFO, your app should log:
[main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroConsumerApp - Creating consumer config map [main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroConsumerApp - Creating kafka consumer [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: ... [main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: ... [main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: ... [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in. [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646656149260 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Subscribed to topic(s): demo-local-localenv-applicationlogevent [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Cluster ID: T8kKA-6FRwOMBDq1q1HoZQ [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Discovered group coordinator 192.168.99.100:9097 (id: 2147483646 rack: null) [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] (Re-)joining group [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Request joining group due to: need to re-join with the given member-id [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] (Re-)joining group [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Successfully joined group with generation Generation{generationId=14, memberId='consumer-demo-local-localenv-JAVA_CONSUMER-1-1783dae9-915d-4b84-b8ab-b05fc02b0f5e', protocol='range'} [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Finished assignment for group at generation 14: {consumer-demo-local-localenv-JAVA_CONSUMER-1-1783dae9-915d-4b84-b8ab-b05fc02b0f5e=Assignment(partitions=[demo-local-localenv-applicationlogevent-0])} [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Successfully synced group in generation Generation{generationId=14, memberId='consumer-demo-local-localenv-JAVA_CONSUMER-1-1783dae9-915d-4b84-b8ab-b05fc02b0f5e', protocol='range'} [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] 
Notifying assignor about the new Assignment(partitions=[demo-local-localenv-applicationlogevent-0]) [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Adding newly assigned partitions: demo-local-localenv-applicationlogevent-0 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Setting offset for partition demo-local-localenv-applicationlogevent-0 to the committed offset FetchPosition{offset=200, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.99.100:9097 (id: 1 rack: rack-1)], epoch=0}} [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Fetch position FetchPosition{offset=200, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.99.100:9097 (id: 1 rack: rack-1)], epoch=0}} is out of range for partition demo-local-localenv-applicationlogevent-0, resetting offset [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-demo-local-localenv-JAVA_CONSUMER-1, groupId=demo-local-localenv-JAVA_CONSUMER] Resetting offset for partition demo-local-localenv-applicationlogevent-0 to position FetchPosition{offset=210, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.99.100:9097 (id: 1 rack: rack-1)], epoch=0}}. [main] INFO io.axual.client.example.kafkaclientsasl.avro.SaslAvroConsumerApp - Received message on topic demo-local-localenv-applicationlogevent partition 0 offset 210 key {"name": "applicationName", "version": null, "owner": null} value {"timestamp": 1646656165436, "source": {"name": "applicationName", "version": null, "owner": null}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Message"}
This is all the coding required to start a basic consumer!
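Note that the while (true) loop above never exits. If your application needs to stop cleanly, one common approach (a sketch, not part of the example above) is to call consumer.wakeup() from a shutdown hook and handle the resulting WakeupException:
// Sketch: stopping the poll loop cleanly on JVM shutdown.
// Uses org.apache.kafka.common.errors.WakeupException.
final KafkaConsumer<Application, ApplicationLogEvent> consumer = new KafkaConsumer<>(config);
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();      // makes the blocking poll() throw WakeupException
    try {
        mainThread.join();  // wait for the poll loop to finish its last commit
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));
try {
    consumer.subscribe(Collections.singletonList(STREAM));
    while (true) {
        ConsumerRecords<Application, ApplicationLogEvent> records = consumer.poll(Duration.ofMillis(500));
        // ... process records as shown above ...
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // expected during shutdown, nothing to do
} finally {
    consumer.close();
}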
Consuming using Mutual TLS
In order to create a very basic consumer application using mutual TLS (mTLS), you need to create the configuration for the Kafka Consumer.
Map<String, Object> config = new HashMap<>();
// Update the bootstrap servers to use the SSL port
config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:8084");
config.put(GROUP_ID_CONFIG, GROUP_ID);
config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(MAX_POLL_RECORDS_CONFIG, 1);
config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(SECURITY_PROTOCOL_CONFIG, "SSL");
config.put(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// SSL Configuration
LOG.info("Using JKS type keystore and truststore.");
config.put(SSL_KEYSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.keystore.jks"));
config.put(SSL_KEYSTORE_PASSWORD_CONFIG, "notsecret");
config.put(SSL_KEY_PASSWORD_CONFIG, "notsecret");
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.truststore.jks"));
config.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "notsecret");
// Schema Registry config
config.put(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
// key and value deserializer
config.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
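The configuration above uses JKS keystores. If your client certificate and truststore are provided in PKCS12 format instead, the same settings apply with the store type set explicitly; a minimal sketch, with placeholder file names:
// Sketch: PKCS12 key material instead of JKS (the .p12 file names below are placeholders).
// SSL_KEYSTORE_TYPE_CONFIG and SSL_TRUSTSTORE_TYPE_CONFIG come from org.apache.kafka.common.config.SslConfigs.
config.put(SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
config.put(SSL_KEYSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.keystore.p12"));
config.put(SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
config.put(SSL_TRUSTSTORE_LOCATION_CONFIG, getResourceFilePath("client-certs/axual.client.truststore.p12"));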
As with the SASL example, you need to provide the fully resolved topic name and consumer group id when using the Kafka client directly.
With the above configuration, instantiate a Kafka Consumer and start consuming records.
try (KafkaConsumer<Application, ApplicationLogEvent> consumer = new KafkaConsumer<>(config)) {
    consumer.subscribe(Collections.singletonList(STREAM));
    while (true) {
        ConsumerRecords<Application, ApplicationLogEvent> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<Application, ApplicationLogEvent> record : records) {
            LOG.info("Received message on topic {} partition {} offset {} key {} value {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}
- When all of the above steps have been done correctly, start your consumer app. With the logging level set to INFO, your app should log:
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroConsumerApp - Creating consumer config map [main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroConsumerApp - Using JKS type keystore and truststore. [main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroConsumerApp - Creating kafka consumer [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: ... [main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: ... [main] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: ... [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1646654136174 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Subscribed to topic(s): axual-example-local-avro-applicationlog [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-0 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-5 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-2 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-8 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-9 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-4 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, 
groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-1 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-6 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-7 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting the last seen epoch of partition axual-example-local-avro-applicationlog-3 to 0 since the associated topicId changed from null to IoM4NcSgSoCJZ6OqKusvNQ [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Cluster ID: 8rd8-LboS2WQEqxL5dBpAQ [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Discovered group coordinator localhost:8084 (id: 2147483646 rack: null) [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] (Re-)joining group [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Request joining group due to: need to re-join with the given member-id [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] (Re-)joining group [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1-a5b3329f-55c6-4fb2-9658-b26ef574e649', protocol='range'} [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Finished assignment for group at generation 1: 
{consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1-a5b3329f-55c6-4fb2-9658-b26ef574e649=Assignment(partitions=[axual-example-local-avro-applicationlog-0, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-9])} [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1-a5b3329f-55c6-4fb2-9658-b26ef574e649', protocol='range'} [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Notifying assignor about the new Assignment(partitions=[axual-example-local-avro-applicationlog-0, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-9]) [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Adding newly assigned partitions: axual-example-local-avro-applicationlog-9, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-0 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-9 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-8 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-7 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, 
groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-6 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-5 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-4 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-3 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-2 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-1 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-0 [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-9 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-8 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-7 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. 
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-6 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-5 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-4 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-axual-example-local-io.axual.example.proxy.avro.consumer-1, groupId=axual-example-local-io.axual.example.proxy.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}. 
[main] INFO io.axual.client.example.kafkaclientssl.avro.SslAvroConsumerApp - Received message on topic axual-example-local-avro-applicationlog partition 6 offset 0 key {"name": "axual speaks spring", "version": "1.0.0", "owner": "none"} value {"timestamp": 1646652376145, "source": {"name": "Axual Consumer", "version": "1.0.0", "owner": "none"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Sending a message!"}
This is all the coding required to start a basic consumer!
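Because ENABLE_AUTO_COMMIT_CONFIG is set to false in both examples, offsets are only stored when commitSync() is called, and the loops above commit once per poll. If you prefer to commit after every processed record, a sketch of committing an explicit offset (the committed value is the next offset to consume, hence offset + 1):
// Sketch: commit an explicit offset per processed record.
// Uses org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata.
for (ConsumerRecord<Application, ApplicationLogEvent> record : records) {
    // ... process the record ...
    consumer.commitSync(Collections.singletonMap(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)));
}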
Wrapping up
You have concluded the Getting Started section by preparing your stream and applications, requesting access to the stream, and actually producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.
Proceed to Enabling Monitoring
You can also use the menu on the left to find information about other platform features that might not have been covered in this Getting Started guide.