Consuming Using Java Spring Kafka

Creating A Consumer Application

Like the producer application, the consumer application uses the Axual Platform combined with Spring Boot.

When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in Creating topics in Avro format. To get some data onto the stream, follow Create A Producer Application.

Create a Spring Boot project using your IDE or Spring Initializr.

Spring doesn’t support SASL authentication. If you want to use SASL, please refer to Kafka client consumer using SASL.

Building The Application

The start is similar to creating a producer application. You start by including the Maven dependencies on the Axual Client and the Spring framework.

For a full, executable Spring Kafka Avro consumer example, please refer to the examples repository.
<!-- Axual client proxy-->
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client-proxy</artifactId>
    <version>5.8.1</version>
</dependency>
<!-- Enables SpringBoot Kafka functionality -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- SpringBoot starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Optionally when using Avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>

To create a consumer application, provide the consumer configuration in the resources/application.yaml file.

axual:
  endpoint: "http://127.0.0.1:8081"
  tenant: "axual"
  environment: "local"
  stream: "avro-applicationlog"

spring:
  kafka:
    consumer:
      client-id: io.axual.example.client.avro.consumer
      group-id: io.axual.example.client.avro.consumer

      max-poll-records: 100
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        security.protocol: SSL
      ssl:
        # Using JKS
        key-store-location: client-certs/axual.client.keystore.jks
        key-store-password: notsecret
        key-password: notsecret
        trust-store-location: client-certs/axual.client.truststore.jks
        trust-store-password: notsecret

Check your essentials-package for the truststore and keystore files. See also Security.
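
Before starting the application, it can help to confirm that the keystore and truststore files open with the configured passwords. The following is a minimal sketch using only the JDK; the class name `KeystoreCheck`, the method `listAliases`, and the default file path are assumptions made for illustration and are not part of the Axual client or Spring.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

public class KeystoreCheck {

    /**
     * Loads a JKS store from disk and returns the aliases it contains.
     * Throws if the file cannot be read or the password does not match.
     */
    static List<String> listAliases(Path storePath, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore store = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(storePath)) {
            store.load(in, password);
        }
        return Collections.list(store.aliases());
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical defaults: adjust to where you copied the essentials-package files.
        Path keystore = Paths.get(args.length > 0 ? args[0]
                : "src/main/resources/client-certs/axual.client.keystore.jks");
        char[] password = (args.length > 1 ? args[1] : "notsecret").toCharArray();

        System.out.println("Aliases in " + keystore + ": " + listAliases(keystore, password));
    }
}
```

Run it once for the keystore and once for the truststore. An exception at this stage usually points to a wrong path or password rather than a problem in the consumer configuration.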

Create an AxualKafkaConsumerFactory<K, V> class by extending DefaultKafkaConsumerFactory<K, V>.

public class AxualKafkaConsumerFactory<K, V> extends DefaultKafkaConsumerFactory<K, V> {

    public AxualKafkaConsumerFactory(Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public Consumer<K, V> createConsumer(String groupId,
                                         String clientIdPrefix,
                                         String clientIdSuffixArg,
                                         Properties properties) {
        return new AxualConsumer<>(getConfigurationProperties());
    }
}

Define the consumer bean configuration.

@EnableKafka
@Configuration
public class SpringAvroConsumer {

    @Value("${axual.endpoint}")
    private String endpoint;

    @Value("${axual.tenant}")
    private String tenant;

    @Value("${axual.environment}")
    private String environment;

    private final KafkaProperties kafkaProperties;

    @Autowired
    public SpringAvroConsumer(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());

        /*
         * The Proxy chain config determines what added features you want when connecting to Axual.
         *
         * SWITCHING PROXY: Enables automatic failover to a different Kafka cluster. Team Speed can
         * trigger a switch when performing Kafka cluster maintenance. By passing the CommonConfig.ENDPOINT
         * config, the consumer discovers the Kafka broker coordinates automatically.
         *
         * RESOLVING PROXY: Enables resolution of topic and group id names to actual names in Kafka
         * cluster. This allows multi-tenancy and multi-environment support on a single Kafka cluster.
         *
         * LINEAGE PROXY: Adds useful metadata in message headers to allow tracing of messages through Axual.
         *
         * LOGGING PROXY: Can be activated anywhere in the chain to log the state.
         */
        props.put(AxualConsumerConfig.CHAIN_CONFIG, ProxyChain.newBuilder()
                .append(ProxyTypeRegistry.SWITCHING_PROXY_ID)
                .append(ProxyTypeRegistry.RESOLVING_PROXY_ID)
                .append(ProxyTypeRegistry.HEADER_PROXY_ID)
                .append(ProxyTypeRegistry.LINEAGE_PROXY_ID)
                .build());

        props.put(CommonConfig.APPLICATION_ID, kafkaProperties.getConsumer().getClientId());
        props.put(CommonConfig.APPLICATION_VERSION, "1.0.0");
        props.put(CommonConfig.TENANT, tenant);
        props.put(CommonConfig.ENVIRONMENT, environment);
        props.put(CommonConfig.ENDPOINT, endpoint);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());

        return props;
    }

    @Bean
    public ConsumerFactory<Application, ApplicationLogEvent> consumerFactory() {
        return new AxualKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Application, ApplicationLogEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Application, ApplicationLogEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Since Spring Kafka 2.3, container fails to start if topic is missing. This check is done by
        // initializing an Admin Client. We disable this to skip checking topic existence.
        factory.setMissingTopicsFatal(false);
        return factory;
    }


}

Update your SpringBootApplication class to start consuming records using the Spring Kafka listener annotation @KafkaListener.

@SpringBootApplication
public class AvroConsumerApplication {
    private static final Logger LOG = LoggerFactory.getLogger(AvroConsumerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(AvroConsumerApplication.class, args);
    }

    @KafkaListener(topics = "#{'${axual.stream}'}")
    public void listen(ConsumerRecord<Application, ApplicationLogEvent> event) {
        long now = System.currentTimeMillis();
        LOG.info("Received message on topic {} partition {} offset {} key '{}' value '{}'. Time to consume {} ms",
                event.topic(), event.partition(), event.offset(), event.key(), event.value(), now - event.timestamp());
    }
}

When all of the above steps have been completed correctly, start your consumer app. With the logging level set to INFO, your app should log output similar to the following:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.4)

2022-03-07 12:25:52.032  INFO 7292 --- [           main] i.a.c.examples.AvroConsumerApplication   : Starting AvroConsumerApplication using Java 11.0.2 on MacBook-Pro.local with PID 7292 (/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-consumer/target/classes started by shalabh in /Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples)
2022-03-07 12:25:52.040  INFO 7292 --- [           main] i.a.c.examples.AvroConsumerApplication   : No active profile set, falling back to default profiles: default
2022-03-07 12:25:53.981  INFO 7292 --- [           main] i.a.d.client.fetcher.DiscoveryLoader     : TTL updated to: 5000 (was: 0)
2022-03-07 12:25:53.981  INFO 7292 --- [           main] i.a.d.client.fetcher.DiscoveryLoader     : Fetched discovery properties: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-07 12:25:53.982  INFO 7292 --- [           main] i.a.c.p.s.discovery.DiscoverySubscriber  : Received new DiscoveryResult for SwitchingConsumer: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-07 12:25:53.982  INFO 7292 --- [           main] i.a.c.p.s.discovery.DiscoverySubscriber  : Switching SwitchingConsumer from null to clusterA
2022-03-07 12:25:53.982  INFO 7292 --- [           main] i.a.c.p.s.g.BaseClientProxySwitcher      : Creating new backing consumer with Discovery API result: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-07 12:25:53.982  INFO 7292 --- [           main] i.a.c.p.s.consumer.ConsumerSwitcher      : Creating a new consumer with properties: {acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, value.deserializer=io.axual.serde.avro.SpecificAvroDeserializer, instance=example, group.id=io.axual.example.client.avro.consumer, partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor, topic.pattern={tenant}-{instance}-{environment}-{topic}, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ssl.keystore.type=JKS, schema.registry.url=http://localhost:8082, endpoint=http://127.0.0.1:8081, enable.auto.commit=false, ssl.key.password=[hidden], ssl.truststore.password=[hidden], tenant=axual, client.id=consumer-io.axual.example.client.avro.consumer-1, ssl.endpoint.identification.algorithm=, key.deserializer=io.axual.serde.avro.SpecificAvroDeserializer, group.id.pattern={tenant}-{instance}-{environment}-{group}, ssl.protocol=TLSv1.3, max.poll.records=100, ssl.enabled.protocols=[TLSv1.2, TLSv1.3], ssl.keystore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-consumer/target/classes/client-certs/axual.client.keystore.jks, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, app.id=io.axual.example.client.avro.consumer, ttl=5000, distributor.distance=2, ssl.truststore.type=JKS, enable.value.headers=false, app.version=1.0.0, security.protocol=SSL, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, environment=local, ssl.truststore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-consumer/target/classes/client-certs/axual.client.truststore.jks, system=platform-test-standalone, distributor.timeout=20000, ssl.keystore.password=[hidden], isolation.level=read_uncommitted, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, auto.offset.reset=earliest}
2022-03-07 12:25:54.011  INFO 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: ...
2022-03-07 12:25:54.027  INFO 7292 --- [           main] i.c.k.s.KafkaAvroDeserializerConfig      : KafkaAvroDeserializerConfig values: ...
2022-03-07 12:25:54.034  INFO 7292 --- [           main] io.axual.common.config.SslEngineConfig   : SslEngineConfig values: ...
2022-03-07 12:25:54.093  INFO 7292 --- [           main] i.c.k.s.KafkaAvroDeserializerConfig      : KafkaAvroDeserializerConfig values: ...
2022-03-07 12:25:54.094  INFO 7292 --- [           main] i.c.k.s.KafkaAvroDeserializerConfig      : KafkaAvroDeserializerConfig values: ...
2022-03-07 12:25:54.150  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'acl.principal.builder' was supplied but isn't a known config.
2022-03-07 12:25:54.150  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'cluster' was supplied but isn't a known config.
2022-03-07 12:25:54.150  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'instance' was supplied but isn't a known config.
2022-03-07 12:25:54.150  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'resolvingconsumerpartitionassignor.topic.resolver' was supplied but isn't a known config.
2022-03-07 12:25:54.150  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'topic.pattern' was supplied but isn't a known config.
2022-03-07 12:25:54.150  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'schema.registry.url' was supplied but isn't a known config.
2022-03-07 12:25:54.150  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'endpoint' was supplied but isn't a known config.
2022-03-07 12:25:54.150  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'resolvingconsumerpartitionassignor.backing.assignor' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'tenant' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'group.id.pattern' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'transactional.id.resolver' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'app.id' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'ttl' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'distributor.distance' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'enable.value.headers' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'app.version' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'transactional.id.pattern' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'environment' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'system' was supplied but isn't a known config.
2022-03-07 12:25:54.151  WARN 7292 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'distributor.timeout' was supplied but isn't a known config.
2022-03-07 12:25:54.153  INFO 7292 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2022-03-07 12:25:54.153  INFO 7292 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2022-03-07 12:25:54.153  INFO 7292 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1646652354152
2022-03-07 12:25:54.155  INFO 7292 --- [           main] i.a.c.p.s.g.BaseClientProxySwitcher      : Created new backing consumer
2022-03-07 12:25:54.156  INFO 7292 --- [           main] i.a.c.p.s.consumer.ConsumerSwitcher      : Consumer switched, applying assignments and subscriptions
2022-03-07 12:25:54.156  INFO 7292 --- [           main] i.a.c.p.s.consumer.ConsumerSwitcher      : Consumer switch finished
2022-03-07 12:25:54.157  INFO 7292 --- [           main] i.a.c.p.s.consumer.SwitchingConsumer     : Subscribing to topics: [avro-applicationlog]
2022-03-07 12:25:54.158  INFO 7292 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Subscribed to topic(s): axual-example-local-avro-applicationlog
2022-03-07 12:25:54.224  INFO 7292 --- [           main] i.a.c.examples.AvroConsumerApplication   : Started AvroConsumerApplication in 2.797 seconds (JVM running for 3.8)
2022-03-07 12:25:54.785  INFO 7292 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Cluster ID: 8rd8-LboS2WQEqxL5dBpAQ
2022-03-07 12:25:56.082  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Discovered group coordinator localhost:8084 (id: 2147483646 rack: null)
2022-03-07 12:25:56.086  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] (Re-)joining group
2022-03-07 12:25:56.185  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] (Re-)joining group
2022-03-07 12:25:59.235  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-io.axual.example.client.avro.consumer-1-d06edc45-6a13-4458-b43a-234c4e893787', protocol='range'}
2022-03-07 12:25:59.241  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Finished assignment for group at generation 1: {consumer-io.axual.example.client.avro.consumer-1-d06edc45-6a13-4458-b43a-234c4e893787=Assignment(partitions=[axual-example-local-avro-applicationlog-9, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-0])}
2022-03-07 12:25:59.390  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-io.axual.example.client.avro.consumer-1-d06edc45-6a13-4458-b43a-234c4e893787', protocol='range'}
2022-03-07 12:25:59.391  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Notifying assignor about the new Assignment(partitions=[axual-example-local-avro-applicationlog-9, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-0])
2022-03-07 12:25:59.394  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Adding newly assigned partitions: axual-example-local-avro-applicationlog-9, axual-example-local-avro-applicationlog-8, axual-example-local-avro-applicationlog-7, axual-example-local-avro-applicationlog-6, axual-example-local-avro-applicationlog-5, axual-example-local-avro-applicationlog-4, axual-example-local-avro-applicationlog-3, axual-example-local-avro-applicationlog-2, axual-example-local-avro-applicationlog-1, axual-example-local-avro-applicationlog-0
2022-03-07 12:25:59.419  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-9
2022-03-07 12:25:59.419  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-8
2022-03-07 12:25:59.419  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-7
2022-03-07 12:25:59.419  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-6
2022-03-07 12:25:59.419  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-5
2022-03-07 12:25:59.419  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-4
2022-03-07 12:25:59.420  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-3
2022-03-07 12:25:59.420  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-2
2022-03-07 12:25:59.420  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-1
2022-03-07 12:25:59.420  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Found no committed offset for partition axual-example-local-avro-applicationlog-0
2022-03-07 12:25:59.445  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-9 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-8 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-7 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-6 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-5 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-4 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.446  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.447  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.447  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.447  INFO 7292 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=axual-example-local-io.axual.example.client.avro.consumer] Resetting offset for partition axual-example-local-avro-applicationlog-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:8084 (id: 1 rack: null)], epoch=0}}.
2022-03-07 12:25:59.448  INFO 7292 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : io.axual.example.client.avro.consumer: partitions assigned: [avro-applicationlog-6, avro-applicationlog-5, avro-applicationlog-8, avro-applicationlog-7, avro-applicationlog-2, avro-applicationlog-1, avro-applicationlog-4, avro-applicationlog-3, avro-applicationlog-0, avro-applicationlog-9]
2022-03-07 12:26:17.276  INFO 7292 --- [ntainer#0-0-C-1] i.a.c.examples.AvroConsumerApplication   : Received message on topic avro-applicationlog partition 6 offset 0 key '{"name": "axual speaks spring", "version": "1.0.0", "owner": "none"}' value '{"timestamp": 1646652376145, "source": {"name": "Axual ❤️ Spring", "version": "1.0.0", "owner": "none"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Sending a message!"}'. Time to consume 410 ms
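
In the log output, the application subscribes to avro-applicationlog while the backing Kafka consumer subscribes to axual-example-local-avro-applicationlog, and the group id gains the same prefix. This matches the topic.pattern and group.id.pattern values in the discovery result. The sketch below illustrates that placeholder substitution in plain Java. It is not the Axual resolver implementation; the class `PatternResolverSketch` and its `resolve` method are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PatternResolverSketch {

    /** Replaces every {placeholder} in the pattern with its value from the context map. */
    static String resolve(String pattern, Map<String, String> context) {
        String resolved = pattern;
        for (Map.Entry<String, String> entry : context.entrySet()) {
            resolved = resolved.replace("{" + entry.getKey() + "}", entry.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Values copied from the discovery result and application.yaml shown in this guide.
        Map<String, String> context = new LinkedHashMap<>();
        context.put("tenant", "axual");
        context.put("instance", "example");
        context.put("environment", "local");
        context.put("topic", "avro-applicationlog");
        context.put("group", "io.axual.example.client.avro.consumer");

        // prints axual-example-local-avro-applicationlog
        System.out.println(resolve("{tenant}-{instance}-{environment}-{topic}", context));
        // prints axual-example-local-io.axual.example.client.avro.consumer
        System.out.println(resolve("{tenant}-{instance}-{environment}-{group}", context));
    }
}
```

The two printed values correspond to the topic and groupId reported by the KafkaConsumer log lines, which is why your own code only ever refers to the short stream name.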

This is all the coding required to start a consumer!

Wrapping up

You have concluded the Getting Started section by preparing your stream and applications, requesting access to the stream, and producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.

Proceed to Enabling Monitoring

You can also use the menu on the left to find information about other platform features that might not have been covered in this Getting Started.