Step 5: Producing data

Choose your client

There are many ways to produce data to your stream. This Getting Started covers three ways of both producing and consuming data to and from your stream.
Use the selector below to follow the right steps for your client type.

  • Java producer: build a Java application yourself, using the latest version of the Axual Java Client, to produce data to the stream.
  • .NET producer: build a .NET application yourself, using the latest version of the Axual .NET Client, to produce data to the stream.
  • REST producer: create REST calls, using any tool that can perform them, to produce data to the stream.

As of release 2020.3, you can also use the Axual Python Client.

Creating A Java Producer Application

When you have completed this step, you will have set up a producer application that produces randomly generated data in Avro format to the stream you configured in step 2.

Prerequisites

A computer equipped with:

Building The Application

Start by including the dependency on the Axual client library as you would do with any dependency.

<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client</artifactId>
    <version>5.5.4</version>
</dependency>

This is all you need to create a very basic producer application. The next step is to create the configuration for the AxualClient and for the Producer that will be used.

ClientConfig config = ClientConfig.newBuilder()
        .setApplicationId("io.axual.example.client.avro.producer")
        .setApplicationVersion("0.0.1")
        .setEndpoint("https://192.168.99.100:443")
        .setTenant("demo")
        .setEnvironment("example")
        .setSslConfig(
                SslConfig.newBuilder()
                        .setEnableHostnameVerification(false)
                        .setKeystoreLocation("/path/to/example-producer.client.jks") // Absolute path to the application keystore
                        .setKeystorePassword(new PasswordConfig("notsecret"))
                        .setKeyPassword(new PasswordConfig("notsecret"))
                        .setTruststoreLocation("/path/to/common-truststore.jks") // Absolute path to the application truststore
                        .setTruststorePassword(new PasswordConfig("notsecret"))
                        .build()
        )
        .build();
Check your care package for the truststore and keystore files; see also Step 3.

The next step is creating a ProducerConfig.

SpecificAvroProducerConfig<Application, ApplicationLogEvent> specificAvroProducerConfig =
    SpecificAvroProducerConfig.<Application, ApplicationLogEvent>builder()
        // The strategy you want your producer to apply while sending records.
        .setDeliveryStrategy(DeliveryStrategy.AT_LEAST_ONCE)
        .setOrderingStrategy(OrderingStrategy.KEEPING_ORDER)
        .setMessageBufferWaitTimeout(100)
        .setBatchSize(1)
        .setProxyChain(ProxyChain.newBuilder()
            .append(SWITCHING_PROXY_ID)
            .append(RESOLVING_PROXY_ID)
            .append(LINEAGE_PROXY_ID)
            .append(HEADER_PROXY_ID)
            .build())
        .build();

With these configurations, we can instantiate an AxualClient and a Producer and use those to start sending records.

/* Both the AxualClient and the Producer are AutoCloseable, so can be used in a try-with-resources block.*/
try (final AxualClient axualClient = new AxualClient(config);
     final LogEventSpecificProducer producer = new LogEventSpecificProducer(axualClient, specificAvroProducerConfig, "logeventproducer", "0.0.1", "Team Log")) {
    // The produce method returns a `Future` with which you can do all your usual Future operations that you desire. This will act on errors or on a successful produce.
    List<Future<ProducedMessage<Application, ApplicationLogEvent>>> futures = IntStream.range(0, 1)
            .mapToObj(i -> producer.produce("app_" + i, 1000 + i, INFO, String.format("Message %d", i), Collections.singletonMap("Some key", "Some Value")))
            .collect(Collectors.toList());

    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }

            try {
                ProducedMessage<Application, ApplicationLogEvent> producedMessage = future.get();
                LOG.info("Produced message to topic {} partition {} offset {}", producedMessage.getStream(), producedMessage.getPartition(), producedMessage.getOffset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
}

The LogEventSpecificProducer used in the code above to produce the messages:

public class LogEventSpecificProducer implements AutoCloseable {
    private final Producer<Application, ApplicationLogEvent> producer;
    private final Application application;

    public LogEventSpecificProducer(
            final AxualClient axualClient
            , final SpecificAvroProducerConfig<Application, ApplicationLogEvent> producerConfig
            , final String applicationName
            , final String applicationVersion
            , final String applicationOwner) {
        producer = axualClient.buildProducer(producerConfig);
        this.application = Application.newBuilder()
                .setName(applicationName)
                .setVersion(applicationVersion)
                .setOwner(applicationOwner)
                .build();
    }

    public Future<ProducedMessage<Application, ApplicationLogEvent>> produce(
            final String applicationName,
            final long timestamp,
            final ApplicationLogLevel logLevel,
            final CharSequence logMessage,
            final Map<CharSequence, CharSequence> context) {
        Application key = Application.newBuilder()
                .setName(applicationName)
                .setVersion("1.9.9")
                .setOwner("none")
                .build();
        ApplicationLogEvent value = ApplicationLogEvent.newBuilder()
                .setTimestamp(timestamp)
                .setSource(this.application)
                .setLevel(logLevel)
                .setMessage(logMessage)
                .setContext(context)
                .build();

        ProducerMessage<Application, ApplicationLogEvent> message = ProducerMessage.<Application, ApplicationLogEvent>newBuilder()
                .setStream("applicationlogevents")
                .setKey(key)
                .setValue(value)
                .build();
        return producer.produce(message);
    }

    @Override
    public void close() {
        producer.close();
    }
}

When all of the above steps have been done correctly, start your producer app. With the logging level for the io.axual package set to INFO, your app will produce logging that looks like this:

[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Creating producerConfig
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Creating client and LogSpecificEventProducer
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Claiming worker
	Producer                 : io.axual.client.producer.avro.GenericAvroProducer@cac736f
	Sink                     : io.axual.client.config.SpecificAvroProducerConfig@5e265ba4
	DeliveryStrategy         : AT_LEAST_ONCE
	OrderingStrategy         : KEEPING_ORDER
	MessageBufferSize        : 1000
	messageBufferWaitTimeout : 100
	blockedMessageInsert     : true
[main] INFO io.axual.client.producer.generic.ProducerWorker - Creating new ProducerWorker
[main] INFO io.axual.client.producer.generic.ProducerWorker - Creating a new Axual producer with properties: {...}
[main] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - TTL updated to: 5000 (was: 0)
[main] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - Fetched discovery properties: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Received new DiscoveryResult for SwitchingProducer: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Switching SwitchingProducer from null to clusterA
[main] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Creating new backing producer with Discovery API result: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.producer.ProducerSwitcher - Creating a new producer with properties: {...}
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'acl.principal.builder' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'cluster' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'instance' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'topic.pattern' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'endpoint' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'tenant' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'group.id.pattern' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'app.id' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'ttl' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'distributor.distance' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'enable.value.headers' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'app.version' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'rest.proxy.url' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'environment' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'distributor.timeout' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'system' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'group.id.resolver' was supplied but isn't a known config.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.3.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: fc1aaa116b661c8a
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1585830850920
[main] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Created new backing producer
[main] INFO io.axual.client.producer.generic.ProducerWorker - Created a new Axual producer
[main] INFO io.axual.client.producer.generic.GenericProducer - Created producer of type AT_LEAST_ONCE/KEEPING_ORDER.
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - Starting ProducerWorker thread ProducerWorker0
[kafka-producer-network-thread | producer-logeventproducer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-logeventproducer-1] Cluster ID: JICqHTqLTuaJ9-TObXa4sQ
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Produced message to topic applicationlogevents partition 3 offset 7
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Releasing producer io.axual.client.producer.avro.GenericAvroProducer@cac736f
[main] INFO io.axual.client.producer.generic.ProducerWorker - ProducerWorker cancel requested
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - ProducerWorker cancelled, flushing 0 messages in buffer
[ProducerWorker0] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-logeventproducer-1] Closing the Kafka producer with timeoutMillis = 86399913600000 ms.
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - Exiting ProducerWorker thread ProducerWorker0
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Thread join took PT0.012S
[main] INFO io.axual.client.AxualClient - Closing AxualClient logeventproducer
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Shutting down ProducerWorkerManager, cancelling all workers
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Done!
[Janitor shutdown thread] INFO io.axual.client.janitor.Janitor - Janitor shutdown activated, cleaning up

This is all the coding required to successfully produce data.

Next Step: Consuming Data Using A Java Consumer Application

In the next step you will create a Java consumer application to consume the data you just produced.

Creating A .NET Producer

When you have completed this step, you will have produced data in AVRO format to the stream you have configured in step 2.

Prerequisites

A computer equipped with:

Adding Dependencies

Start by including the dependency on the Axual .NET client library as you would with any dependency,
by adding a PackageReference tag to your .csproj project.

<ItemGroup>
      <PackageReference Include="Axual.Kafka.Proxy" Version="1.0.2" />
      <PackageReference Include="Axual.SchemaRegistry.Serdes.Avro" Version="1.0.2" />
</ItemGroup>

Building The Application

This is all you need to create a very basic producer application.
The next step is to create the configuration for the AxualProducerBuilder.

Producer Configuration

var config = new AxualProducerConfig
{
    // This is the app ID as it is configured in the self service
    ApplicationId ="logeventproducer",
    // The endpoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
    EndPoint = new UriBuilder("https", "192.168.99.100", 443).Uri,
    // The tenant you are part of
    Tenant = "demo",
    // The environment corresponding to the shortname of the env in the self service
    Environment = "example",

    // The ssl configuration for your application. The certificate used should have a DN
    // matching the DN that was configured in self service

    // Server verifies the identity of the client
    // (i.e. that the public key certificate provided by the client has been signed by a
    // CA trusted by the server).
    SslKeystorePassword = "notsecret",
    SslKeystoreLocation = SSL_KEYSTORE_LOCATION, (1)
    SecurityProtocol = SecurityProtocol.Ssl,
    EnableSslCertificateVerification = true,
    // Client verifies the identity of the broker
    SslCaLocation = SSL_CA_PATH, (2)
    SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
};
1 For SSL_KEYSTORE_LOCATION use the absolute path to the keystore file, see also Enabling security
2 For SSL_CA_PATH use the absolute path to the CA file, see also Enabling security

Creating The Producer

With the above configurations, we can use the AxualProducerBuilder to build an AxualProducer instance.

var producer = new AxualProducerBuilder<Application, ApplicationLogEvent>(config)
    .SetKeySerializer(new SpecificAvroSerializer<Application>())
    .SetValueSerializer(new SpecificAvroSerializer<ApplicationLogEvent>())
    .SetLogHandler((_, l) => Console.WriteLine($"> [{l.Level}]: {l.Message}"))
    .SetErrorHandler((_, e) => Console.WriteLine($"> [Error]: {e.Reason}"))
    .Build();

Producing Messages

Now, we are ready to start sending records using the AxualProducer.

var application = new Application
{
    name = "Axual Proxy .NET Specific Avro Producer",
    version = "1.9.9",
    owner = "Team Log"
};


// Produce 10 messages
for (var i = 0; i < 10; i++)
{
    try
    {
        var applicationLogEvent = new ApplicationLogEvent
        {
            timestamp = 1000 + i,
            source = application,
            message = "Message " + i,
            context = new Dictionary<string, string> {{$"some key {i}", $"some value {i}"}},
            level = ApplicationLogLevel.INFO
        };

        var message = new Message<Application, ApplicationLogEvent>
        {
            Key = application,
            Value = applicationLogEvent
        };

        producer.Produce("applicationlogevents", message,
            r => Console.WriteLine(!r.Error.IsError
                ? $"> Produced message to stream {r.Topic} partition {r.Partition} offset {r.Offset}"
                : $"> Delivery Error: {r.Error.Reason}"));
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
}
producer.Flush();
Console.WriteLine("> Finish");

When all of the above steps have been done correctly, start your producer app. Your app will write output to the console that looks like this:

--------------------------------------------------------------------------------------
'axual_client_proxy_specific_avro_producer' producing to stream 'applicationlogevents'
--------------------------------------------------------------------------------------
> Produced message to stream applicationlogevents partition [1] offset 41667
> Produced message to stream applicationlogevents partition [1] offset 41668
> Produced message to stream applicationlogevents partition [1] offset 41669
> Produced message to stream applicationlogevents partition [1] offset 41670
> Produced message to stream applicationlogevents partition [1] offset 41671
> Produced message to stream applicationlogevents partition [1] offset 41672
> Produced message to stream applicationlogevents partition [1] offset 41673
> Produced message to stream applicationlogevents partition [1] offset 41674
> Produced message to stream applicationlogevents partition [1] offset 41675
> Produced message to stream applicationlogevents partition [1] offset 41676
> Finish

Next Step: Consuming Data Using A .NET Consumer Application

In the next step you will create a .NET consumer application to consume the data you just produced.

Creating A REST Producer

When you have completed this step, you will have produced data in AVRO format to the stream you have configured in step 2.

Prerequisites

  • A tool that can send and receive HTTP messages, such as curl or http

Producing data via REST

For the following steps, we're going to use the curl tool to produce data.

Get Schema Id

To produce the data, we first need a schemaId. Use the command below to get it.

curl --request POST \
  --url "https://192.168.99.100:18100/schema/example/applicationlogevents" \
  --header "Content-Type: application/json" \
  --key ../client-cert/local-config/security/applications/example-producer/pem/example_producer.key \
  --cert ../client-cert/local-config/security/applications/example-producer/cer/example_producer.cer \
  --cacert ../client-cert/local-config/security/applications/common-truststore/cachain/tenant-root-ca.cert.pem \
  --data '
  {
    "keySchema": "{\"type\":\"record\",\"name\":\"Application\",\"namespace\":\"io.axual.client.example.schema\",\"doc\":\"Identification of an application\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"doc\":\"The name of the application\"},{\"name\":\"version\",\"type\":[\"null\",\"string\"],\"doc\":\"(Optional) The application version\",\"default\":null},{\"name\":\"owner\",\"type\":[\"null\",\"string\"],\"doc\":\"The owner of the application\",\"default\":null}]}",
    "valueSchema": "{\"type\":\"record\",\"name\":\"ApplicationLogEvent\",\"namespace\":\"io.axual.client.example.schema\",\"doc\":\"Generic application log event\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Timestamp of the event\"},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Application\",\"doc\":\"Identification of an application\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"doc\":\"The name of the application\"},{\"name\":\"version\",\"type\":[\"null\",\"string\"],\"doc\":\"(Optional) The application version\",\"default\":null},{\"name\":\"owner\",\"type\":[\"null\",\"string\"],\"doc\":\"The owner of the application\",\"default\":null}]},\"doc\":\"The application that sent the event\"},{\"name\":\"context\",\"type\":{\"type\":\"map\",\"values\":\"string\"},\"doc\":\"The application context, contains application-specific key-value pairs\"},{\"name\":\"level\",\"type\":{\"type\":\"enum\",\"name\":\"ApplicationLogLevel\",\"doc\":\"The level of the log message\",\"symbols\":[\"DEBUG\",\"INFO\",\"WARN\",\"ERROR\",\"FATAL\"]},\"doc\":\"The log level, being either DEBUG, INFO, WARN or ERROR\"},{\"name\":\"message\",\"type\":\"string\",\"doc\":\"The log message\"}]}"
  }'
Check your care package for the key, cert and cacert parameter values; see also Step 3.

When you execute the above command, the response will contain a keyId and a valueId, like this:

{
   "keyId":1,
   "valueId":2
}

For more detailed information on getting the schemaId, please refer to the Rest-Proxy Avro Schema Service.
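Note that the keySchema and valueSchema fields in the request above are the Avro schemas embedded as JSON-encoded strings, which is why they appear with escaped quotes. A minimal Python sketch (abbreviating the key schema to a single field for brevity; the full schemas from the curl example would be used in practice) showing how such a payload can be built, with the escaping handled by json.dumps:

```python
import json

# The key schema as a plain Python dict (abbreviated; in practice use
# the full Application schema shown in the curl example above).
key_schema = {
    "type": "record",
    "name": "Application",
    "namespace": "io.axual.client.example.schema",
    "fields": [
        {"name": "name", "type": "string", "doc": "The name of the application"},
    ],
}

# json.dumps is applied twice: the inner call turns the schema dict into
# a JSON string, and the outer call escapes that string as a JSON value,
# producing the \" escaping seen in the curl example.
payload = json.dumps({
    "keySchema": json.dumps(key_schema),
    # "valueSchema" would be built the same way from the ApplicationLogEvent schema
})

print(payload)
```

The resulting payload can be sent with any HTTP client instead of hand-escaping the schema strings.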

Producing Data

Use the keyId and valueId obtained above as the schemaId of the keyMessage and valueMessage, respectively, to produce a message:

curl --request POST \
  --url "https://192.168.99.100:18100/stream/example/applicationlogevents" \
  --header "axual-application-id: io.axual.example.client.avro.producer" \
  --header "axual-application-version: 1.0" \
  --header "axual-producer-uuid: log-producer1" \
  --header "Content-Type: application/json" \
  --key ../client-cert/local-config/security/applications/example-producer/pem/example_producer.key \
  --cert ../client-cert/local-config/security/applications/example-producer/cer/example_producer.cer \
  --cacert ../client-cert/local-config/security/applications/common-truststore/cachain/tenant-root-ca.cert.pem \
  --data '
  {
     "keyMessage":{
        "type":"AVRO",
        "schemaId": 1,
        "message":"{\"name\": \"logeventproducer\", \"version\": \"0.0.1\", \"owner\": \"none\"}"
     },
     "valueMessage":{
        "type":"AVRO",
        "schemaId": 2,
        "message":"{\"timestamp\": 1009, \"source\": {\"name\": \"logeventproducer\", \"version\": \"0.0.1\", \"owner\": \"Team Log\"}, \"context\": {\"Some key\": \"Some Value\"}, \"level\": \"INFO\", \"message\": \"Message 9\"}"
     }
  }'
Check your care package for the key, cert and cacert parameter values; see also Step 3.

When you execute the above command, it will return a response that looks like this:

{
   "cluster":"local",
   "offset":0,
   "timestamp":1585835138740,
   "stream":"applicationlogevents",
   "partition":5
}

For more detailed information, please refer to the Rest-Proxy Produce Service.
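The produce request nests JSON-encoded strings in the same way as the schema upload: each message field is itself a JSON document encoded as a string. A short Python sketch (key, value and IDs copied from the examples above) that builds this payload from the IDs returned by the schema call:

```python
import json

# Response from the schema upload call (values taken from the example
# above; yours will differ).
schema_response = {"keyId": 1, "valueId": 2}

# Key and value as plain dicts; the "message" fields of the produce
# payload must again be JSON-encoded strings.
key = {"name": "logeventproducer", "version": "0.0.1", "owner": "none"}
value = {
    "timestamp": 1009,
    "source": {"name": "logeventproducer", "version": "0.0.1", "owner": "Team Log"},
    "context": {"Some key": "Some Value"},
    "level": "INFO",
    "message": "Message 9",
}

produce_payload = json.dumps({
    "keyMessage": {
        "type": "AVRO",
        "schemaId": schema_response["keyId"],
        "message": json.dumps(key),
    },
    "valueMessage": {
        "type": "AVRO",
        "schemaId": schema_response["valueId"],
        "message": json.dumps(value),
    },
})

print(produce_payload)
```

This payload is equivalent to the --data argument of the curl command above and can be posted with any HTTP client.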

Next Step: Consuming data via REST

In the next step you will consume data via the REST Proxy.