Consuming Using .Net Axual Client
Creating A Consumer Application
When you have completed this step, you will have set up a consumer application that is consuming data from the stream configured in Creating streams in Avro format. To get some data onto the stream, follow Create A Producer Application.
Adding Dependencies
Start by including the dependency on the Axual .NET client library as you would do with any dependency or
by adding PackageReference
tag in your .csproj
project.
For full executable Axual client Avro consumer example, please refer examples repository. |
<ItemGroup>
<PackageReference Include="Axual.Kafka.Proxy" Version="1.4.0" />
<PackageReference Include="Axual.SchemaRegistry.Serdes.Avro" Version="1.4.0" />
</ItemGroup>
Building The Application
In order to create a very basic consumer application, this is all you need.
Next step is to create the configuration for the AxualConsumerBuilder
.
Consumer Configuration
var config = new AxualConsumerConfig
{
// This is the app ID as it is configured in the self service
ApplicationId = "loganalyzer",
// The endoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
EndPoint = new UriBuilder("https", "192.168.99.100", 443).Uri,
// The tenant you are part of
Tenant = "demo",
// The environment corresponding to the shortname of the env in the self service
Environment = "example",
SecurityProtocol = SecurityProtocol.Ssl,
SslKeystorePassword = "notsecret",
SslKeystoreLocation = SSL_KEYSTORE_LOCATION, (1)
EnableSslCertificateVerification = true,
// Client verifies the identity of the broker
SslCaLocation = SSL_CA_PATH, (2)
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
};
1 | For SSL_KEYSTORE_LOCATION use the absolute path to the keystore file, see also Data security (application certificate) |
2 | For SSL_CA_PATH use the absolute path to the CA file, see also Data security (setting up trust) |
Creating The Consumer
With the above configurations, we can instantiate the AxualConsumerBuilder
to build an AxualConsumer
instance.
var consumer = new AxualConsumerBuilder<Application, ApplicationLogEvent>(config)
.SetKeyDeserializer(new SpecificAvroDeserializer<Application>())
.SetValueDeserializer(new SpecificAvroDeserializer<ApplicationLogEvent>())
.SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
.SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
.Build()
Consuming Messages
Now, we are ready to start receiving records using the AxualConsumer
.
consumer.Subscribe("applicationlogevents");
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine(
$"> Reached end of stream {consumeResult.Topic}, partition " +
$"{consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine(
$"> Received message at {consumeResult.TopicPartitionOffset}: " + Environment.NewLine +
$"Key: {consumeResult.Message.Key}" + Environment.NewLine +
$"Value: {consumeResult.Message.Value}");
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"> Commit error: {e.Error.Reason}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"> Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("> Closing consumer.");
consumer.Close();
}
When all of the above steps have been done correctly, start your consumer app, your app should log:
---------------------------------------------------------------------------------------- 'axual_client_proxy_specific_avro_consumer' consuming from stream 'applicationlogevents' ---------------------------------------------------------------------------------------- Started consumer, Ctrl-C to stop consuming > Received message at applicationlogevents [[1]] @41677: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1000, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 0 = some value 0}, log level = INFO, message = Message 0 > Received message at applicationlogevents [[1]] @41678: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1001, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 1 = some value 1}, log level = INFO, message = Message 1 > Received message at applicationlogevents [[1]] @41679: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1002, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 2 = some value 2}, log level = INFO, message = Message 2 > Received message at applicationlogevents [[1]] @41680: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1003, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 3 = some value 3}, log level = INFO, message = Message 3 > Received message at applicationlogevents [[1]] @41681: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1004, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 4 = some value 4}, log level = INFO, message = Message 4 > Received message at applicationlogevents [[1]] @41682: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1005, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 5 = some value 5}, log level = INFO, message = Message 5 > Received message at applicationlogevents [[1]] @41683: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1006, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 6 = some value 6}, log level = INFO, message = Message 6 > Received message at applicationlogevents [[1]] @41684: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1007, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 7 = some value 7}, log level = INFO, message = Message 7 > Received message at applicationlogevents [[1]] @41685: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1008, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 8 = some value 8}, log level = INFO, message = Message 8 > Received message at applicationlogevents [[1]] @41686: Key: [Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log Value: [Application Log Event]: timestamp = 1009, source = {[Application]: name = Axual Proxy .NET Specific Avro Producer, version = 1.9.9, owner = Team Log}, context = {some key 9 = some value 9}, log level = INFO, message = Message 9
This is all the coding required to start a basic consumer!
Wrapping up
You have concluded the getting started section by preparing your stream and applications, requesting access to the stream and actually producing and consuming some data. Before deploying your application to production, it is advised to enable monitoring.
You can also use the menu on the left to find information about other platform features, that might not have been touched in this Getting Started.