Producing Using .Net Kafka Client
Creating a Producer Application
When you have completed this step, you will have set up a producer application that is producing some randomly generated data in Avro format to the topic you have configured in Creating topics.
Before continuing, please check you fulfill all the prerequisites.
Building The Application
For a full executable Kafka client SASL/SSL Avro producer example, please refer to the examples repository.
Start by including the dependency on the Confluent Kafka .NET client library. Add the following NuGet packages:
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.8.2" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.8.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.8.2" />
</ItemGroup>
Producing using SASL
To create a very basic producer application that uses SASL, you first need to create the configuration for the Kafka Producer.
// The topic name defined in Axual Self-Service where we want to produce
const string topicName = "applicationlogevents";
// Axual uses namespacing to enable tenants, instance and environments to share the same cluster
// Determine the axual specific settings, to be translated to Kafka resource names
const string tenant = "demo";
const string instance = "local";
const string environment = "example";
// SASL credentials which have been generated in Self-Service
const string username = "usernameFromSelfService";
const string password = "passwordFromSelfService";
// Determine the full topic name based on the topic pattern
// The default pattern is "{tenant}-{instance}-{environment}-{topicName}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var topic = $"{tenant}-{instance}-{environment}-{topicName}";
const string applicationId = "io.axual.example.client.avro.producer";
// Determine the client id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
var clientId = $"{tenant}-{instance}-{environment}-{applicationId}";
var producerConfig = new ProducerConfig()
{
// Connect to the SASL listener of the local platform
BootstrapServers = "platform.local:9097",
// Set the security protocol to use SASL with SSL
SecurityProtocol = SecurityProtocol.SaslSsl,
// Ssl settings
SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
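// Certificate verification is disabled here for the local example platform; keep it enabled against any real cluster
EnableSslCertificateVerification = false,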
// Sasl settings
SaslMechanism = SaslMechanism.ScramSha512,
SaslUsername = username,
SaslPassword = password,
// Specify the Kafka delivery settings
Acks = Acks.All,
LingerMs = 10,
//Debug = "all"
// Specify the Kafka ClientID settings
ClientId = clientId
};
var schemaRegistryConfig = new SchemaRegistryConfig()
{
Url = "https://platform.local:24000/",
// Ssl settings, we use custom certificate authorities for Schema Registry as well
SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
EnableSslCertificateVerification = false,
// Use the SASL Credentials
BasicAuthCredentialsSource = AuthCredentialsSource.SaslInherit
};
// Add the SASL username password
schemaRegistryConfig.Set("sasl.username", username);
schemaRegistryConfig.Set("sasl.password", password);
When using the Confluent Kafka client, you need to provide the fully resolved topic name. The default topic pattern is {tenant}-{instance}-{environment}-{topicName}.
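The name resolution described above can also be written as a small helper instead of hand-written string interpolation. The following is only a minimal sketch: the class `ResourceNameResolver` and its `Resolve` method are hypothetical names introduced for illustration and are not part of the Confluent client or of any Axual library. It assumes the pattern uses `{placeholder}` tokens exactly as in the default pattern.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of any library: resolves a naming pattern
// such as "{tenant}-{instance}-{environment}-{topicName}" to a full name.
public static class ResourceNameResolver
{
    public static string Resolve(string pattern, IReadOnlyDictionary<string, string> values)
    {
        var resolved = pattern;
        foreach (var entry in values)
        {
            // Replace every occurrence of "{key}" with its configured value
            resolved = resolved.Replace("{" + entry.Key + "}", entry.Value);
        }

        // Fail fast when a placeholder has no value, rather than
        // producing to a wrongly named topic
        if (resolved.Contains("{") || resolved.Contains("}"))
        {
            throw new ArgumentException(
                $"Unresolved placeholder in '{resolved}' for pattern '{pattern}'");
        }

        return resolved;
    }
}

public static class ResolverDemo
{
    public static void Main()
    {
        var values = new Dictionary<string, string>
        {
            ["tenant"] = "demo",
            ["instance"] = "local",
            ["environment"] = "example",
            ["topicName"] = "applicationlogevents"
        };

        // Prints: demo-local-example-applicationlogevents
        Console.WriteLine(ResourceNameResolver.Resolve(
            "{tenant}-{instance}-{environment}-{topicName}", values));
    }
}
```

Keeping the pattern as data rather than as an interpolated string makes it easier to switch to a different pattern if your deployment does not use the default one.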
With the above configurations, instantiate a Kafka Producer and start sending records.
// Construct the Schema Registry Client and serializer
var srClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
var avroSerializer = new AvroSerializer<GenericRecord>(srClient).AsSyncOverAsync();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
Console.WriteLine();
Console.WriteLine("-------------------------------------------------------------------------------------");
Console.WriteLine($" '{typeof(Program).Namespace}' producing to stream '{topic}'");
Console.WriteLine("-------------------------------------------------------------------------------------");
Console.WriteLine("Starting producer, Ctrl-C to stop producing");
using (var producer = new ProducerBuilder<GenericRecord, GenericRecord>(producerConfig)
.SetKeySerializer(avroSerializer)
.SetValueSerializer(avroSerializer)
.SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
.SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
.Build())
{
var id = new BigInteger(0);
var appName = "Example Avro SASL Producer";
var owner = "Axual Demo";
var logContext = new Dictionary<string, object> {{"some key", "some value"}};
var logLevel = "INFO";
while (!cts.IsCancellationRequested)
{
var logMessage = $"Logging for producer, iteration {id}";
// By changing the version field the target partition will change on the topic
var key = CreateApplicationRecord(appName, id.ToString(), owner);
var val = CreateApplicationLogEventRecord(key, DateTime.Now.ToFileTimeUtc(), logLevel,logMessage,logContext);
id++;
try
{
producer.Produce(
topic, new Message<GenericRecord, GenericRecord> {Key = key, Value = val},
r => Console.WriteLine(!r.Error.IsError
? $"> Produced message: log message '{logMessage}' to stream {r.Topic} partition {r.Partition} offset {r.Offset}"
: $"> Delivery Error: {r.Error.Reason}"));
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
Thread.Sleep(100);
}
producer.Flush();
}
Console.WriteLine("> Done");
The CreateApplicationRecord and CreateApplicationLogEventRecord methods used in the code above construct Avro records based on the schema definitions.
// Construct an Avro Generic Record based on the Application schema definition
private static GenericRecord CreateApplicationRecord(string applicationName, string version = null,
string owner = null)
{
var application = new GenericRecord(ApplicationSchema);
application.Add("name", applicationName);
application.Add("version", version);
application.Add("owner", owner);
return application;
}
// Construct an Avro Generic Record based on the ApplicationLogEvent schema definition
private static GenericRecord CreateApplicationLogEventRecord(GenericRecord application, long timestamp,
string logLevel, string message, Dictionary<string, object> context)
{
var levelSchema = (EnumSchema) ApplicationLogEventSchema["level"].Schema;
var logEvent = new GenericRecord(ApplicationLogEventSchema);
logEvent.Add("timestamp", timestamp);
logEvent.Add("source", application);
if (logLevel != null)
{
var enumLogLevel = new GenericEnum(levelSchema, logLevel);
logEvent.Add("level", enumLogLevel);
}
logEvent.Add("message", message);
logEvent.Add("context", context);
return logEvent;
}
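The helper methods above refer to ApplicationSchema and ApplicationLogEventSchema, which are not defined on this page. The sketch below shows one way such fields could be declared with the Apache Avro library that the Avro serializer package depends on. The schema JSON is an assumption inferred from the field names used in the helpers: the namespace `example.schema`, the enum name `ApplicationLogLevel`, the enum symbols other than `INFO`, and the field types are all hypothetical. Use the schema actually registered for your topic; the examples repository contains the real definitions.

```csharp
using System;
using Avro;

public static class ExampleSchemas
{
    // ASSUMPTION: illustrative schema only, inferred from the fields the
    // helper methods populate. Replace with the schema registered for your topic.
    private const string ApplicationLogEventJson = @"{
      ""type"": ""record"",
      ""name"": ""ApplicationLogEvent"",
      ""namespace"": ""example.schema"",
      ""fields"": [
        { ""name"": ""timestamp"", ""type"": ""long"" },
        { ""name"": ""source"", ""type"": {
            ""type"": ""record"",
            ""name"": ""Application"",
            ""fields"": [
              { ""name"": ""name"", ""type"": ""string"" },
              { ""name"": ""version"", ""type"": [""null"", ""string""], ""default"": null },
              { ""name"": ""owner"", ""type"": [""null"", ""string""], ""default"": null }
            ] } },
        { ""name"": ""level"", ""type"": {
            ""type"": ""enum"",
            ""name"": ""ApplicationLogLevel"",
            ""symbols"": [""DEBUG"", ""INFO"", ""WARN"", ""ERROR""] } },
        { ""name"": ""message"", ""type"": ""string"" },
        { ""name"": ""context"", ""type"": { ""type"": ""map"", ""values"": ""string"" } }
      ]
    }";

    // Parse the outer record once and reuse it for every message
    public static readonly RecordSchema ApplicationLogEventSchema =
        (RecordSchema) Schema.Parse(ApplicationLogEventJson);

    // The nested Application record is taken from the "source" field,
    // so the key and the embedded source share one definition
    public static readonly RecordSchema ApplicationSchema =
        (RecordSchema) ApplicationLogEventSchema["source"].Schema;
}
```

Defining the Application record inline and deriving ApplicationSchema from the parsed log event schema keeps the two definitions from drifting apart. Note that the cast to EnumSchema in CreateApplicationLogEventRecord only works when the "level" field is a plain enum, as assumed here, and not a union with null.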
When all of the above steps have been done correctly, start your producer app. Your app will produce logging that will look like this:
--------------------------------------------------------------------------------------
'kafka_client_sasl_avro_producer' producing to stream 'demo-local-example-applicationlogevents'
--------------------------------------------------------------------------------------
Starting producer, Ctrl-C to stop producing
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41667
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41668
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41669
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41670
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41671
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41672
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41673
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41674
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41675
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41676
> Done
This is all the code required to produce records successfully.
Producing using Mutual TLS
To create a very basic producer application using mTLS, you first need to create the configuration for the Kafka Producer.
const string streamName = "applicationlogevents";
// Axual uses namespacing to enable tenants, instance and environments to share the same cluster
// Determine the axual specific settings, to be translated to Kafka resource names
const string tenant = "demo";
const string instance = "local";
const string environment = "example";
// Determine the full topic name based on the topic pattern
// The default pattern is "{tenant}-{instance}-{environment}-{topicName}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var topic = $"{tenant}-{instance}-{environment}-{streamName}";
const string applicationId = "io.axual.example.client.avro.producer";
// Determine the client id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
var clientId = $"{tenant}-{instance}-{environment}-{applicationId}";
var producerConfig = new ProducerConfig()
{
// Connect to the SSL listener of the local platform
BootstrapServers = "platform.local:9096",
// Set the security protocol to use SSL and certificate based authentication
SecurityProtocol = SecurityProtocol.Ssl,
// Ssl settings
// Keystore files are used because Schema Registry client does not support PEM files
SslKeystoreLocation = $"{RootFolder}/certificates/example-producer.p12",
SslKeystorePassword = "notsecret",
SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
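// Certificate verification is disabled here for the local example platform; keep it enabled against any real cluster
EnableSslCertificateVerification = false,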
// Specify the Kafka delivery settings
Acks = Acks.All,
LingerMs = 10,
//Debug = "all"
// Specify the Kafka ClientID settings
ClientId = clientId
};
var schemaRegistryConfig = new SchemaRegistryConfig()
{
Url = "https://platform.local:24000/",
// Ssl settings for trusting the Schema Registry, and identifying with certificates
// Keystore files are used because Schema Registry client does not support PEM files
SslKeystoreLocation = $"{RootFolder}/certificates/example-producer.p12",
SslKeystorePassword = "notsecret",
SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
EnableSslCertificateVerification = false,
};
When using the Confluent Kafka client, you need to provide the fully resolved topic name. The default topic pattern is {tenant}-{instance}-{environment}-{topicName}.
With the above configurations, instantiate a Kafka Producer and start sending records.
// Construct the Schema Registry Client and serializer
var srClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
var avroSerializer = new AvroSerializer<GenericRecord>(srClient).AsSyncOverAsync();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
Console.WriteLine();
Console.WriteLine("-------------------------------------------------------------------------------------");
Console.WriteLine($" '{typeof(Program).Namespace}' producing to stream '{topic}'");
Console.WriteLine("-------------------------------------------------------------------------------------");
Console.WriteLine("Starting producer, Ctrl-C to stop producing");
using (var producer = new ProducerBuilder<GenericRecord, GenericRecord>(producerConfig)
.SetKeySerializer(avroSerializer)
.SetValueSerializer(avroSerializer)
.SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
.SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
.Build())
{
var id = new BigInteger(0);
var appName = "Example Avro SSL Producer";
var owner = "Axual Demo";
var logContext = new Dictionary<string, object> {{"some key", "some value"}};
var logLevel = "INFO";
while (!cts.IsCancellationRequested)
{
var logMessage = $"Logging for producer, iteration {id}";
// By changing the version field the target partition will change on the topic
var key = CreateApplicationRecord(appName, id.ToString(), owner);
var val = CreateApplicationLogEventRecord(key, DateTime.Now.ToFileTimeUtc(), logLevel,logMessage,logContext);
id++;
try
{
producer.Produce(
topic, new Message<GenericRecord, GenericRecord> {Key = key, Value = val},
r => Console.WriteLine(!r.Error.IsError
? $"> Produced message: log message '{logMessage}' to stream {r.Topic} partition {r.Partition} offset {r.Offset}"
: $"> Delivery Error: {r.Error.Reason}"));
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
Thread.Sleep(100);
}
producer.Flush();
}
Console.WriteLine("> Done");
The CreateApplicationRecord and CreateApplicationLogEventRecord methods used in the code above construct Avro records based on the schema definitions.
// Construct an Avro Generic Record based on the Application schema definition
private static GenericRecord CreateApplicationRecord(string applicationName, string version = null,
string owner = null)
{
var application = new GenericRecord(ApplicationSchema);
application.Add("name", applicationName);
application.Add("version", version);
application.Add("owner", owner);
return application;
}
// Construct an Avro Generic Record based on the ApplicationLogEvent schema definition
private static GenericRecord CreateApplicationLogEventRecord(GenericRecord application, long timestamp,
string logLevel, string message, Dictionary<string, object> context)
{
var levelSchema = (EnumSchema) ApplicationLogEventSchema["level"].Schema;
var logEvent = new GenericRecord(ApplicationLogEventSchema);
logEvent.Add("timestamp", timestamp);
logEvent.Add("source", application);
if (logLevel != null)
{
var enumLogLevel = new GenericEnum(levelSchema, logLevel);
logEvent.Add("level", enumLogLevel);
}
logEvent.Add("message", message);
logEvent.Add("context", context);
return logEvent;
}
When all the above steps have been done correctly, start your producer app. Your app will produce logging that will look like this:
--------------------------------------------------------------------------------------
'kafka_client_ssl_avro_producer' producing to stream 'demo-local-example-applicationlogevents'
--------------------------------------------------------------------------------------
Starting producer, Ctrl-C to stop producing
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41667
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41668
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41669
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41670
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41671
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41672
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41673
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41674
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41675
> Produced message to stream demo-local-example-applicationlogevents partition [1] offset 41676
> Done
This is all the code required to produce records successfully.
Next Step: Consuming Data Using A .Net Consumer Application
In the next step you will create a .NET consumer application to use the data you just produced.
Proceed to Consuming Data (.Net Kafka Client)