Consuming Using Java Axual Client

Creating A Consumer Application

When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in Creating streams in Avro format. To get some data onto the stream, follow Create A Producer Application.

The Axual client doesn’t support SASL authentication. If you want to use SASL, refer to Kafka client consumer using SASL.
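For comparison, a plain Kafka consumer is configured for SASL through client properties. Below is a minimal sketch, assuming SASL_SSL with the SCRAM-SHA-512 mechanism; the bootstrap endpoint and credentials are placeholders, not values from this guide.

// Hypothetical plain Kafka consumer SASL configuration (requires java.util.Properties)
Properties props = new Properties();
props.put("bootstrap.servers", "broker.example.com:9093"); // placeholder endpoint
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"example-user\" password=\"example-password\";");
// plus the usual SSL truststore settings and deserializer configuration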

Before continuing, please check that you fulfill all the prerequisites.

Building The Application

Creating a consumer application starts much like creating a producer application: you include the Maven dependency on the Axual client and create the configuration for it.

For a complete, executable Axual client Avro consumer example, refer to the examples repository.
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client</artifactId>
    <version>5.8.1</version>
</dependency>

This is all you need to create a very basic consumer application. The next step is to create the configuration for the AxualClient and for the Consumer that will be used.

SslConfig sslConfig = SslConfig.newBuilder()
	.setEnableHostnameVerification(false)
	.setKeystoreLocation("/path/to/example-consumer.client.jks") // Absolute path to the application keystore
	.setKeystorePassword(new PasswordConfig("notsecret"))
	.setKeyPassword(new PasswordConfig("notsecret"))
	.setTruststoreLocation("/path/to/common-truststore.jks") // Absolute path to the application truststore
	.setTruststorePassword(new PasswordConfig("notsecret"))
	.build();
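The example above disables hostname verification to keep local setups simple; for production deployments you will typically want to leave it enabled.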

ClientConfig config = ClientConfig.newBuilder()
        // This is the app ID as it is configured in the self service
        .setApplicationId("io.axual.example.client.avro.consumer")
        .setApplicationVersion("0.0.1")
        // The endpoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
        .setEndpoint("https://192.168.99.100:443")
        // The tenant you are part of
        .setTenant("demo")
        // The environment corresponding to the shortname of the env in the self service
        .setEnvironment("example")
        // The ssl configuration for your application. The certificate used should have a DN
        // matching the DN that was configured in self service
        .setSslConfig(sslConfig)
        .build();
Check your care package for the truststore and keystore files; see also Security.

The next step is creating a ConsumerConfig, similar to the ProducerConfig created in step 5.

SpecificAvroConsumerConfig<Application, ApplicationLogEvent> specificAvroConsumerConfig =
        SpecificAvroConsumerConfig.<Application, ApplicationLogEvent>builder()
            // We want to make sure we get all the messages at least once. On a Kafka level, this means
            // that the offsets are committed once the messages have been processed by the application.
            .setDeliveryStrategy(DeliveryStrategy.AT_LEAST_ONCE)
            .setStream("applicationlogevents")
            .setProxyChain(ProxyChain.newBuilder()
                    .append(SWITCHING_PROXY_ID)
                    .append(RESOLVING_PROXY_ID)
                    .append(LINEAGE_PROXY_ID)
                    .append(HEADER_PROXY_ID)
                    .build())
            .build();
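The proxy chain determines how the Axual client wraps the underlying Kafka consumer. The resolving proxy, for instance, translates the logical stream name applicationlogevents into the fully qualified topic name for your tenant and environment, which is why the log output below shows a subscription to demo-Local-example-applicationlogevents.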

A consumer also needs a Processor. This is where you define the business logic for handling the consumed messages. In this example, we use a simple LogEventSpecificConsumer that implements Processor and logs the key and value of each consumed message.

public class LogEventSpecificConsumer implements Processor<Application, ApplicationLogEvent>, AutoCloseable {
    public static final Logger LOG = LoggerFactory.getLogger(LogEventSpecificConsumer.class);
    private final Consumer<Application, ApplicationLogEvent> consumer;
    private final LinkedList<ConsumerMessage<Application, ApplicationLogEvent>> received = new LinkedList<>();

    public LogEventSpecificConsumer(
            final AxualClient axualClient,
            final SpecificAvroConsumerConfig<Application, ApplicationLogEvent> consumerConfig) {
        this.consumer = axualClient.buildConsumer(consumerConfig, this);
        // This will start a user thread that does the actual consumption and processing of messages
        this.consumer.startConsuming();
    }

    // This method is used to process the message.
    @Override
    public void processMessage(ConsumerMessage<Application, ApplicationLogEvent> msg) {
        LOG.info("Received message on topic {} partition {} offset {} key {} value {}",
                msg.getSystem(), msg.getPartition(), msg.getOffset(), msg.getKey(), msg.getValue());
        received.add(msg);
    }

    public LinkedList<ConsumerMessage<Application, ApplicationLogEvent>> getReceived() {
        return received;
    }

    @Override
    public void close() {
        this.consumer.stopConsuming();
    }

    public boolean isConsuming() {
        return this.consumer.isConsuming();
    }
}
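Note that processMessage is invoked on the consumer's user thread. If another thread reads the received list concurrently, a thread-safe collection such as java.util.concurrent.ConcurrentLinkedQueue is a safer choice than LinkedList.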

The consumer is then started like this:

/* Both the AxualClient and LogEventSpecificConsumer are AutoCloseable and can be used in try-with-resources. */
try (final AxualClient axualClient = new AxualClient(config);
     final LogEventSpecificConsumer consumer = new LogEventSpecificConsumer(axualClient, specificAvroConsumerConfig)) {
    // We want to prevent the main thread from exiting the try-with-resources block, since
    // that would close the consumer and axualClient, ending the user thread that performs
    // the consumption. In this example we therefore make the main thread sleep for a second
    // as long as the consumer is still consuming.
    while (consumer.isConsuming()) {
        Thread.sleep(1000);
    }
}

When all of the above steps have been completed, start your consumer app.
With the logging level for the io.axual package set to INFO, your app should log:

[main] INFO io.axual.client.consumer.base.BaseConsumer - Created consumer with source of class: io.axual.client.consumer.avro.AvroMessageSource.
[pool-1-thread-1] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - TTL updated to: 600000 (was: 0)
[pool-1-thread-1] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - Fetched discovery properties: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Received new DiscoveryResult for SwitchingConsumer: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Switching SwitchingConsumer from null to local
[pool-1-thread-1] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Creating new backing consumer with Discovery API result: DiscoveryResult: {...}
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Creating a new consumer with properties: {...}
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: {...}
[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}
[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}
[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}
[pool-1-thread-1] INFO io.confluent.kafka.serializers.KafkaAvroDeserializerConfig - KafkaAvroDeserializerConfig values: {...}
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'acl.principal.builder' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'cluster' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'instance' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'resolvingconsumerpartitionassignor.topic.resolver' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topic.pattern' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'endpoint' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'resolvingconsumerpartitionassignor.backing.assignor' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'tenant' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'group.id.pattern' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'app.id' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'distributor.distance' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'ttl' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'enable.value.headers' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'app.version' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'rest.proxy.url' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'environment' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'system' was supplied but isn't a known config.
[pool-1-thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'distributor.timeout' was supplied but isn't a known config.
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.3.0
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: fc1aaa116b661c8a
[pool-1-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1585853689619
[pool-1-thread-1] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Created new backing consumer
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Consumer switched, applying assignments and subscriptions
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.ConsumerSwitcher - Consumer switch finished
[pool-1-thread-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Cluster ID: oA8Db59vRi21SnnMO2-G3g
[pool-1-thread-1] INFO io.axual.client.proxy.switching.consumer.SwitchingConsumer - Subscribing to topics: [applicationlogevents]
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Subscribed to topic(s): demo-Local-example-applicationlogevents
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Discovered group coordinator 192.168.99.100:9096 (id: 2147483646 rack: null)
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Revoking previously assigned partitions []
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] (Re-)joining group
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] (Re-)joining group
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Successfully joined group with generation 5
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Setting newly assigned partitions: demo-Local-example-applicationlogevents-0
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-io.axual.example.client.avro.consumer-1, groupId=demo-Local-example-io.axual.example.client.avro.consumer] Setting offset for partition demo-Local-example-applicationlogevents-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.99.100:9096 (id: 1 rack: rack-1), epoch=0}}
[pool-1-thread-1] INFO io.axual.client.example.axualclient.avro.LogEventSpecificConsumer - Received message on topic demo-system partition 0 offset 10 key {"name": "app_0", "version": "1.9.9", "owner": "none"} value {"timestamp": 1000, "source": {"name": "logeventproducer", "version": "0.0.1", "owner": "Team Log"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Message 0"}

This is all the coding required to start a basic consumer!
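For reference, here is roughly how the snippets above fit together into a single entry point. This is a sketch assembled from the code in this guide: the class name is illustrative, imports are omitted as in the snippets above, and the fully runnable version lives in the examples repository.

// Illustrative entry point assembling the snippets from this guide
public class AvroConsumerApp {
    public static void main(String[] args) throws InterruptedException {
        // SSL and client configuration, as built earlier in this guide
        SslConfig sslConfig = SslConfig.newBuilder()
                .setEnableHostnameVerification(false)
                .setKeystoreLocation("/path/to/example-consumer.client.jks")
                .setKeystorePassword(new PasswordConfig("notsecret"))
                .setKeyPassword(new PasswordConfig("notsecret"))
                .setTruststoreLocation("/path/to/common-truststore.jks")
                .setTruststorePassword(new PasswordConfig("notsecret"))
                .build();

        ClientConfig config = ClientConfig.newBuilder()
                .setApplicationId("io.axual.example.client.avro.consumer")
                .setApplicationVersion("0.0.1")
                .setEndpoint("https://192.168.99.100:443")
                .setTenant("demo")
                .setEnvironment("example")
                .setSslConfig(sslConfig)
                .build();

        // Consumer configuration, as built earlier in this guide
        SpecificAvroConsumerConfig<Application, ApplicationLogEvent> consumerConfig =
                SpecificAvroConsumerConfig.<Application, ApplicationLogEvent>builder()
                        .setDeliveryStrategy(DeliveryStrategy.AT_LEAST_ONCE)
                        .setStream("applicationlogevents")
                        .setProxyChain(ProxyChain.newBuilder()
                                .append(SWITCHING_PROXY_ID)
                                .append(RESOLVING_PROXY_ID)
                                .append(LINEAGE_PROXY_ID)
                                .append(HEADER_PROXY_ID)
                                .build())
                        .build();

        // Start consuming and keep the main thread alive while the consumer runs
        try (AxualClient axualClient = new AxualClient(config);
             LogEventSpecificConsumer consumer = new LogEventSpecificConsumer(axualClient, consumerConfig)) {
            while (consumer.isConsuming()) {
                Thread.sleep(1000);
            }
        }
    }
}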

Wrapping up

You have concluded the Getting Started section by preparing your stream and applications, requesting access to the stream, and actually producing and consuming some data. If you are going to deploy your application in another environment, we advise enabling monitoring.

Proceed to Enabling Monitoring

You can also use the menu on the left to find information about other platform features that might not have been covered in this Getting Started.