Consuming Using .NET Kafka Client

Creating A Consumer Application

When you have completed this step, you will have set up a consumer application that consumes data from the stream configured in Creating streams in Avro format. To get some data onto the stream, follow Create A Producer Application.

Before continuing, please check you fulfill all the prerequisites.

Building The Application

For a full, executable Kafka client SASL/SSL Avro consumer example, please refer to the examples repository.

Start by including the dependency on the Confluent Kafka .NET client library. Add the following NuGet packages:

<ItemGroup>
      <PackageReference Include="Confluent.Kafka" Version="1.8.2" />
      <PackageReference Include="Confluent.SchemaRegistry" Version="1.8.2" />
      <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.8.2" />
</ItemGroup>
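
Alternatively, the same dependencies can be added from the command line. This is a sketch assuming the dotnet CLI is installed and you run it from the directory containing your project file; the versions match the `ItemGroup` snippet above:

```shell
# Add the Confluent Kafka client and Avro serdes packages
dotnet add package Confluent.Kafka --version 1.8.2
dotnet add package Confluent.SchemaRegistry --version 1.8.2
dotnet add package Confluent.SchemaRegistry.Serdes.Avro --version 1.8.2
```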

Consuming using SASL

In order to create a very basic consumer application that uses SASL, you need to create the configuration for the Kafka Consumer.

// The stream name defined in Axual Self Service from which we want to consume
const string streamName = "applicationlogevents";
const string applicationId = "io.axual.example.client.avro.consumer";

// Axual uses namespacing to enable tenants, instances and environments to share the same cluster
// Determine the Axual-specific settings, to be translated to Kafka resource names
const string tenant = "demo";
const string instance = "local";
const string environment = "example";

// SASL credentials which have been generated in Self-Service
const string username = "usernameFromSelfService";
const string password = "passwordFromSelfService";

// Determine the full topic name based on the topic pattern
// The default pattern is "{tenant}-{instance}-{environment}-{streamName}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var topic = $"{tenant}-{instance}-{environment}-{streamName}";

// Determine the full group id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var groupId = $"{tenant}-{instance}-{environment}-{applicationId}";

var consumerConfig = new ConsumerConfig
{
    // Connect to the SASL listener of the local platform
    BootstrapServers = "platform.local:9097",

    // Set the security protocol to use SASL with SSL
    SecurityProtocol = SecurityProtocol.SaslSsl,

    // Ssl settings
    SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
    // Certificate verification is disabled for this local example only; enable it in production
    EnableSslCertificateVerification = false,

    // Sasl settings
    SaslMechanism = SaslMechanism.ScramSha512,
    SaslUsername = username,
    SaslPassword = password,

    // Specify the Kafka delivery settings
    GroupId = groupId,
    // Disable auto commit; offsets are committed manually after each record below
    EnableAutoCommit = false,
    AutoOffsetReset = AutoOffsetReset.Earliest
};


var schemaRegistryConfig = new SchemaRegistryConfig()
{
    Url = "https://platform.local:24000/",

    // Ssl settings, we use custom certificate authorities for Schema Registry as well
    SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
    // Certificate verification is disabled for this local example only; enable it in production
    EnableSslCertificateVerification = false,

    // Use the SASL Credentials
    BasicAuthCredentialsSource = AuthCredentialsSource.SaslInherit
};
// Add the SASL username password
schemaRegistryConfig.Set("sasl.username", username);
schemaRegistryConfig.Set("sasl.password", password);

When using the Kafka client, you need to provide the fully resolved topic name and group.id in the configuration. You can resolve them by applying the naming patterns below.

The default patterns are:

  • Topic Pattern: {tenant}-{instance}-{environment}-{streamName}

  • Group Id Pattern: {tenant}-{instance}-{environment}-{applicationId}

  • Check your care package for the SSL CA file, see also Security

  • Replace username and password with the credentials generated while configuring the application in Creating Applications

  • The values of the tenant and instance fields can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For the bootstrap.servers and Schema Registry URL, contact support or your technical system administrator.
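
As a sanity check, resolving the default patterns with the demo values used in this guide can be sketched in a few shell lines:

```shell
# Values from the configuration above
tenant=demo
instance=local
environment=example
streamName=applicationlogevents
applicationId=io.axual.example.client.avro.consumer

# Apply the default naming patterns
topic="${tenant}-${instance}-${environment}-${streamName}"
groupId="${tenant}-${instance}-${environment}-${applicationId}"

echo "$topic"    # demo-local-example-applicationlogevents
echo "$groupId"  # demo-local-example-io.axual.example.client.avro.consumer
```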

With the above configurations, instantiate a Kafka consumer and start consuming records.

// Construct the Schema Registry Client and Avro deserializer
var srClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
var avroDeserializer = new AvroDeserializer<GenericRecord>(srClient).AsSyncOverAsync();

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};

Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine($"  '{typeof(Program).Namespace}' consuming from stream '{topic}'");
Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine("Starting consumer, Ctrl-C to stop consuming");

using (var consumer = new ConsumerBuilder<GenericRecord, GenericRecord>(consumerConfig)
           .SetKeyDeserializer(avroDeserializer)
           .SetValueDeserializer(avroDeserializer)
           .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
           .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Code} - {e.Reason}"))
           .Build())
{
    consumer.Subscribe(topic);

    try
    {
        while (!cts.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cts.Token);

                if (consumeResult.IsPartitionEOF)
                {
                    Console.WriteLine(
                        $"> Reached end of stream {consumeResult.Topic}, partition " +
                        $"{consumeResult.Partition}, offset {consumeResult.Offset}.");

                    continue;
                }

                Console.WriteLine(
                    $"> Received message key '{consumeResult.Message.Key}' value '{consumeResult.Message.Value}' " +
                    $"at {consumeResult.TopicPartitionOffset}");

                try
                {
                    consumer.Commit(consumeResult);
                }
                catch (KafkaException e)
                {
                    Console.WriteLine($"> Commit error: {e.Error.Reason}");
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"> Consume error: {e.Error.Reason}");
            }
        }
    }
    finally
    {
        Console.WriteLine("> Closing consumer");
        consumer.Close();
    }

    Console.WriteLine("> Done");
}

When all of the above steps have been completed correctly, start your consumer application. It should log output similar to:

----------------------------------------------------------------------------------------
'kafka_client_sasl_avro_consumer' consuming from stream 'demo-local-example-applicationlogevents'
----------------------------------------------------------------------------------------
Starting consumer, Ctrl-C to stop consuming
> Received message at demo-local-example-applicationlogevents partition [[1]] @41677:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1000, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 0 = some value 0}, log level = INFO, message = Message 0
> Received message at demo-local-example-applicationlogevents partition [[1]] @41678:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1001, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 1 = some value 1}, log level = INFO, message = Message 1
> Received message at demo-local-example-applicationlogevents partition [[1]] @41679:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1002, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 2 = some value 2}, log level = INFO, message = Message 2
> Received message at demo-local-example-applicationlogevents partition [[1]] @41680:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1003, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 3 = some value 3}, log level = INFO, message = Message 3
> Received message at demo-local-example-applicationlogevents partition [[1]] @41681:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1004, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 4 = some value 4}, log level = INFO, message = Message 4
> Received message at demo-local-example-applicationlogevents partition [[1]] @41682:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1005, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 5 = some value 5}, log level = INFO, message = Message 5
> Received message at demo-local-example-applicationlogevents partition [[1]] @41683:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1006, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 6 = some value 6}, log level = INFO, message = Message 6
> Received message at demo-local-example-applicationlogevents partition [[1]] @41684:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1007, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 7 = some value 7}, log level = INFO, message = Message 7
> Received message at demo-local-example-applicationlogevents partition [[1]] @41685:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1008, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 8 = some value 8}, log level = INFO, message = Message 8
> Received message at demo-local-example-applicationlogevents partition [[1]] @41686:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1009, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 9 = some value 9}, log level = INFO, message = Message 9

This is all the coding required to consume data successfully.

Consuming using Mutual TLS

In order to create a very basic consumer application using mTLS, you need to create the configuration for the Kafka Consumer.

// The stream name defined in Axual Self Service from which we want to consume
const string streamName = "applicationlogevents";
const string applicationId = "io.axual.example.client.avro.consumer";

// Axual uses namespacing to enable tenants, instances and environments to share the same cluster
// Determine the Axual-specific settings, to be translated to Kafka resource names
const string tenant = "demo";
const string instance = "local";
const string environment = "example";

// Determine the full topic name based on the topic pattern
// The default pattern is "{tenant}-{instance}-{environment}-{streamName}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var topic = $"{tenant}-{instance}-{environment}-{streamName}";

// Determine the full group id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var groupId = $"{tenant}-{instance}-{environment}-{applicationId}";

var consumerConfig = new ConsumerConfig
{
    // Connect to the SSL listener of the local platform
    BootstrapServers = "platform.local:9096",

    // Set the security protocol to use SSL and certificate based authentication
    SecurityProtocol = SecurityProtocol.Ssl,

    // Ssl settings
    // Keystore files are used because Schema Registry client does not support PEM files
    SslKeystoreLocation = $"{RootFolder}/certificates/example-consumer.p12",
    SslKeystorePassword = "notsecret",
    SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
    // Certificate verification is disabled for this local example only; enable it in production
    EnableSslCertificateVerification = false,

    // Specify the Kafka delivery settings
    GroupId = groupId,
    // Disable auto commit; offsets are committed manually after each record below
    EnableAutoCommit = false,
    AutoOffsetReset = AutoOffsetReset.Earliest
};

var schemaRegistryConfig = new SchemaRegistryConfig()
{
    Url = "https://platform.local:24000/",

    // Ssl settings for trusting the Schema Registry, and identifying with certificates
    // Keystore files are used because Schema Registry client does not support PEM files
    SslKeystoreLocation = $"{RootFolder}/certificates/example-consumer.p12",
    SslKeystorePassword = "notsecret",
    SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
    // Certificate verification is disabled for this local example only; enable it in production
    EnableSslCertificateVerification = false,
};

When using the Kafka client, you need to provide the fully resolved topic name and group.id in the configuration. You can resolve them by applying the naming patterns below.

The default patterns are:

  • Topic Pattern: {tenant}-{instance}-{environment}-{streamName}

  • Group Id Pattern: {tenant}-{instance}-{environment}-{applicationId}

  • Check your care package for the SSL security files, see also Security

  • The values of the tenant and instance fields can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For the bootstrap.servers and Schema Registry URL, contact support or your technical system administrator.
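
If you want to verify that the keystore and CA files from your care package are readable before starting the application, openssl can inspect them. This is a sketch assuming the file names and keystore password from the configuration above, run from the directory containing the certificates folder:

```shell
# List the certificates in the PKCS#12 keystore (password from the config above)
openssl pkcs12 -info -in certificates/example-consumer.p12 -passin pass:notsecret -nokeys

# Show the subject and validity period of the trusted CA certificate
openssl x509 -in certificates/trustedCa.pem -noout -subject -dates
```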

With the above configurations, instantiate a Kafka consumer and start consuming records.

// Construct the Schema Registry Client and Avro deserializer
var srClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
var avroDeserializer = new AvroDeserializer<GenericRecord>(srClient).AsSyncOverAsync();

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};

Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine($"  '{typeof(Program).Namespace}' consuming from stream '{topic}'");
Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine("Starting consumer, Ctrl-C to stop consuming");

using (var consumer = new ConsumerBuilder<GenericRecord, GenericRecord>(consumerConfig)
           .SetKeyDeserializer(avroDeserializer)
           .SetValueDeserializer(avroDeserializer)
           .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
           .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Code} - {e.Reason}"))
           .Build())
{
    consumer.Subscribe(topic);

    try
    {
        while (!cts.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cts.Token);

                if (consumeResult.IsPartitionEOF)
                {
                    Console.WriteLine(
                        $"> Reached end of stream {consumeResult.Topic}, partition " +
                        $"{consumeResult.Partition}, offset {consumeResult.Offset}.");

                    continue;
                }

                Console.WriteLine(
                    $"> Received message key '{consumeResult.Message.Key}' value '{consumeResult.Message.Value}' " +
                    $"at {consumeResult.TopicPartitionOffset}");

                try
                {
                    consumer.Commit(consumeResult);
                }
                catch (KafkaException e)
                {
                    Console.WriteLine($"> Commit error: {e.Error.Reason}");
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"> Consume error: {e.Error.Reason}");
            }
        }
    }
    finally
    {
        Console.WriteLine("> Closing consumer");
        consumer.Close();
    }

    Console.WriteLine("> Done");
}

When all of the above steps have been completed correctly, start your consumer application. It should log output similar to:

----------------------------------------------------------------------------------------
'kafka_client_ssl_avro_consumer' consuming from stream 'demo-local-example-applicationlogevents'
----------------------------------------------------------------------------------------
Starting consumer, Ctrl-C to stop consuming
> Received message at demo-local-example-applicationlogevents partition [[1]] @41677:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1000, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 0 = some value 0}, log level = INFO, message = Message 0
> Received message at demo-local-example-applicationlogevents partition [[1]] @41678:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1001, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 1 = some value 1}, log level = INFO, message = Message 1
> Received message at demo-local-example-applicationlogevents partition [[1]] @41679:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1002, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 2 = some value 2}, log level = INFO, message = Message 2
> Received message at demo-local-example-applicationlogevents partition [[1]] @41680:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1003, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 3 = some value 3}, log level = INFO, message = Message 3
> Received message at demo-local-example-applicationlogevents partition [[1]] @41681:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1004, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 4 = some value 4}, log level = INFO, message = Message 4
> Received message at demo-local-example-applicationlogevents partition [[1]] @41682:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1005, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 5 = some value 5}, log level = INFO, message = Message 5
> Received message at demo-local-example-applicationlogevents partition [[1]] @41683:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1006, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 6 = some value 6}, log level = INFO, message = Message 6
> Received message at demo-local-example-applicationlogevents partition [[1]] @41684:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1007, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 7 = some value 7}, log level = INFO, message = Message 7
> Received message at demo-local-example-applicationlogevents partition [[1]] @41685:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1008, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 8 = some value 8}, log level = INFO, message = Message 8
> Received message at demo-local-example-applicationlogevents partition [[1]] @41686:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1009, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 9 = some value 9}, log level = INFO, message = Message 9

This is all the coding required to implement a working consumer.

Wrapping up

You have concluded the Getting Started section by preparing your stream and applications, requesting access to the stream, and actually producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.

You can also use the menu on the left to find information about other platform features that might not have been touched upon in this Getting Started.