Consuming Using .Net Axual Client

Creating A Consumer Application

When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in Creating topics in Avro format. To get some data onto the stream, follow Create A Producer Application.

Adding Dependencies

Start by including the dependency on the Axual .NET client library as you would do with any dependency or
by adding PackageReference tag in your .csproj project.

For full executable Axual client Avro consumer example, please refer examples repository.
<ItemGroup>
      <PackageReference Include="Axual.Kafka.Proxy" Version="1.4.0" />
      <PackageReference Include="Axual.SchemaRegistry.Serdes.Avro" Version="1.4.0" />
</ItemGroup>

Building The Application

In order to create a very basic consumer application, this is all you need.
Next step is to create the configuration for the AxualConsumerBuilder.

Consumer Configuration

var config = new AxualConsumerConfig
{
    // This is the app ID as it is configured in the self service
    ApplicationId = "loganalyzer",
    // The endoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
    EndPoint = new UriBuilder("https", "192.168.99.100", 443).Uri,
    // The tenant you are part of
    Tenant = "demo",
    // The environment corresponding to the shortname of the env in the self service
    Environment = "example",

    SecurityProtocol = SecurityProtocol.Ssl,
    SslKeystorePassword = "notsecret",
    SslKeystoreLocation = SSL_KEYSTORE_LOCATION, (1)
    EnableSslCertificateVerification = true,
    // Client verifies the identity of the broker
    SslCaLocation = SSL_CA_PATH, (2)
    SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
};
1 For SSL_KEYSTORE_LOCATION use the absolute path to the keystore file, see also Data security (application certificate)
2 For SSL_CA_PATH use the absolute path to the CA file, see also Data security (setting up trust)

Creating The Consumer

With the above configurations, we can instantiate the AxualConsumerBuilder to build an AxualConsumer instance.

var consumer = new AxualConsumerBuilder<Application, ApplicationLogEvent>(config)
    .SetKeyDeserializer(new SpecificAvroDeserializer<Application>())
    .SetValueDeserializer(new SpecificAvroDeserializer<ApplicationLogEvent>())
    .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
    .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
    .Build()

Consuming Messages

Now, we are ready to start receiving records using the AxualConsumer.

consumer.Subscribe("applicationlogevents");

try
{
    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume(cancellationToken);

            if (consumeResult.IsPartitionEOF)
            {
                Console.WriteLine(
                    $"> Reached end of stream {consumeResult.Topic}, partition " +
                    $"{consumeResult.Partition}, offset {consumeResult.Offset}.");

                continue;
            }

            Console.WriteLine(
                $"> Received message at {consumeResult.TopicPartitionOffset}: " + Environment.NewLine +
                $"Key: {consumeResult.Message.Key}" + Environment.NewLine +
                $"Value: {consumeResult.Message.Value}");

            try
            {
                consumer.Commit(consumeResult);
            }
            catch (KafkaException e)
            {
                Console.WriteLine($"> Commit error: {e.Error.Reason}");
            }
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"> Consume error: {e.Error.Reason}");
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("> Closing consumer.");
    consumer.Close();
}

When all of the above steps have been done correctly, start your consumer app, your app should log:

----------------------------------------------------------------------------------------
'axual_client_proxy_specific_avro_consumer' consuming from stream 'applicationlogevents'
----------------------------------------------------------------------------------------
Started consumer, Ctrl-C to stop consuming
> Received message at applicationlogevents [[1]] @41677:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1000, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 0 = some value 0}, log level = INFO, message = Message 0
> Received message at applicationlogevents [[1]] @41678:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1001, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 1 = some value 1}, log level = INFO, message = Message 1
> Received message at applicationlogevents [[1]] @41679:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1002, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 2 = some value 2}, log level = INFO, message = Message 2
> Received message at applicationlogevents [[1]] @41680:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1003, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 3 = some value 3}, log level = INFO, message = Message 3
> Received message at applicationlogevents [[1]] @41681:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1004, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 4 = some value 4}, log level = INFO, message = Message 4
> Received message at applicationlogevents [[1]] @41682:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1005, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 5 = some value 5}, log level = INFO, message = Message 5
> Received message at applicationlogevents [[1]] @41683:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1006, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 6 = some value 6}, log level = INFO, message = Message 6
> Received message at applicationlogevents [[1]] @41684:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1007, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 7 = some value 7}, log level = INFO, message = Message 7
> Received message at applicationlogevents [[1]] @41685:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1008, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 8 = some value 8}, log level = INFO, message = Message 8
> Received message at applicationlogevents [[1]] @41686:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1009, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 9 = some value 9}, log level = INFO, message = Message 9

This is all the coding required to start a basic consumer!

Wrapping up

You have concluded the getting started section by preparing your stream and applications, requesting access to the stream and actually producing and consuming some data. Before deploying your application to production, it is advised to enable monitoring.

You can also use the menu on the left to find information about other platform features, that might not have been touched in this Getting Started.