Consuming Using Java Spring Kafka
Creating A Consumer Application
Like the producer application, the consumer application uses the Axual Platform combined with Spring Boot.
When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in Creating topics in Avro format. To get some data onto the stream, follow Create A Producer Application.
Create a Spring Boot project using your IDE or Spring Initializr.
Spring doesn’t support SASL authentication. If you want to use SASL, please refer to Kafka client consumer using SASL.
Building The Application
The start is similar to creating a producer application. You start by including the Maven dependencies on the Axual Client and the Spring framework.
For a full executable Spring Kafka Avro consumer example, please refer to the examples repository.
<!-- Axual client proxy -->
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client-proxy</artifactId>
    <version>5.8.1</version>
</dependency>
<!-- Enables SpringBoot Kafka functionality -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- SpringBoot starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Optionally when using Avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>
To create a consumer application, you need to provide the configuration for the consumer in the resources/application.yaml file.
axual:
  endpoint: "http://127.0.0.1:8081"
  tenant: "axual"
  environment: "local"
  stream: "avro-applicationlog"
spring:
  kafka:
    consumer:
      client-id: io.axual.example.client.avro.consumer
      group-id: io.axual.example.client.avro.consumer
      max-poll-records: 100
      auto-offset-reset: earliest
      enable-auto-commit: false
    properties:
      security.protocol: SSL
    ssl:
      # Using JKS
      key-store-location: client-certs/axual.client.keystore.jks
      key-store-password: notsecret
      key-password: notsecret
      trust-store-location: client-certs/axual.client.truststore.jks
      trust-store-password: notsecret
Check your essentials-package for the truststore and keystore files; see also Security.
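Before starting the application, it can help to confirm that the keystore and truststore open with the configured passwords. The following is a minimal sketch that uses only the JDK's java.security.KeyStore. The class name KeyStoreCheck and the helper listAliases are hypothetical and not part of the Axual client or Spring. The paths and password mirror the example configuration and are resolved relative to the working directory here, so adjust them to where your files actually live.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for a quick local check; not part of the Axual client or Spring.
class KeyStoreCheck {

    // Opens a keystore file with the given password and returns the aliases it contains.
    // Throws if the file is missing, cannot be parsed, or the password is wrong.
    static List<String> listAliases(Path file, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(file)) {
            store.load(in, password);
        }
        return Collections.list(store.aliases());
    }

    public static void main(String[] args) throws Exception {
        // Values copied from the example application.yaml; adjust to your setup.
        char[] password = "notsecret".toCharArray();
        String[] files = {
            "client-certs/axual.client.keystore.jks",
            "client-certs/axual.client.truststore.jks"
        };
        for (String name : files) {
            System.out.println(name + " -> " + listAliases(Paths.get(name), password));
        }
    }
}
```

If either file fails to load here, the consumer would most likely fail during its SSL setup as well, so this check narrows down configuration problems before Spring and Kafka are involved.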
Create the AxualKafkaConsumerFactory<K, V> bean class by extending DefaultKafkaConsumerFactory<K, V>.
public class AxualKafkaConsumerFactory<K, V> extends DefaultKafkaConsumerFactory<K, V> {

    public AxualKafkaConsumerFactory(Map<String, Object> configs) {
        super(configs);
    }

    // Return an AxualConsumer instead of a plain KafkaConsumer, built from the factory's configuration.
    @Override
    public Consumer<K, V> createConsumer(String groupId,
                                         String clientIdPrefix,
                                         String clientIdSuffixArg,
                                         Properties properties) {
        return new AxualConsumer<>(getConfigurationProperties());
    }
}
Define the consumer bean configuration.
@EnableKafka
@Configuration
public class SpringAvroConsumer {

    @Value("${axual.endpoint}")
    private String endpoint;

    @Value("${axual.tenant}")
    private String tenant;

    @Value("${axual.environment}")
    private String environment;

    private final KafkaProperties kafkaProperties;

    @Autowired
    public SpringAvroConsumer(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
        /*
         * The Proxy chain config determines what added features you want when connecting to Axual.
         *
         * SWITCHING PROXY: Enables automatic failover to a different Kafka cluster. Team Speed can
         * trigger a switch when performing Kafka cluster maintenance. By passing the CommonConfig.ENDPOINT
         * config, the consumer discovers the Kafka broker coordinates automatically.
         *
         * RESOLVING PROXY: Enables resolution of topic and group id names to actual names in Kafka
         * cluster. This allows multi-tenancy and multi-environment support on a single Kafka cluster.
         *
         * LINEAGE PROXY: Adds useful metadata in message headers to allow tracing of messages through Axual.
         *
         * LOGGING PROXY: Can be activated anywhere in the chain to log the state.
         */
        props.put(AxualConsumerConfig.CHAIN_CONFIG, ProxyChain.newBuilder()
                .append(ProxyTypeRegistry.SWITCHING_PROXY_ID)
                .append(ProxyTypeRegistry.RESOLVING_PROXY_ID)
                .append(ProxyTypeRegistry.HEADER_PROXY_ID)
                .append(ProxyTypeRegistry.LINEAGE_PROXY_ID)
                .build());
        props.put(CommonConfig.APPLICATION_ID, kafkaProperties.getConsumer().getClientId());
        props.put(CommonConfig.APPLICATION_VERSION, "1.0.0");
        props.put(CommonConfig.TENANT, tenant);
        props.put(CommonConfig.ENVIRONMENT, environment);
        props.put(CommonConfig.ENDPOINT, endpoint);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());
        return props;
    }

    @Bean
    public ConsumerFactory<Application, ApplicationLogEvent> consumerFactory() {
        return new AxualKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Application, ApplicationLogEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Application, ApplicationLogEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Since Spring Kafka 2.3, container fails to start if topic is missing. This check is done by
        // initializing an Admin Client. We disable this to skip checking topic existence.
        factory.setMissingTopicsFatal(false);
        return factory;
    }
}
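The resolving proxy mentioned in the comment above turns the short topic and group names used by the application into the full names on the Kafka cluster. The sketch below illustrates the idea with plain string substitution. It is not the Axual implementation, and the class NamePatternSketch and its resolve method are hypothetical. The patterns and values (tenant axual, instance example, environment local) are taken from the discovery result in the sample startup log further down this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of pattern-based name resolution; not the Axual implementation.
class NamePatternSketch {

    // Replaces each {placeholder} in the pattern with its value from the context map.
    static String resolve(String pattern, Map<String, String> context) {
        String resolved = pattern;
        for (Map.Entry<String, String> entry : context.entrySet()) {
            resolved = resolved.replace("{" + entry.getKey() + "}", entry.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Shared context, using the values from the sample discovery result.
        Map<String, String> context = new LinkedHashMap<>();
        context.put("tenant", "axual");
        context.put("instance", "example");
        context.put("environment", "local");

        Map<String, String> topicContext = new LinkedHashMap<>(context);
        topicContext.put("topic", "avro-applicationlog");
        // prints axual-example-local-avro-applicationlog
        System.out.println(resolve("{tenant}-{instance}-{environment}-{topic}", topicContext));

        Map<String, String> groupContext = new LinkedHashMap<>(context);
        groupContext.put("group", "io.axual.example.client.avro.consumer");
        // prints axual-example-local-io.axual.example.client.avro.consumer
        System.out.println(resolve("{tenant}-{instance}-{environment}-{group}", groupContext));
    }
}
```

The two printed names match the resolved topic and groupId that the Kafka client reports in the sample log, while the application itself keeps using the short names from application.yaml.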
Update your SpringBootApplication class to start consuming records using the Spring Kafka listener annotation @KafkaListener.
@SpringBootApplication
public class AvroConsumerApplication {

    private static final Logger LOG = LoggerFactory.getLogger(AvroConsumerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(AvroConsumerApplication.class, args);
    }

    // The topic name is read from the axual.stream property in application.yaml.
    @KafkaListener(topics = "#{'${axual.stream}'}")
    public void listen(ConsumerRecord<Application, ApplicationLogEvent> event) {
        long now = System.currentTimeMillis();
        LOG.info("Received message on topic {} partition {} offset {} key '{}' value '{}'. Time to consume {} ms",
                event.topic(), event.partition(), event.offset(), event.key(), event.value(), now - event.timestamp());
    }
}
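The "Time to consume" value logged by the listener is the difference between the consumer's local clock and the record timestamp. Because the timestamp is set on another machine, clock differences can make the value slightly off or even negative. The sketch below isolates that calculation so it can be run without Kafka. ConsumeLatencySketch and latencyMillis are hypothetical names, and clamping at zero is an addition that the listener above does not perform.

```java
// Hypothetical helper that isolates the latency calculation used in the listener.
class ConsumeLatencySketch {

    // Milliseconds between the record timestamp and the moment of consumption.
    // Clamped at zero so clock skew between machines never yields a negative value.
    static long latencyMillis(long recordTimestampMillis, long nowMillis) {
        return Math.max(0L, nowMillis - recordTimestampMillis);
    }

    public static void main(String[] args) {
        // A record stamped at t=1000 ms and consumed at t=1410 ms.
        System.out.println(latencyMillis(1_000L, 1_410L)); // prints 410
        // A record whose timestamp is ahead of the local clock.
        System.out.println(latencyMillis(2_000L, 1_990L)); // prints 0
    }
}
```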
When all of the above steps have been done correctly, start your consumer app. With the logging level INFO, your app should log:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.4)

2022-03-07 12:25:52.032 INFO 7292 --- [ main] i.a.c.examples.AvroConsumerApplication : Starting AvroConsumerApplication using Java 11.0.2 on MacBook-Pro.local with PID 7292 (/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-consumer/target/classes started by shalabh in /Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples)
2022-03-07 12:25:52.040 INFO 7292 --- [ main] i.a.c.examples.AvroConsumerApplication : No active profile set, falling back to default profiles: default
2022-03-07 12:25:53.981 INFO 7292 --- [ main] i.a.d.client.fetcher.DiscoveryLoader : TTL updated to: 5000 (was: 0)
2022-03-07 12:25:53.981 INFO 7292 --- [ main] i.a.d.client.fetcher.DiscoveryLoader : Fetched discovery properties: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-07 12:25:53.982 INFO 7292 --- [ main] i.a.c.p.s.discovery.DiscoverySubscriber : Received new DiscoveryResult for SwitchingConsumer: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-07 12:25:53.982 INFO 7292 --- [ main] i.a.c.p.s.discovery.DiscoverySubscriber : Switching SwitchingConsumer from null to clusterA
2022-03-07 12:25:53.982 INFO 7292 --- [ main] i.a.c.p.s.g.BaseClientProxySwitcher : Creating new backing consumer with Discovery API result: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-07 12:25:53.982 INFO 7292 --- [ main] i.a.c.p.s.consumer.ConsumerSwitcher : Creating a new consumer with properties: {acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, value.deserializer=io.axual.serde.avro.SpecificAvroDeserializer, instance=example, group.id=io.axual.example.client.avro.consumer, partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor, topic.pattern={tenant}-{instance}-{environment}-{topic}, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ssl.keystore.type=JKS, schema.registry.url=http://localhost:8082, endpoint=http://127.0.0.1:8081, enable.auto.commit=false, ssl.key.password=[hidden], ssl.truststore.password=[hidden], tenant=axual, client.id=consumer-io.axual.example.client.avro.consumer-1, ssl.endpoint.identification.algorithm=, key.deserializer=io.axual.serde.avro.SpecificAvroDeserializer, group.id.pattern={tenant}-{instance}-{environment}-{group}, ssl.protocol=TLSv1.3, max.poll.records=100, ssl.enabled.protocols=[TLSv1.2, TLSv1.3], ssl.keystore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-consumer/target/classes/client-certs/axual.client.keystore.jks, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, app.id=io.axual.example.client.avro.consumer, ttl=5000, distributor.distance=2, ssl.truststore.type=JKS, enable.value.headers=false, app.version=1.0.0, security.protocol=SSL, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, environment=local, ssl.truststore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-consumer/target/classes/client-certs/axual.client.truststore.jks, system=platform-test-standalone, distributor.timeout=20000, ssl.keystore.password=[hidden], isolation.level=read_uncommitted, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, auto.offset.reset=earliest}
2022-03-07 12:25:54.011 INFO 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: ...
2022-03-07 12:25:54.027 INFO 7292 --- [ main] i.c.k.s.KafkaAvroDeserializerConfig : KafkaAvroDeserializerConfig values: ...
2022-03-07 12:25:54.034 INFO 7292 --- [ main] io.axual.common.config.SslEngineConfig : SslEngineConfig values: ...
2022-03-07 12:25:54.093 INFO 7292 --- [ main] i.c.k.s.KafkaAvroDeserializerConfig : KafkaAvroDeserializerConfig values: ...
2022-03-07 12:25:54.094 INFO 7292 --- [ main] i.c.k.s.KafkaAvroDeserializerConfig : KafkaAvroDeserializerConfig values: ...
2022-03-07 12:25:54.150 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'acl.principal.builder' was supplied but isn't a known config.
2022-03-07 12:25:54.150 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'cluster' was supplied but isn't a known config.
2022-03-07 12:25:54.150 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'instance' was supplied but isn't a known config.
2022-03-07 12:25:54.150 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'resolvingconsumerpartitionassignor.topic.resolver' was supplied but isn't a known config.
2022-03-07 12:25:54.150 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'topic.pattern' was supplied but isn't a known config.
2022-03-07 12:25:54.150 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'schema.registry.url' was supplied but isn't a known config.
2022-03-07 12:25:54.150 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'endpoint' was supplied but isn't a known config.
2022-03-07 12:25:54.150 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'resolvingconsumerpartitionassignor.backing.assignor' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'tenant' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'group.id.pattern' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'transactional.id.resolver' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'app.id' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'ttl' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'distributor.distance' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'enable.value.headers' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'app.version' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'transactional.id.pattern' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'environment' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'system' was supplied but isn't a known config.
2022-03-07 12:25:54.151 WARN 7292 --- [ main] o.a.k.clients.consumer.ConsumerConfig : The configuration 'distributor.timeout' was supplied but isn't a known config.
2022-03-07 12:25:54.153 INFO 7292 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2022-03-07 12:25:54.153 INFO 7292 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2022-03-07 12:25:54.153 INFO 7292 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1646652354152
2022-03-07 12:25:54.155 INFO 7292 --- [ main] i.a.c.p.s.g.BaseClientProxySwitcher : Created new backing consumer
2022-03-07 12:25:54.156 INFO 7292 --- [ main] i.a.c.p.s.consumer.ConsumerSwitcher : Consumer switched, applying assignments and subscriptions
2022-03-07 12:25:54.156 INFO 7292 --- [ main] i.a.c.p.s.consumer.ConsumerSwitcher : Consumer switch finished
2022-03-07 12:25:54.157 INFO 7292 --- [ main] i.a.c.p.s.consumer.SwitchingConsumer : Subscribing to topics: [avro-applicationlog]
2022-03-07 12:25:54.158 INFO 7292 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Subscribed to topic(s): axual-example-local-avro-applicationlog
2022-03-07 12:25:54.224 INFO 7292 --- [ main] i.a.c.examples.AvroConsumerApplication : Started AvroConsumerApplication in 2.797 seconds (JVM running for 3.8)
2022-03-07 12:25:54.785 INFO 7292 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Cluster ID: 8rd8-LboS2WQEqxL5dBpAQ
2022-03-07 12:25:56.082 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Discovered group coordinator localhost:8084 (id: 2147483646 rack: null)
2022-03-07 12:25:56.086 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] (Re-)joining group
2022-03-07 12:25:56.185 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] (Re-)joining group
2022-03-07 12:25:59.235 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-io.axual.example.client.avro.consumer-1-d06edc45-6a13-4458-b43a-234c4e893787', protocol='range'}
2022-03-07 12:25:59.241 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Finished assignment for group at generation 1: {consumer-io.axual.example.client.avro.consumer-1-d06edc45-6a13-4458-b43a-234c4e893787=Assignment(partitions=[axual-example-local-avro-applicationlog-9, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-0])}
2022-03-07 12:25:59.390 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-io.axual.example.client.avro.consumer-1-d06edc45-6a13-4458-b43a-234c4e893787', protocol='range'}
2022-03-07 12:25:59.391 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Notifying assignor about the new Assignment(partitions=[axual-example-local-avro-applicationlog-9, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-0])
2022-03-07 12:25:59.394 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Adding newly assigned partitions: axual-example-local-avro-applicationlog-9, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-0
2022-03-07 12:25:59.419 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-9
2022-03-07 12:25:59.419 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-8
2022-03-07 12:25:59.419 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-7
2022-03-07 12:25:59.419 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-6
2022-03-07 12:25:59.419 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-5
2022-03-07 12:25:59.419 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-4
2022-03-07 12:25:59.420 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-3
2022-03-07 12:25:59.420 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-2
2022-03-07 12:25:59.420 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-1
2022-03-07 12:25:59.420 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-0
2022-03-07 12:25:59.445 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-9 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-8 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-7 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-6 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-5 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-4 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.447 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.447 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.447 INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.448 INFO 7292 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : io.axual.example.client.avro.consumer: partitions assigned: [avro-applicationlog-6, avro-applicationlog-5, avro-applicationlog-8, avro-applicationlog-7, avro-applicationlog-2, avro-applicationlog-1, avro-applicationlog-4, avro-applicationlog-3, avro-applicationlog-0, avro-applicationlog-9]
2022-03-07 12:26:17.276 INFO 7292 --- [ntainer#0-0-C-1] i.a.c.examples.AvroConsumerApplication : Received message on topic avro-applicationlog partition 6 offset 0 key '{"name": "axual speaks spring", "version": "1.0.0", "owner": "none"}' value '{"timestamp": 1646652376145, "source": {"name": "Axual ❤️ Spring", "version": "1.0.0", "owner": "none"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Sending a message!"}'. Time to consume 410 ms
This is all the coding required to start a consumer!
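In the sample log, each partition first reports "Found no committed offset" and is then reset to offset 0. That follows from auto-offset-reset: earliest in application.yaml: the reset policy only applies when the consumer group has no committed offset for a partition. The sketch below models that decision in plain Java. It is a simplified illustration, not Kafka's code. StartPositionSketch, ResetPolicy and startPosition are hypothetical names, and Kafka's third policy, none (which raises an error instead of resetting), is left out.

```java
import java.util.OptionalLong;

// Hypothetical model of how a consumer picks its starting offset for one partition.
class StartPositionSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    // A committed offset always wins; the reset policy is only a fallback.
    static long startPosition(OptionalLong committed, ResetPolicy policy,
                              long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        // First start of a new group, as in the sample log: no committed offset yet.
        System.out.println(startPosition(OptionalLong.empty(), ResetPolicy.EARLIEST, 0L, 42L)); // prints 0
        // Same situation with the latest policy: only new records are read.
        System.out.println(startPosition(OptionalLong.empty(), ResetPolicy.LATEST, 0L, 42L)); // prints 42
        // After a restart with a committed offset, the policy is ignored.
        System.out.println(startPosition(OptionalLong.of(17L), ResetPolicy.EARLIEST, 0L, 42L)); // prints 17
    }
}
```

In practice this means that restarting the example consumer with the same group-id does not replay the stream from the beginning once offsets have been committed.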
Wrapping up
You have concluded the getting started section by preparing your stream & applications, requesting access to the stream and actually producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.
Proceed to Enabling Monitoring
You can also use the menu on the left to find information about other platform features that might not have been covered in this Getting Started.