Step 4: Consuming Data

Create A Consumer Application

When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in step 1 in String format. To get some data onto the stream, follow step 3: Create A Producer Application.

To create a consuming application, the start is similar to creating a producer application in step 2. You start by including the maven dependency on the Axual Client and create the configuration for it:

<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client</artifactId>
    <version>5.2.0</version>
</dependency>

In order to create a very basic producer application, this is all you need. Next step is to create the configuration for the AxualClient and also for the Producer that will be used.

private static ClientConfig getClientConfig() {
    return ClientConfig.newBuilder()
       	    // This is the app ID as it is configured in the self service
            .setApplicationId(APP_ID)
            .setApplicationVersion(APPLICATION_VERSION)
            // The endoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
            .setEndpoint(ENDPOINT)
            // The environment corresponding to the shortname of the env in the self service
            .setEnvironment(ENV)
            // The tenant you are part of
            .setTenant(TENANT)
            // The ssl configuration for your application. The certificate used should have a DN
            // matching the DN that was configured in self service
            .setSslConfig(SslConfig.newBuilder()
                    // Disable CN and subject-alternative-name verification for the broker.
                    // Generally not necessary in production, just in test and local deployments.
                    .setEnableHostnameVerification(false)
                    .setKeyPassword(new PasswordConfig(KEY_PASSWORD))
                    .setKeystorePassword(new PasswordConfig(KEYSTORE_PASSWORD))
                    .setTruststorePassword(new PasswordConfig(TRUSTSTORE_PASSWORD))
                    .setKeystoreLocation(KEYSTORE_LOCATION)
                    .setTruststoreLocation(TRUSTSTORE_LOCATION)
                    .build())
            .build();
}

The next step is creating a ConsumerConfig, similar to creating the ProducerConfig in step 2.

private static ConsumerConfig<String, String> getBaseConsumerConfig() {
    return ConsumerConfig.<String, String>builder()
            // Since the types are String, we will use a StringDeserializer for the key and value
            .setKeyDeserializerClassName(StringDeserializer.class.getCanonicalName())
            .setValueDeserializerClassName(StringDeserializer.class.getCanonicalName())
            // We want to make sure we get all the messages at least once. On a Kafka level, this means
            // that the offsets are committed once the message have been processed bey the application.
            .setConsumerStrategy(ConsumerStrategy.AT_LEAST_ONCE)
            .setStream(STREAM)
            .build();
}

A consumer will also need a Processor. This is where all the business logic is defined on how to handle the consumed messages. In this example, we will use a very simple processor that only logs the key and value for each consumed message:

private static final Processor<String, String> processor = consumerMessage ->
        log.info("Consumed message with key: {} and value: {}", consumerMessage.getKey(), consumerMessage.getValue());

The consumer is then started like this:

public static void main(String[] args) throws Exception {
    // Both the AxualClient and Consumer are AutoClosable and can be used in try-with-resources.
    try (AxualClient axualClient = new AxualClient(getClientConfig());
         Consumer consumer = axualClient.buildConsumer(getBaseConsumerConfig(), processor)) {
        // This will start a user thread that does the actual consumption and processing of messages
        consumer.startConsuming();
        // We want to prevent that the main thread exits the try-with-resources block, since that
        // will cause the consumer and axualClient to close, ending the user thread that started
        // the consumption. So in this example we make the main thread sleep for a second as long
        // as the consumer is still consuming.
        while (consumer.isConsuming()) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

When all of the above steps have been done correctly, start your consumer app.
With the logging level for the io.axual package to INFO, your app will produce logging that will look like this:

INFO  BaseConsumer:37 - Created consumer with source of class: io.axual.client.consumer.generic.GenericMessageSource.
INFO  DiscoveryLoader:77 - TTL updated to: 600000 (was: 0)
INFO  DiscoveryLoader:44 - Fetched discovery properties: DiscoveryResult: {...}
INFO  DiscoverySubscriber:64 - Received new DiscoveryResult: DiscoveryResult: {...}
INFO  BaseProxySwitcher:66 - Creating new backing consumer with Discovery API result: DiscoveryResult: {...}
INFO  ConsumerSwitcher:60 - Creating a new consumer with properties: {...}
INFO  BaseProxySwitcher:68 - Created new backing consumer
INFO  ConsumerSwitcher:45 - Consumer switched, applying assignments and subscriptions
INFO  ConsumerSwitcher:48 - Consumer switch finished
INFO  AxualTestConsumer:20 - Consumed message with key: this is a key of my message and value: this is the value value of my message

The logging generated by the AxualTestConsumer class comes from the newly created class using the client library. The other logging is all generated by the Axual Client Library itself.

This is all the coding required to start a basic consumer!

Wrapping up

You have concluded the getting started section by preparing your stream & applications, requesting access to the stream and actually producing and consuming some data. You can now use other documentation sections such as the Self-Service user guide or Axual Client documentation.