Producing Using Java Axual Client
Creating A Producer Application
When you have completed this step, you will have set up a producer application that is producing some randomly generated data in Avro format to the stream you have configured in Creating topics.
| Axual client doesn’t support SASLauthentication. If you want to useSASL, please refer Kafka client producer using SASL | 
Before continuing, please check you fulfill all the Prerequisites.
Building The Application
Start by including the dependency on the Axual client library as you would do with any dependency.
| For full executable Axual client Avro producer example, please refer examples repository. | 
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client</artifactId>
    <version>5.8.1</version>
</dependency>In order to create a very basic producer application, this is all you need. Next step is to create the configuration for the AxualClient and also for the Producer that will be used.
SslConfig sslConfig = SslConfig.newBuilder()
	.setEnableHostnameVerification(false)
	.setKeystoreLocation("/path/to/example-producer.client.jks") // Absolute path to the application keystore
	.setKeystorePassword(new PasswordConfig("notsecret"))
	.setKeyPassword(new PasswordConfig("notsecret"))
	.setTruststoreLocation("/path/to/common-truststore.jks") // Absolute path to the application truststore
	.setTruststorePassword(new PasswordConfig("notsecret"))
	.build();
ClientConfig config = ClientConfig.newBuilder()
        .setApplicationId("io.axual.example.client.avro.producer")
        .setApplicationVersion("0.0.1")
        .setEndpoint("https://192.168.99.100:443")
        .setTenant("demo")
        .setEnvironment("example")
        .setSslConfig(sslConfig)
        .build();| Check your essentials-package for the truststore and keystore files, see also Security | 
The next step is creating a ProducerConfig.
SpecificAvroProducerConfig<Application, ApplicationLogEvent> specificAvroProducerConfig =
    SpecificAvroProducerConfig.<Application, ApplicationLogEvent>builder()
        // The strategy you want your producer to apply while sending records.
        .setDeliveryStrategy(DeliveryStrategy.AT_LEAST_ONCE)
        .setOrderingStrategy(OrderingStrategy.KEEPING_ORDER)
        .setMessageBufferWaitTimeout(100)
        .setBatchSize(1)
        .setProxyChain(ProxyChain.newBuilder()
            .append(SWITCHING_PROXY_ID)
            .append(RESOLVING_PROXY_ID)
            .append(LINEAGE_PROXY_ID)
            .append(HEADER_PROXY_ID)
            .build())
        .build();With these configurations, we can instantiate an AxualClient and a Producer and use those to start sending records.
