Consuming

Introduction

Consuming messages from the Axual Platform consists of two steps which are discussed in separate segments of this document.

The steps that need to be taken to consume messages are:

1. Instantiating an AxualConsumer
2. Starting the AxualConsumer Loop

Instantiating an AxualConsumer

Namespace: Axual.Kafka.Proxy.Proxies.Axual

The Consumer is the object used for retrieving messages.

To create such an object we make use of the Builder pattern from our code base like so:

using (var consumer = new AxualConsumerBuilder<Application, ApplicationLogEvent>(config) (1)
        .SetKeyDeserializer(new SpecificAvroDeserializer<Application>()) (2)
        .SetValueDeserializer(new SpecificAvroDeserializer<ApplicationLogEvent>()) (3)
        .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}")) (4)
        .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}")) (5)
        .Build()) (6)
1 Uses the AxualConsumerBuilder with the configuration explained in Consumer Configuration
2 Indicates the deserializer for the key objects
3 Indicates the deserializer for the value objects
4 Callback for log handling, in this case we simply log everything
5 Callback for error handling, in this case we simply log the exceptions
6 Build the consumer object
It is recommended to wrap the consumer in a using statement, as shown above, so that correct resource management through IDisposable is guaranteed.

Starting the AxualConsumer Loop

Now that our Consumer object is available, we can use it to process messages from the desired stream.

First, it is necessary to point the consumer to the streams we expect to read from. Once this is done, the Consumer object can retrieve records on demand by calling Consume. Custom logic for handling the received messages follows.

By default, the .NET consumer will commit offsets automatically. If manual control over offsets is required, automatic commits can be disabled in the consumer configuration and offsets committed explicitly by calling consumer.Commit(consumeResult).
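The sketch below shows one way this could look, assuming the configuration is the standard Confluent.Kafka ConsumerConfig and using hypothetical values; the settings required by the Axual Platform are described in Consumer Configuration.

using Confluent.Kafka;

// Hypothetical values; see Consumer Configuration for the settings
// required by the Axual Platform.
var config = new ConsumerConfig
{
    GroupId = "example-consumer-group",
    // Disable automatic offset commits so that progress is only recorded
    // when consumer.Commit(consumeResult) is called explicitly.
    EnableAutoCommit = false
};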

An outline of the code needed to process messages is as follows:

using (var consumer = new AxualConsumerBuilder<Application, ApplicationLogEvent>(config)
    .SetKeyDeserializer(new SpecificAvroDeserializer<Application>())
    .SetValueDeserializer(new SpecificAvroDeserializer<ApplicationLogEvent>())
    .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
    .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
    .Build())
{
    consumer.Subscribe(streamName); (1)

    while (!cancelled) (2)
    {
        try
        {
            var consumeResult = consumer.Consume(cancellationToken); (3)

            if (consumeResult.IsPartitionEOF) (4)
            {
                Console.WriteLine(
                    $"Reached end of stream {consumeResult.Topic}, partition " +
                    $"{consumeResult.Partition}, offset {consumeResult.Offset}.");
                continue;
            }

            Console.WriteLine(
                $"Received message at {consumeResult.TopicPartitionOffset}: " +
                $"<{consumeResult.Message.Key},{consumeResult.Message.Value}>"); (5)

            try
            {
                consumer.Commit(consumeResult); (6)
            }
            catch (KafkaException e)
            {
                Console.WriteLine($"Commit error: {e.Error.Reason}");
            }
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Consume error: {e.Error.Reason}");
        }
    }
    consumer.Close(); (7)
}
1 Subscribe to a stream of interest. Messages on this stream will be handled by our consumer
2 Since the data streams have no bounds, it is common for consumer applications to run in such infinite loops; a typical way to signal cancellation is sketched after this list
3 Poll the Kafka brokers for new messages and store the result
4 (Optional) Check whether the end of the partition has been reached, i.e. no new messages have arrived since the last Consume call
5 Custom consumer logic. In our case we log part of the metadata from the result of the Consume call.
6 Commit the offset so that the consumer group's progress is recorded
7 To leave the group cleanly, i.e. commit final offsets and trigger a group rebalance so that any partitions owned by the consumer are re-assigned to other members of the group in a timely fashion, you additionally need to call the Close method prior to disposing
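The cancelled flag and cancellationToken used in the loop are not provided by the consumer itself; they must be set up by the application. Below is a minimal sketch of one common way to wire them up using a Console.CancelKeyPress handler; the exact shutdown mechanism is up to your application.

using System;
using System.Threading;

// Hypothetical cancellation wiring for the consume loop shown above.
var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var cancelled = false;

Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;   // keep the process alive so the loop can exit gracefully
    cancelled = true;  // makes the while (!cancelled) loop stop iterating
    cts.Cancel();      // unblocks a pending Consume(cancellationToken) call
};

Note that, in the underlying Confluent.Kafka client, a pending Consume(cancellationToken) call throws an OperationCanceledException when the token is cancelled; catching it around the loop is a convenient way to fall through to consumer.Close().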