Consuming Using .NET Kafka Client
Creating A Consumer Application
When you have completed this step, you will have set up a consumer application that is consuming data from the topic configured in Creating topics in Avro format. To get some data onto the topic, follow Create A Producer Application.
Before continuing, please check you fulfill all the prerequisites.
Building The Application
For a full executable Kafka client SASL/SSL Avro consumer example, please refer to the examples repository.
Start by including the dependency on the Confluent Kafka .NET client library. Add the following NuGet packages:
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.8.2" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.8.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.8.2" />
</ItemGroup>
Consuming using SASL
To create a basic consumer application that uses SASL, first create the configuration for the Kafka consumer.
// The topic name defined in Axual Self Service from which we want to consume
const string topicName = "applicationlogevents";
const string applicationId = "io.axual.example.client.avro.consumer";
// Axual uses namespacing to enable tenants, instances and environments to share the same cluster
// Determine the Axual-specific settings, to be translated to Kafka resource names
const string tenant = "demo";
const string instance = "local";
const string environment = "example";
// SASL credentials which have been generated in Self-Service
const string username = "usernameFromSelfService";
const string password = "passwordFromSelfService";
// Determine the full topic name based on the topic pattern
// The default pattern is "{tenant}-{instance}-{environment}-{topicName}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var topic = $"{tenant}-{instance}-{environment}-{topicName}";
// Determine the full group id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var groupId = $"{tenant}-{instance}-{environment}-{applicationId}";
// Determine the client id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
var clientId = $"{tenant}-{instance}-{environment}-{applicationId}";
var consumerConfig = new ConsumerConfig
{
// Connect to the SASL listener of the local platform
BootstrapServers = "platform.local:9097",
// Set the security protocol to use SASL with SSL
SecurityProtocol = SecurityProtocol.SaslSsl,
// Ssl settings
SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
EnableSslCertificateVerification = false,
// Sasl settings
SaslMechanism = SaslMechanism.ScramSha512,
SaslUsername = username,
SaslPassword = password,
// Specify the Kafka delivery settings
GroupId = groupId,
EnableAutoCommit = true,
AutoCommitIntervalMs = 200,
AutoOffsetReset = AutoOffsetReset.Earliest,
// Specify the Kafka ClientID settings
ClientId = clientId
};
var schemaRegistryConfig = new SchemaRegistryConfig()
{
Url = "https://platform.local:24000/",
// Ssl settings, we use custom certificate authorities for Schema Registry as well
SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
EnableSslCertificateVerification = false,
// Use the SASL Credentials
BasicAuthCredentialsSource = AuthCredentialsSource.SaslInherit
};
// Add the SASL username password
schemaRegistryConfig.Set("sasl.username", username);
schemaRegistryConfig.Set("sasl.password", password);
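When using the Kafka client, you need to provide the fully resolved topic name and group ID. The default patterns are:
Topic: {tenant}-{instance}-{environment}-{topicName}
Group ID: {tenant}-{instance}-{environment}-{applicationId}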
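The resolved names are built by filling the placeholders of a naming pattern. Because the pattern for a deployment may differ from the default, it can help to keep the pattern as data rather than hard-coding the string interpolation. The sketch below assumes the default patterns quoted in the code comments on this page; `ResolveName` is a hypothetical helper written for illustration and is not part of Confluent.Kafka or any Axual library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: replaces every "{placeholder}" in the pattern with its value.
static string ResolveName(string pattern, IReadOnlyDictionary<string, string> values)
{
    var resolved = pattern;
    foreach (var (key, value) in values)
    {
        resolved = resolved.Replace("{" + key + "}", value);
    }
    // Fail fast if the pattern contains a placeholder we have no value for
    if (resolved.Contains('{') || resolved.Contains('}'))
    {
        throw new ArgumentException($"Unresolved placeholder in '{resolved}'");
    }
    return resolved;
}

// Values taken from the example configuration on this page
var context = new Dictionary<string, string>
{
    ["tenant"] = "demo",
    ["instance"] = "local",
    ["environment"] = "example",
    ["topicName"] = "applicationlogevents",
    ["applicationId"] = "io.axual.example.client.avro.consumer"
};

var topic = ResolveName("{tenant}-{instance}-{environment}-{topicName}", context);
var groupId = ResolveName("{tenant}-{instance}-{environment}-{applicationId}", context);

Console.WriteLine(topic);   // demo-local-example-applicationlogevents
Console.WriteLine(groupId); // demo-local-example-io.axual.example.client.avro.consumer
```

If your deployment uses a different pattern, only the pattern strings passed to `ResolveName` would need to change.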
With the above configuration, instantiate a Kafka consumer and start consuming records.
// Construct the Schema Registry client and the Avro deserializer
var srClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
var avroDeserializer = new AvroDeserializer<GenericRecord>(srClient).AsSyncOverAsync();
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine($" '{typeof(Program).Namespace}' consuming from stream '{topic}'");
Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine("Starting consumer, Ctrl-C to stop consuming");
using (var consumer = new ConsumerBuilder<GenericRecord, GenericRecord>(consumerConfig)
.SetKeyDeserializer(avroDeserializer)
.SetValueDeserializer(avroDeserializer)
.SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
.SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Code} - {e.Reason}"))
.Build())
{
consumer.Subscribe(topic);
try
{
while (!cts.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine(
$"> Reached end of stream {consumeResult.Topic}, partition " +
$"{consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine(
$"> Received message key '{consumeResult.Message.Key}' value '{consumeResult.Message.Value}' " +
$"at {consumeResult.TopicPartitionOffset}");
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"> Commit error: {e.Error.Reason}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"> Consume error: {e.Error.Reason}");
}
}
}
finally
{
Console.WriteLine("> Closing consumer");
consumer.Close();
}
Console.WriteLine("> Done");
}
When all of the above steps have been done correctly, start your consumer app. Your app should log:
----------------------------------------------------------------------------------------
 'kafka_client_sasl_avro_consumer' consuming from stream 'demo-local-example-applicationlogevents partition'
----------------------------------------------------------------------------------------
Started consumer, Ctrl-C to stop consuming
> Received message at demo-local-example-applicationlogevents partition [[1]] @41677:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1000, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 0 = some value 0}, log level = INFO, message = Message 0
> Received message at demo-local-example-applicationlogevents partition [[1]] @41678:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1001, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 1 = some value 1}, log level = INFO, message = Message 1
> Received message at demo-local-example-applicationlogevents partition [[1]] @41679:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1002, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 2 = some value 2}, log level = INFO, message = Message 2
> Received message at demo-local-example-applicationlogevents partition [[1]] @41680:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1003, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 3 = some value 3}, log level = INFO, message = Message 3
> Received message at demo-local-example-applicationlogevents partition [[1]] @41681:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1004, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 4 = some value 4}, log level = INFO, message = Message 4
> Received message at demo-local-example-applicationlogevents partition [[1]] @41682:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1005, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 5 = some value 5}, log level = INFO, message = Message 5
> Received message at demo-local-example-applicationlogevents partition [[1]] @41683:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1006, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 6 = some value 6}, log level = INFO, message = Message 6
> Received message at demo-local-example-applicationlogevents partition [[1]] @41684:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1007, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 7 = some value 7}, log level = INFO, message = Message 7
> Received message at demo-local-example-applicationlogevents partition [[1]] @41685:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1008, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 8 = some value 8}, log level = INFO, message = Message 8
> Received message at demo-local-example-applicationlogevents partition [[1]] @41686:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1009, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 9 = some value 9}, log level = INFO, message = Message 9
This is all the coding required to consume records successfully.
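Note that the SASL example sets EnableAutoCommit = true and also calls consumer.Commit(consumeResult) after each record, so offsets are committed both periodically in the background and explicitly per record. If explicit commits after processing are what you want, one option is to turn auto-commit off. The fragment below is a sketch under that assumption; it reuses the broker address and the resolved group ID from this page, omits the security settings for brevity, and is not taken from the examples repository. The variable name manualCommitConfig is illustrative only.

```csharp
using Confluent.Kafka;

// Sketch: delivery settings for a consumer that commits offsets explicitly.
// The SASL or mTLS settings shown on this page still need to be added.
var manualCommitConfig = new ConsumerConfig
{
    BootstrapServers = "platform.local:9097",
    GroupId = "demo-local-example-io.axual.example.client.avro.consumer",
    // Offsets are committed only by explicit consumer.Commit(...) calls
    EnableAutoCommit = false,
    AutoOffsetReset = AutoOffsetReset.Earliest
};
```

With this setting, a record whose commit fails or never happens is delivered again after a restart or rebalance, so processing should tolerate duplicates.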
Consuming using Mutual TLS
To create a basic consumer application that uses mutual TLS (mTLS), first create the configuration for the Kafka consumer.
// The topic name defined in Axual Self Service from which we want to consume
const string topicName = "applicationlogevents";
const string applicationId = "io.axual.example.client.avro.consumer";
// Axual uses namespacing to enable tenants, instances and environments to share the same cluster
// Determine the Axual-specific settings, to be translated to Kafka resource names
const string tenant = "demo";
const string instance = "local";
const string environment = "example";
// No SASL credentials are needed here: with mutual TLS the client authenticates with its certificate
// Determine the full topic name based on the topic pattern
// The default pattern is "{tenant}-{instance}-{environment}-{topicName}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var topic = $"{tenant}-{instance}-{environment}-{topicName}";
// Determine the full group id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
// The naming pattern can be retrieved from the Discovery API or from the support team for your deployment
var groupId = $"{tenant}-{instance}-{environment}-{applicationId}";
// Determine the client id based on the group pattern
// The default pattern is "{tenant}-{instance}-{environment}-{applicationId}"
var clientId = $"{tenant}-{instance}-{environment}-{applicationId}";
var consumerConfig = new ConsumerConfig
{
// Connect to the SSL listener of the local platform
BootstrapServers = "platform.local:9096",
// Set the security protocol to use SSL and certificate based authentication
SecurityProtocol = SecurityProtocol.Ssl,
// Ssl settings
// Keystore files are used because Schema Registry client does not support PEM files
SslKeystoreLocation = $"{RootFolder}/certificates/example-consumer.p12",
SslKeystorePassword = "notsecret",
SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
EnableSslCertificateVerification = false,
// Specify the Kafka delivery settings
GroupId = groupId,
EnableAutoCommit = true,
AutoCommitIntervalMs = 200,
AutoOffsetReset = AutoOffsetReset.Earliest,
// Specify the Kafka ClientID settings
ClientId = clientId
};
var schemaRegistryConfig = new SchemaRegistryConfig()
{
Url = "https://platform.local:24000/",
// Ssl settings for trusting the Schema Registry, and identifying with certificates
// Keystore files are used because Schema Registry client does not support PEM files
SslKeystoreLocation = $"{RootFolder}/certificates/example-consumer.p12",
SslKeystorePassword = "notsecret",
SslCaLocation = $"{RootFolder}/certificates/trustedCa.pem",
EnableSslCertificateVerification = false,
};
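When using the Kafka client, you need to provide the fully resolved topic name and group ID. The default patterns are:
Topic: {tenant}-{instance}-{environment}-{topicName}
Group ID: {tenant}-{instance}-{environment}-{applicationId}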
With the above configuration, instantiate a Kafka consumer and start consuming records.
// Construct the Schema Registry client and the Avro deserializer
var srClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
var avroDeserializer = new AvroDeserializer<GenericRecord>(srClient).AsSyncOverAsync();
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine($" '{typeof(Program).Namespace}' consuming from stream '{topic}'");
Console.WriteLine("--------------------------------------------------------------------------------------");
Console.WriteLine("Starting consumer, Ctrl-C to stop consuming");
using (var consumer = new ConsumerBuilder<GenericRecord, GenericRecord>(consumerConfig)
.SetKeyDeserializer(avroDeserializer)
.SetValueDeserializer(avroDeserializer)
.SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
.SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Code} - {e.Reason}"))
.Build())
{
consumer.Subscribe(topic);
try
{
while (!cts.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine(
$"> Reached end of stream {consumeResult.Topic}, partition " +
$"{consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine(
$"> Received message key '{consumeResult.Message.Key}' value '{consumeResult.Message.Value}' " +
$"at {consumeResult.TopicPartitionOffset}");
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"> Commit error: {e.Error.Reason}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"> Consume error: {e.Error.Reason}");
}
}
}
finally
{
Console.WriteLine("> Closing consumer");
consumer.Close();
}
Console.WriteLine("> Done");
}
When all of the above steps have been done correctly, start your consumer app. Your app should log:
----------------------------------------------------------------------------------------
 'kafka_client_ssl_avro_consumer' consuming from stream 'demo-local-example-applicationlogevents partition'
----------------------------------------------------------------------------------------
Started consumer, Ctrl-C to stop consuming
> Received message at demo-local-example-applicationlogevents partition [[1]] @41677:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1000, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 0 = some value 0}, log level = INFO, message = Message 0
> Received message at demo-local-example-applicationlogevents partition [[1]] @41678:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1001, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 1 = some value 1}, log level = INFO, message = Message 1
> Received message at demo-local-example-applicationlogevents partition [[1]] @41679:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1002, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 2 = some value 2}, log level = INFO, message = Message 2
> Received message at demo-local-example-applicationlogevents partition [[1]] @41680:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1003, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 3 = some value 3}, log level = INFO, message = Message 3
> Received message at demo-local-example-applicationlogevents partition [[1]] @41681:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1004, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 4 = some value 4}, log level = INFO, message = Message 4
> Received message at demo-local-example-applicationlogevents partition [[1]] @41682:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1005, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 5 = some value 5}, log level = INFO, message = Message 5
> Received message at demo-local-example-applicationlogevents partition [[1]] @41683:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1006, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 6 = some value 6}, log level = INFO, message = Message 6
> Received message at demo-local-example-applicationlogevents partition [[1]] @41684:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1007, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 7 = some value 7}, log level = INFO, message = Message 7
> Received message at demo-local-example-applicationlogevents partition [[1]] @41685:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1008, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 8 = some value 8}, log level = INFO, message = Message 8
> Received message at demo-local-example-applicationlogevents partition [[1]] @41686:
Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log
Value: [Application Log Event]: timestamp = 1009, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 9 = some value 9}, log level = INFO, message = Message 9
This is all the coding required to consume records successfully.
Wrapping up
You have concluded the getting started section by preparing your topic and applications, requesting access to the stream, and producing and consuming some data. If you are going to deploy your application in another environment, it is advised to enable monitoring.
You can also use the menu on the left to find information about other platform features that might not have been covered in this Getting Started.