/* Both the AxualClient and the Producer are AutoCloseable, so can be used in a try-with-resources block.*/
try (final AxualClient axualClient = new AxualClient(config);
     final LogEventSpecificProducer producer = new LogEventSpecificProducer(axualClient, specificAvroProducerConfig, "logeventproducer", "0.0.1", "Team Log")) {
    // The produce method returns a `Future` with which you can do all your usual Future operations that you desire. This will act on errors or on a successful produce.
    futures = IntStream.range(0, 1)
            .mapToObj(i -> producer.produce("app_" + i, 1000 + i, INFO, String.format("Message %d", i), Collections.singletonMap("Some key", "Some Value")))
            .collect(Collectors.toList());
    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }
            try {
                ProducedMessage<Application, ApplicationLogEvent> producedMessage = future.get();
                LOG.info("Produced message to topic {} partition {} offset {}", producedMessage.getStream(), producedMessage.getPartition(), producedMessage.getOffset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
}LogEventSpecificProducer used in above code to produce messages:
public class LogEventSpecificProducer implements AutoCloseable {
    private final Producer<Application, ApplicationLogEvent> producer;
    private final Application application;
    public LogEventSpecificProducer(
            final AxualClient axualClient
            , final SpecificAvroProducerConfig<Application, ApplicationLogEvent> producerConfig
            , final String applicationName
            , final String applicationVersion
            , final String applicationOwner) {
        producer = axualClient.buildProducer(producerConfig);
        this.application = Application.newBuilder()
                .setName(applicationName)
                .setVersion(applicationVersion)
                .setOwner(applicationOwner)
                .build();
    }
    public Future<ProducedMessage<Application, ApplicationLogEvent>> produce(
            final String applicationName,
            final long timestamp,
            final ApplicationLogLevel logLevel,
            final CharSequence logMessage,
            final Map<CharSequence, CharSequence> context) {
        Application key = Application.newBuilder()
                .setName(applicationName)
                .setVersion("1.9.9")
                .setOwner("none")
                .build();
        ApplicationLogEvent value = ApplicationLogEvent.newBuilder()
                .setTimestamp(timestamp)
                .setSource(this.application)
                .setLevel(logLevel)
                .setMessage(logMessage)
                .setContext(context)
                .build();
        ProducerMessage<Application, ApplicationLogEvent> message = ProducerMessage.<Application, ApplicationLogEvent>newBuilder()
                .setStream("applicationlogevents")
                .setKey(key)
                .setValue(value)
                .build();
        Future<ProducedMessage<Application, ApplicationLogEvent>> result = producer.produce(message);
        return result;
    }
    @Override
    public void close() {
        producer.close();
    }
}- When all of the above steps have been done correctly, start your producer app. With the logging level for the io.axualpackage toINFO, Your app should log
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Creating producerConfig
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Creating client and LogSpecificEventProducer
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Claiming worker	Producer                 : io.axual.client.producer.avro.GenericAvroProducer@cac736f	Sink                     : io.axual.client.config.SpecificAvroProducerConfig@5e265ba4	DeliveryStrategy         : AT_LEAST_ONCE	OrderingStrategy         : KEEPING_ORDER	MessageBufferSize        : 1000	messageBufferWaitTimeout : 100	blockedMessageInsert     : true
[main] INFO io.axual.client.producer.generic.ProducerWorker - Creating new ProducerWorker
[main] INFO io.axual.client.producer.generic.ProducerWorker - Creating a new Axual producer with properties: {...}
[main] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - TTL updated to: 5000 (was: 0)
[main] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - Fetched discovery properties: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Received new DiscoveryResult for SwitchingProducer: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Switching SwitchingProducer from null to clusterA
[main] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Creating new backing producer with Discovery API result: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.producer.ProducerSwitcher - Creating a new producer with properties: {...}
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'acl.principal.builder' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'cluster' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'instance' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'topic.pattern' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'endpoint' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'tenant' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'group.id.pattern' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'app.id' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'ttl' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'distributor.distance' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'enable.value.headers' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'app.version' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'rest.proxy.url' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'environment' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'distributor.timeout' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'system' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'group.id.resolver' was supplied but isn't a known config.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.3.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: fc1aaa116b661c8a
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1585830850920
[main] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Created new backing producer
[main] INFO io.axual.client.producer.generic.ProducerWorker - Created a new Axual producer
[main] INFO io.axual.client.producer.generic.GenericProducer - Created producer of type AT_LEAST_ONCE/KEEPING_ORDER.
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - Starting ProducerWorker thread ProducerWorker0
[kafka-producer-network-thread | producer-logeventproducer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-logeventproducer-1] Cluster ID: JICqHTqLTuaJ9-TObXa4sQ
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Produced message to topic applicationlogevents partition 3 offset 7
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Releasing producer io.axual.client.producer.avro.GenericAvroProducer@cac736f
[main] INFO io.axual.client.producer.generic.ProducerWorker - ProducerWorker cancel requested
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - ProducerWorker cancelled, flushing 0 messages in buffer
[ProducerWorker0] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-logeventproducer-1] Closing the Kafka producer with timeoutMillis = 86399913600000 ms.
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - Exiting ProducerWorker thread ProducerWorker0
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Thread join took PT0.012S
[main] INFO io.axual.client.AxualClient - Closing AxualClient logeventproducer
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Shutting down ProducerWorkerManager, cancelling all workers
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Done!
[Janitor shutdown thread] INFO io.axual.client.janitor.Janitor - Janitor shutdown activated, cleaning up
This is all the coding required to make a successful produce happen.
Next Step: Consuming Data Using A Java Consumer Application
In the next step you will create a Java consumer application to use the data you just produced.
Proceed to Consuming Data (Java Axual Client)