Producing Using the .NET Kafka Client

Creating a Producer Application

When you have completed this step, you will have set up a producer application that produces randomly generated data in Avro format to the topic you configured in Creating topics.

Before continuing, please check that you fulfill all the prerequisites.

Building The Application

For a fully executable Kafka client SASL/SSL Avro producer example, please refer to the examples repository.

Start by adding a dependency on the Confluent Kafka .NET client library. Add the following NuGet packages:

<ItemGroup>
      <PackageReference Include="Confluent.Kafka" Version="1.8.2" />
      <PackageReference Include="Confluent.SchemaRegistry" Version="1.8.2" />
      <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.8.2" />
</ItemGroup>

Producing using SASL

To create a basic producer application that uses SASL, you first need to create the configuration for the Kafka Producer.

// The topic name defined in Axual Self-Service where we want to produce
const string topicName = "applicationlogevents";

// Axual uses namespacing to enable tenants, instance and environments to share the same cluster
// Determine the axual specific settings, to be translated to Kafka resource names
const string tenant = "demo";
const string instance = "local";
const string environment = "example";

// SASL credentials that have been generated in Self-Service
const string username = "usernameFromSelfService";
const string password = "passwordFromSelfService";

// Determine the full topic name based on the topic pattern
// The default pattern is "{tenant}-{instance}-{environment}-{topicName}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var topic = $"{tenant}-{instance}-{environment}-{topicName}";

const string applicationId = "io.axual.example.client.avro.producer";
// Determine the client id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
var clientId = $"{tenant}-{instance}-{environment}-{applicationId}";

var producerConfig = new ProducerConfig()
{
    // Connect to the SASL listener of the local platform
    BootstrapServers = "platform.local:9097",

    // Set the security protocol to use SASL with SSL
    SecurityProtocol = SecurityProtocol.SaslSsl,

    // Ssl settings
    SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
    EnableSslCertificateVerification = false,

    // Sasl settings
    SaslMechanism = SaslMechanism.ScramSha512,
    SaslUsername = username,
    SaslPassword = password,

    // Specify the Kafka delivery settings
    Acks = Acks.All,
    LingerMs = 10,

    // Debug = "all"

    // Specify the Kafka ClientID settings
    ClientId = clientId
};

var schemaRegistryConfig = new SchemaRegistryConfig()
{
    Url = "https://platform.local:24000/",

    // Ssl settings, we use custom certificate authorities for Schema Registry as well
    SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
    EnableSslCertificateVerification = false,

    // Use the SASL Credentials
    BasicAuthCredentialsSource = AuthCredentialsSource.SaslInherit
};
// Add the SASL username password
schemaRegistryConfig.Set("sasl.username", username);
schemaRegistryConfig.Set("sasl.password", password);

When using the Confluent Kafka client, you need to provide the fully resolved topic name in the configuration. You can resolve the topic name by applying the naming pattern.

The default topic pattern is {tenant}-{instance}-{environment}-{topicName}

  • Check your essentials package for the SSL CA file, see also Security

  • Replace username and password with credentials generated while configuring the application in Creating Applications

  • The values of tenant and instance can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For the bootstrap.servers and Schema Registry URL, contact support or your technical system administrator. A sketch for reading these deployment-specific values from the environment follows this list.
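
Rather than hard-coding these deployment-specific values, you can read them from the environment at startup. Below is a minimal sketch; the environment variable names (KAFKA_BOOTSTRAP_SERVERS and so on) are illustrative assumptions, not names defined by the Axual platform.

// Minimal sketch (hypothetical environment variable names): read the
// deployment-specific values at startup, falling back to the demo defaults
var bootstrapServers = Environment.GetEnvironmentVariable("KAFKA_BOOTSTRAP_SERVERS")
                       ?? "platform.local:9097";
var schemaRegistryUrl = Environment.GetEnvironmentVariable("SCHEMA_REGISTRY_URL")
                        ?? "https://platform.local:24000/";
var saslUsername = Environment.GetEnvironmentVariable("KAFKA_SASL_USERNAME") ?? username;
var saslPassword = Environment.GetEnvironmentVariable("KAFKA_SASL_PASSWORD") ?? password;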

With the above configuration, instantiate a Kafka Producer and start sending records.

// Construct the Schema Registry Client and serializer
 var srClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
 var avroSerializer = new AvroSerializer<GenericRecord>(srClient).AsSyncOverAsync();

 var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
 Console.CancelKeyPress += (_, e) =>
 {
     e.Cancel = true; // prevent the process from terminating.
     cts.Cancel();
 };

 Console.WriteLine();
 Console.WriteLine("-------------------------------------------------------------------------------------");
 Console.WriteLine($"  '{typeof(Program).Namespace}' producing to stream '{topic}'");
 Console.WriteLine("-------------------------------------------------------------------------------------");
 Console.WriteLine("Starting producer, Ctrl-C to stop producing");

 using (var producer = new ProducerBuilder<GenericRecord, GenericRecord>(producerConfig)
     .SetKeySerializer(avroSerializer)
     .SetValueSerializer(avroSerializer)
     .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
     .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
     .Build())
 {
     var id = new BigInteger(0);
     var appName = "Example Avro SASL Producer";
     var owner = "Axual Demo";
     var logContext = new Dictionary<string, object> {{"some key", "some value"}};
     var logLevel = "INFO";

     while (!cts.IsCancellationRequested)
     {
         var logMessage = $"Logging for producer, iteration {id}";
         // By changing the version field the target partition will change on the topic
         var key = CreateApplicationRecord(appName, id.ToString(), owner);
         var val = CreateApplicationLogEventRecord(key, DateTime.Now.ToFileTimeUtc(), logLevel, logMessage, logContext);
         id++;

         try
         {
             producer.Produce(
                 topic, new Message<GenericRecord, GenericRecord> {Key = key, Value = val},
                 r => Console.WriteLine(!r.Error.IsError
                     ? $"> Produced message: log message '{logMessage}' to stream {r.Topic} partition {r.Partition} offset {r.Offset}"
                     : $"> Delivery Error: {r.Error.Reason}"));
         }
         catch (Exception ex)
         {
             Console.WriteLine(ex);
         }

         Thread.Sleep(100);
     }

     producer.Flush();
 }

 Console.WriteLine("> Done");

The CreateApplicationRecord and CreateApplicationLogEventRecord methods used in the above code construct Avro records based on the schema definitions.

// Construct an Avro Generic Record based on the Application schema definition
private static GenericRecord CreateApplicationRecord(string applicationName, string version = null,
    string owner = null)
{
    var application = new GenericRecord(ApplicationSchema);
    application.Add("name", applicationName);
    application.Add("version", version);
    application.Add("owner", owner);
    return application;
}

// Construct an Avro Generic Record based on the ApplicationLogEvent schema definition
private static GenericRecord CreateApplicationLogEventRecord(GenericRecord application, long timestamp,
    string logLevel, string message, Dictionary<string, object> context)
{
    var levelSchema = (EnumSchema) ApplicationLogEventSchema["level"].Schema;

    var logEvent = new GenericRecord(ApplicationLogEventSchema);
    logEvent.Add("timestamp", timestamp);
    logEvent.Add("source", application);
    if (logLevel != null)
    {
        var enumLogLevel = new GenericEnum(levelSchema, logLevel);
        logEvent.Add("level", enumLogLevel);
    }

    logEvent.Add("message", message);
    logEvent.Add("context", context);

    return logEvent;
}
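
The helpers reference two static schema fields, ApplicationSchema and ApplicationLogEventSchema, which are not shown above. Below is a minimal sketch of how they might be parsed with the Apache Avro library; the field and enum definitions are assumptions for illustration, and the authoritative schemas are the ones registered for your topic (see the examples repository).

// Minimal sketch (assumed definitions): parse the schemas used by the helpers.
// The authoritative schema definitions are the ones registered for your topic.
private static readonly RecordSchema ApplicationLogEventSchema = (RecordSchema) Schema.Parse(@"
{
  ""type"": ""record"", ""name"": ""ApplicationLogEvent"",
  ""fields"": [
    { ""name"": ""timestamp"", ""type"": ""long"" },
    { ""name"": ""source"", ""type"": { ""type"": ""record"", ""name"": ""Application"", ""fields"": [
      { ""name"": ""name"", ""type"": ""string"" },
      { ""name"": ""version"", ""type"": [""null"", ""string""], ""default"": null },
      { ""name"": ""owner"", ""type"": [""null"", ""string""], ""default"": null }
    ] } },
    { ""name"": ""level"", ""type"": { ""type"": ""enum"", ""name"": ""ApplicationLogLevel"",
      ""symbols"": [""DEBUG"", ""INFO"", ""WARN"", ""ERROR""] } },
    { ""name"": ""message"", ""type"": ""string"" },
    { ""name"": ""context"", ""type"": { ""type"": ""map"", ""values"": ""string"" } }
  ]
}");

// The key schema is the nested Application record, extracted from the 'source' field
private static readonly RecordSchema ApplicationSchema =
    (RecordSchema) ApplicationLogEventSchema["source"].Schema;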

When all of the above steps have been completed correctly, start your producer app. It will produce log output that looks like this:

--------------------------------------------------------------------------------------
'kafka_client_sasl_avro_producer' producing to stream 'demo-local-example-applicationlogevents'
--------------------------------------------------------------------------------------
Starting producer, Ctrl-C to stop producing
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41667
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41668
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41669
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41670
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41671
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41672
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41673
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41674
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41675
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41676
> Done

This is all the code required to produce records successfully.

Producing using Mutual TLS

To create a basic producer application that uses mutual TLS (mTLS), you first need to create the configuration for the Kafka Producer.

// The topic name defined in Axual Self-Service where we want to produce
const string topicName = "applicationlogevents";

// Axual uses namespacing to enable tenants, instance and environments to share the same cluster
// Determine the axual specific settings, to be translated to Kafka resource names
const string tenant = "demo";
const string instance = "local";
const string environment = "example";

// Determine the full topic name based on the topic pattern
// The default pattern is "{tenant}-{instance}-{environment}-{topicName}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var topic = $"{tenant}-{instance}-{environment}-{topicName}";

const string applicationId = "io.axual.example.client.avro.producer";
// Determine the client id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
var clientId = $"{tenant}-{instance}-{environment}-{applicationId}";

var producerConfig = new ProducerConfig()
{
    // Connect to the SSL listener of the local platform
    BootstrapServers = "platform.local:9096",

    // Set the security protocol to use SSL and certificate based authentication
    SecurityProtocol = SecurityProtocol.Ssl,

    // Ssl settings
    // Keystore files are used because Schema Registry client does not support PEM files
    SslKeystoreLocation = $"{RootFolder}/certificates/example-producer.p12",
    SslKeystorePassword = "notsecret",
    SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
    EnableSslCertificateVerification = false,

    // Specify the Kafka delivery settings
    Acks = Acks.All,
    LingerMs = 10,

    // Debug = "all"

    // Specify the Kafka ClientID settings
    ClientId = clientId
};

var schemaRegistryConfig = new SchemaRegistryConfig()
{
    Url = "https://platform.local:24000/",

    // Ssl settings for trusting the Schema Registry, and identifying with certificates
    // Keystore files are used because Schema Registry client does not support PEM files
    SslKeystoreLocation = $"{RootFolder}/certificates/example-producer.p12",
    SslKeystorePassword = "notsecret",
    SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
    EnableSslCertificateVerification = false,
};

When using the Confluent Kafka client, you need to provide the fully resolved topic name in the configuration. You can resolve the topic name by applying the naming pattern.

The default topic pattern is {tenant}-{instance}-{environment}-{topicName}

  • Check your essentials package for the SSL security files, see also Security; a small sketch for verifying these files are in place follows this list

  • The values of tenant and instance can be found in the on-boarding information; the environment value is example in this Getting Started. If unsure, contact support or your technical system administrator.

  • For the bootstrap.servers and Schema Registry URL, contact support or your technical system administrator.
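
Since this setup depends on the keystore and CA files being present on disk, it can help to fail fast with a clear error before constructing the clients. An optional minimal sketch, using the paths from the configuration above:

// Optional sketch: verify the security files exist before building the clients,
// so a missing file yields a clear error instead of a low-level SSL failure
foreach (var path in new[]
{
    $"{RootFolder}/certificates/example-producer.p12",
    $"{RootFolder}/certificates/trustedCa.pem"
})
{
    if (!System.IO.File.Exists(path))
        throw new System.IO.FileNotFoundException($"Required security file not found: {path}", path);
}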

With the above configuration, instantiate a Kafka Producer and start sending records.

// Construct the Schema Registry Client and serializer
 var srClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
 var avroSerializer = new AvroSerializer<GenericRecord>(srClient).AsSyncOverAsync();

 var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
 Console.CancelKeyPress += (_, e) =>
 {
     e.Cancel = true; // prevent the process from terminating.
     cts.Cancel();
 };

 Console.WriteLine();
 Console.WriteLine("-------------------------------------------------------------------------------------");
 Console.WriteLine($"  '{typeof(Program).Namespace}' producing to stream '{topic}'");
 Console.WriteLine("-------------------------------------------------------------------------------------");
 Console.WriteLine("Starting producer, Ctrl-C to stop producing");

 using (var producer = new ProducerBuilder<GenericRecord, GenericRecord>(producerConfig)
     .SetKeySerializer(avroSerializer)
     .SetValueSerializer(avroSerializer)
     .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
     .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
     .Build())
 {
     var id = new BigInteger(0);
     var appName = "Example Avro SSL Producer";
     var owner = "Axual Demo";
     var logContext = new Dictionary<string, object> {{"some key", "some value"}};
     var logLevel = "INFO";

     while (!cts.IsCancellationRequested)
     {
         var logMessage = $"Logging for producer, iteration {id}";
         // By changing the version field the target partition will change on the topic
         var key = CreateApplicationRecord(appName, id.ToString(), owner);
         var val = CreateApplicationLogEventRecord(key, DateTime.Now.ToFileTimeUtc(), logLevel, logMessage, logContext);
         id++;

         try
         {
             producer.Produce(
                 topic, new Message<GenericRecord, GenericRecord> {Key = key, Value = val},
                 r => Console.WriteLine(!r.Error.IsError
                     ? $"> Produced message: log message '{logMessage}' to stream {r.Topic} partition {r.Partition} offset {r.Offset}"
                     : $"> Delivery Error: {r.Error.Reason}"));
         }
         catch (Exception ex)
         {
             Console.WriteLine(ex);
         }

         Thread.Sleep(100);
     }

     producer.Flush();
 }

 Console.WriteLine("> Done");

The CreateApplicationRecord and CreateApplicationLogEventRecord methods used in the above code construct Avro records based on the schema definitions. They are identical to the helpers shown in the SASL section.

// Construct an Avro Generic Record based on the Application schema definition
private static GenericRecord CreateApplicationRecord(string applicationName, string version = null,
    string owner = null)
{
    var application = new GenericRecord(ApplicationSchema);
    application.Add("name", applicationName);
    application.Add("version", version);
    application.Add("owner", owner);
    return application;
}

// Construct an Avro Generic Record based on the ApplicationLogEvent schema definition
private static GenericRecord CreateApplicationLogEventRecord(GenericRecord application, long timestamp,
    string logLevel, string message, Dictionary<string, object> context)
{
    var levelSchema = (EnumSchema) ApplicationLogEventSchema["level"].Schema;

    var logEvent = new GenericRecord(ApplicationLogEventSchema);
    logEvent.Add("timestamp", timestamp);
    logEvent.Add("source", application);
    if (logLevel != null)
    {
        var enumLogLevel = new GenericEnum(levelSchema, logLevel);
        logEvent.Add("level", enumLogLevel);
    }

    logEvent.Add("message", message);
    logEvent.Add("context", context);

    return logEvent;
}

When all of the above steps have been completed correctly, start your producer app. It will produce log output that looks like this:

--------------------------------------------------------------------------------------
'kafka_client_ssl_avro_producer' producing to stream 'demo-local-example-applicationlogevents'
--------------------------------------------------------------------------------------
Starting producer, Ctrl-C to stop producing
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41667
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41668
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41669
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41670
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41671
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41672
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41673
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41674
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41675
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41676
> Done

This is all the code required to produce records successfully.

Next Step: Consuming Data Using a .NET Consumer Application

In the next step you will create a .NET consumer application to consume the data you just produced.