Producing

Introduction

Producing messages to the Axual Platform consists of three main steps, each discussed in its own section of this document.

The steps to take, in order, are:
- Instantiating an AxualProducer
- Instantiating a message
- Producing the message

Instantiating an AxualProducer

Namespace: Axual.Kafka.Proxy.Proxies.Axual

The producer is the object used for producing messages.

Create one with the AxualProducerBuilder, which follows the builder pattern:

var producer = new AxualProducerBuilder<Application, ApplicationLogEvent>(config) // (1)
    .SetKeySerializer(new SpecificAvroSerializer<Application>()) // (2)
    .SetValueSerializer(new SpecificAvroSerializer<ApplicationLogEvent>()) // (3)
    .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}")) // (4)
    .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}")) // (5)
    .Build(); // (6)
1 Uses the AxualProducerBuilder. The config argument is explained in Producer Configuration
2 Sets the serializer for the key objects
3 Sets the serializer for the value objects
4 Callback for log handling; in this case we simply write every log message to the console
5 Callback for error handling; in this case we simply write the error reason to the console
6 Builds the producer object
We recommend wrapping the above code in a using statement, so that the producer is disposed through IDisposable and its resources are released even when an exception is thrown.
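
The effect of the using statement can be seen in isolation with a stand-in resource. The sketch below does not touch the Axual API: DemoResource and UsingDemo are hypothetical names that exist only to show that Dispose() runs even when the body of the using block throws.

```csharp
using System;

// Hypothetical stand-in for a producer; it only records whether Dispose() ran.
public sealed class DemoResource : IDisposable
{
    public bool Disposed { get; private set; }

    public void Dispose()
    {
        Disposed = true;
    }
}

public static class UsingDemo
{
    // Returns true when the resource was disposed although the using body threw.
    public static bool DisposedAfterFailure()
    {
        var resource = new DemoResource();
        try
        {
            using (resource)
            {
                throw new InvalidOperationException("simulated failure while producing");
            }
        }
        catch (InvalidOperationException)
        {
            // The using block has already called Dispose() by the time we get here.
        }
        return resource.Disposed;
    }
}
```

Calling UsingDemo.DisposedAfterFailure() returns true, which is the guarantee the recommendation above relies on.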

Instantiating a Message

Define Message Definition

Now that our producer is available, we can create the information we want to publish to the Axual Platform. In Kafka this takes the form of key-value pairs, which are called messages or events.

The payload of those messages depends on the stream definition. We will not go into detail on why AVRO is preferred, since that is covered in the Self Service documentation. Briefly put, AVRO encapsulates object types more efficiently than STRING, which is why we prefer it.

For our example, we produce to a stream whose key-value pairs are defined as follows:
- Key : Application
- Value : ApplicationLogEvent

In other words, each event or message on the stream consists of an Application object as the key and an ApplicationLogEvent object as the value.

These data types are formalized in AVRO schema files (.avsc), which describe the fields each type consists of.

Application.avsc
{
  "type": "record",
  "name": "Application",
  "namespace": "io.axual.client.example.schema",
  "fields": [
    {
      "name": "name",
      "doc": "The name of the application",
      "type": "string"
    },
    {
      "name": "version",
      "doc": "(Optional) The application version",
      "default": null,
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "owner",
      "doc": "The owner of the application",
      "default": null,
      "type": [
        "null",
        "string"
      ]
    }
  ]
}
ApplicationLogEvent.avsc
{
  "type": "record",
  "name": "ApplicationLogEvent",
  "namespace": "io.axual.client.example.schema",
  "fields": [
    {
      "name": "timestamp",
      "doc": "Timestamp of the event",
      "type": "long"
    },
    {
      "name": "source",
      "doc": "The application that sent the event",
      "type": {
        "type": "record",
        "name": "Application",
        "namespace": "io.axual.client.example.schema",
        "fields": [
          {
            "name": "name",
            "doc": "The name of the application",
            "type": "string"
          },
          {
            "name": "version",
            "doc": "(Optional) The application version",
            "default": null,
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "owner",
            "doc": "The owner of the application",
            "default": null,
            "type": [
              "null",
              "string"
            ]
          }
        ]
      }
    },
    {
      "name": "context",
      "doc": "The application context, contains application-specific key-value pairs",
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "name": "level",
      "doc": "The log level, being either DEBUG, INFO, WARN or ERROR",
      "type": {
        "type": "enum",
        "name": "ApplicationLogLevel",
        "namespace": "io.axual.client.example.schema",
        "symbols": [
          "DEBUG",
          "INFO",
          "WARN",
          "ERROR",
          "FATAL"
        ]
      }
    },
    {
      "name": "message",
      "doc": "The log message",
      "type": "string"
    }
  ]
}
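
The schema declares timestamp as a plain long and does not state a unit. The sketch below assumes milliseconds since the Unix epoch, which is a common convention for Kafka events but is an assumption here, not something the schema prescribes. EventTime is a hypothetical helper name.

```csharp
using System;

// Hypothetical helper; the epoch-milliseconds unit is an assumption,
// the schema only says that timestamp is a long.
public static class EventTime
{
    // Converts an instant to milliseconds since 1970-01-01T00:00:00Z.
    public static long ToEpochMillis(DateTimeOffset instant)
    {
        return instant.ToUnixTimeMilliseconds();
    }
}
```

A value such as EventTime.ToEpochMillis(DateTimeOffset.UtcNow) could then be assigned to the timestamp field of an ApplicationLogEvent.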

To produce a message that conforms to the schema definition, we use the Apache Avro tool avrogen.
With this tool we generate a class for each of the previously defined schemas. Once the classes have been generated, there is no need to use the schemas directly in our programs.

avrogen -s <schema_file> <output> --namespace avro.namespace:csharp.namespace

Application.cs
// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.9.0.0
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------
namespace io.axual.client.example.schema
{
	using System;
	using System.Collections.Generic;
	using System.Text;
	using Avro;
	using Avro.Specific;

	public partial class Application : ISpecificRecord
	{
		public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""Application"",""namespace"":""io.axual.client.example.schema"",""fields"":[{""name"":""name"",""doc"":""The name of the application"",""type"":""string""},{""name"":""version"",""doc"":""(Optional) The application version"",""default"":null,""type"":[""null"",""string""]},{""name"":""owner"",""doc"":""The owner of the application"",""default"":null,""type"":[""null"",""string""]}]}");
		/// <summary>
		/// The name of the application
		/// </summary>
		private string _name;
		/// <summary>
		/// (Optional) The application version
		/// </summary>
		private string _version;
		/// <summary>
		/// The owner of the application
		/// </summary>
		private string _owner;
		public virtual Schema Schema
		{
			get
			{
				return Application._SCHEMA;
			}
		}
		/// <summary>
		/// The name of the application
		/// </summary>
		public string name
		{
			get
			{
				return this._name;
			}
			set
			{
				this._name = value;
			}
		}
		/// <summary>
		/// (Optional) The application version
		/// </summary>
		public string version
		{
			get
			{
				return this._version;
			}
			set
			{
				this._version = value;
			}
		}
		/// <summary>
		/// The owner of the application
		/// </summary>
		public string owner
		{
			get
			{
				return this._owner;
			}
			set
			{
				this._owner = value;
			}
		}
		public virtual object Get(int fieldPos)
		{
			switch (fieldPos)
			{
			case 0: return this.name;
			case 1: return this.version;
			case 2: return this.owner;
			default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
			};
		}
		public virtual void Put(int fieldPos, object fieldValue)
		{
			switch (fieldPos)
			{
			case 0: this.name = (System.String)fieldValue; break;
			case 1: this.version = (System.String)fieldValue; break;
			case 2: this.owner = (System.String)fieldValue; break;
			default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
			};
		}

		public override string ToString()
		{
			return $"[Application]: name = {name}, version = {version}, owner = {owner}";
		}
	}
}
ApplicationLogEvent.cs
// ------------------------------------------------------------------------------
// <auto-generated>
//    Generated by avrogen, version 1.9.0.0
//    Changes to this file may cause incorrect behavior and will be lost if code
//    is regenerated
// </auto-generated>
// ------------------------------------------------------------------------------

using System.Linq;

namespace io.axual.client.example.schema
{
	using System;
	using System.Collections.Generic;
	using System.Text;
	using Avro;
	using Avro.Specific;

	public partial class ApplicationLogEvent : ISpecificRecord
	{
		public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""ApplicationLogEvent"",""namespace"":""io.axual.client.example.schema"",""fields"":[{""name"":""timestamp"",""doc"":""Timestamp of the event"",""type"":""long""},{""name"":""source"",""doc"":""The application that sent the event"",""type"":{""type"":""record"",""name"":""Application"",""namespace"":""io.axual.client.example.schema"",""fields"":[{""name"":""name"",""doc"":""The name of the application"",""type"":""string""},{""name"":""version"",""doc"":""(Optional) The application version"",""default"":null,""type"":[""null"",""string""]},{""name"":""owner"",""doc"":""The owner of the application"",""default"":null,""type"":[""null"",""string""]}]}},{""name"":""context"",""doc"":""The application context, contains application-specific key-value pairs"",""type"":{""type"":""map"",""values"":""string""}},{""name"":""level"",""doc"":""The log level, being either DEBUG, INFO, WARN or ERROR"",""type"":{""type"":""enum"",""name"":""ApplicationLogLevel"",""namespace"":""io.axual.client.example.schema"",""symbols"":[""DEBUG"",""INFO"",""WARN"",""ERROR"",""FATAL""]}},{""name"":""message"",""doc"":""The log message"",""type"":""string""}]}");
		/// <summary>
		/// Timestamp of the event
		/// </summary>
		private long _timestamp;
		/// <summary>
		/// The application that sent the event
		/// </summary>
		private Application _source;
		/// <summary>
		/// The application context, contains application-specific key-value pairs
		/// </summary>
		private Dictionary<string, string> _context;
		/// <summary>
		/// The log level, being either DEBUG, INFO, WARN or ERROR
		/// </summary>
		private ApplicationLogLevel _level;
		/// <summary>
		/// The log message
		/// </summary>
		private string _message;
		public virtual Schema Schema
		{
			get
			{
				return ApplicationLogEvent._SCHEMA;
			}
		}
		/// <summary>
		/// Timestamp of the event
		/// </summary>
		public long timestamp
		{
			get
			{
				return this._timestamp;
			}
			set
			{
				this._timestamp = value;
			}
		}
		/// <summary>
		/// The application that sent the event
		/// </summary>
		public Application source
		{
			get
			{
				return this._source;
			}
			set
			{
				this._source = value;
			}
		}
		/// <summary>
		/// The application context, contains application-specific key-value pairs
		/// </summary>
		public Dictionary<string, string> context
		{
			get
			{
				return this._context;
			}
			set
			{
				this._context = value;
			}
		}
		/// <summary>
		/// The log level, being either DEBUG, INFO, WARN or ERROR
		/// </summary>
		public io.axual.client.example.schema.ApplicationLogLevel level
		{
			get
			{
				return this._level;
			}
			set
			{
				this._level = value;
			}
		}
		/// <summary>
		/// The log message
		/// </summary>
		public string message
		{
			get
			{
				return this._message;
			}
			set
			{
				this._message = value;
			}
		}
		public virtual object Get(int fieldPos)
		{
			switch (fieldPos)
			{
			case 0: return this.timestamp;
			case 1: return this.source;
			case 2: return this.context;
			case 3: return this.level;
			case 4: return this.message;
			default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
			};
		}
		public virtual void Put(int fieldPos, object fieldValue)
		{
			switch (fieldPos)
			{
			case 0: this.timestamp = (System.Int64)fieldValue; break;
			case 1: this.source = (Application)fieldValue; break;
			case 2: this.context = (Dictionary<string, string>) fieldValue; break;
			case 3: this.level = (ApplicationLogLevel)fieldValue; break;
			case 4: this.message = (System.String)fieldValue; break;
			default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
			};
		}

		public override string ToString()
		{
			var contextStr = string.Join(";", context.Select(x => x.Key + " = " + x.Value).ToArray());

			return $"[Application Log Event]: timestamp = {timestamp}, source = {{{source}}}, " +
			       $"context = {{{contextStr}}}, log level = {level}, message = {message} ";
		}
	}
}
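
The ToString override in ApplicationLogEvent.cs calls context.Select directly, so it throws a NullReferenceException when context has not been set. Below is a minimal null-safe sketch of the same formatting. ContextFormatter is a hypothetical name and is not part of the generated code.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not produced by avrogen.
public static class ContextFormatter
{
    // Formats the context map as "key = value" pairs separated by ';'.
    // Returns an empty string when the map is null.
    public static string Format(IDictionary<string, string> context)
    {
        if (context == null)
        {
            return string.Empty;
        }
        return string.Join(";", context.Select(x => x.Key + " = " + x.Value));
    }
}
```

Because avrogen overwrites the generated file, a helper like this is best kept in a separate source file.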

Producing the Message

Here we use the previously generated classes to produce messages with the producer.
The last step before producing is to define the stream to produce to. This is the unresolved name of the stream as it appears in Self Service, held here in the streamName variable. The example below produces ten messages in a loop and then flushes the producer:

var application = new Application
{
    name = "Axual Proxy .NET Specific Avro Producer",
    version = "1.9.9",
    owner = "Team Log"
};

using (var producer = new AxualProducerBuilder<Application, ApplicationLogEvent>(config)
    .SetKeySerializer(new SpecificAvroSerializer<Application>())
    .SetValueSerializer(new SpecificAvroSerializer<ApplicationLogEvent>())
    .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
    .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
    .Build())
{
    // Produce 10 messages
    for (var i = 0; i < 10; i++)
    {
        try
        {
            var applicationLogEvent = new ApplicationLogEvent
            {
                timestamp = 1000 + i,
                source = application,
                message = "Message " + i,
                context = new Dictionary<string, string> {{$"some key {i}", $"some value {i}"}},
                level = ApplicationLogLevel.INFO
            };

            var message = new Message<Application, ApplicationLogEvent>
            {
                Key = application,
                Value = applicationLogEvent
            };

            producer.Produce(
                streamName, // (1)
                message,
                r => Console.WriteLine(!r.Error.IsError
                    ? $"> Produced message to stream {r.Topic} partition {r.Partition} offset {r.Offset}"
                    : $"> Delivery Error: {r.Error.Reason}"));
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }

    producer.Flush();
}

Console.WriteLine("> Finish");
1 The stream name as found in Self Service
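
The delivery handler in the example only prints each result. When an application needs totals instead, the results can be counted. The sketch below assumes, as is the case for Confluent.Kafka producers, that delivery handlers may run on a background thread, so the counters are updated atomically. DeliveryStats is a hypothetical class and not part of the Axual client.

```csharp
using System.Threading;

// Hypothetical helper for tallying delivery reports; not part of the Axual client.
public sealed class DeliveryStats
{
    private long _delivered;
    private long _failed;

    // Number of reports recorded without an error.
    public long Delivered
    {
        get { return Interlocked.Read(ref _delivered); }
    }

    // Number of reports recorded with an error.
    public long Failed
    {
        get { return Interlocked.Read(ref _failed); }
    }

    // Records one delivery report; safe to call from several threads.
    public void Record(bool isError)
    {
        if (isError)
        {
            Interlocked.Increment(ref _failed);
        }
        else
        {
            Interlocked.Increment(ref _delivered);
        }
    }
}
```

Inside the delivery handler shown above, a call such as stats.Record(r.Error.IsError) would update the totals, and they can be read once producer.Flush() has returned.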