Producing Using Java Axual Client
Creating A Producer Application
When you have completed this step, you will have set up a producer application that is producing some randomly generated data in Avro format to the stream you have configured in Creating streams.
The Axual client doesn’t support SASL authentication. If you want to use SASL, please refer to Kafka client producer using SASL.
Before continuing, please check you fulfill all the Prerequisites.
Building The Application
Start by including the dependency on the Axual client library as you would do with any dependency.
For a full executable Axual client Avro producer example, please refer to the examples repository.
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client</artifactId>
    <version>5.8.1</version>
</dependency>
This is all you need to create a very basic producer application. The next step is to create the configuration for the AxualClient
and for the Producer that will be used.
SslConfig sslConfig = SslConfig.newBuilder()
        .setEnableHostnameVerification(false)
        .setKeystoreLocation("/path/to/example-producer.client.jks") // Absolute path to the application keystore
        .setKeystorePassword(new PasswordConfig("notsecret"))
        .setKeyPassword(new PasswordConfig("notsecret"))
        .setTruststoreLocation("/path/to/common-truststore.jks") // Absolute path to the application truststore
        .setTruststorePassword(new PasswordConfig("notsecret"))
        .build();

ClientConfig config = ClientConfig.newBuilder()
        .setApplicationId("io.axual.example.client.avro.producer")
        .setApplicationVersion("0.0.1")
        .setEndpoint("https://192.168.99.100:443")
        .setTenant("demo")
        .setEnvironment("example")
        .setSslConfig(sslConfig)
        .build();
Check your care package for the truststore and keystore files; see also Security.
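SSL misconfiguration is a common cause of connection failures, so it can help to verify up front that the keystore from your care package can actually be opened with the configured password. The following sketch uses only the JDK's `java.security.KeyStore` API (no Axual classes); the file path and password are placeholders matching the configuration above.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

public class KeystoreCheck {
    /** Returns true if the JKS file at `path` can be opened with `password`. */
    static boolean canOpen(String path, char[] password) {
        try (FileInputStream in = new FileInputStream(path)) {
            KeyStore ks = KeyStore.getInstance("JKS");
            ks.load(in, password); // throws if the file is missing, corrupt, or the password is wrong
            return true;
        } catch (IOException | GeneralSecurityException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder path; substitute the keystore from your care package.
        System.out.println(canOpen("/path/to/example-producer.client.jks",
                "notsecret".toCharArray()));
    }
}
```

Running this check before starting the producer gives a clearer error than a handshake failure deep inside the Kafka client.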
The next step is creating a ProducerConfig.
SpecificAvroProducerConfig<Application, ApplicationLogEvent> specificAvroProducerConfig =
        SpecificAvroProducerConfig.<Application, ApplicationLogEvent>builder()
                // The strategy you want your producer to apply while sending records.
                .setDeliveryStrategy(DeliveryStrategy.AT_LEAST_ONCE)
                .setOrderingStrategy(OrderingStrategy.KEEPING_ORDER)
                .setMessageBufferWaitTimeout(100)
                .setBatchSize(1)
                .setProxyChain(ProxyChain.newBuilder()
                        .append(SWITCHING_PROXY_ID)
                        .append(RESOLVING_PROXY_ID)
                        .append(LINEAGE_PROXY_ID)
                        .append(HEADER_PROXY_ID)
                        .build())
                .build();
With these configurations, we can instantiate an AxualClient and a Producer and use those to start sending records.
/* Both the AxualClient and the Producer are AutoCloseable, so they can be used in a try-with-resources block. */
try (final AxualClient axualClient = new AxualClient(config);
     final LogEventSpecificProducer producer = new LogEventSpecificProducer(axualClient, specificAvroProducerConfig, "logeventproducer", "0.0.1", "Team Log")) {
    // The produce method returns a Future, supporting all the usual Future operations, which you can use to act on errors or on a successful produce.
    List<Future<ProducedMessage<Application, ApplicationLogEvent>>> futures = IntStream.range(0, 1)
            .mapToObj(i -> producer.produce("app_" + i, 1000 + i, INFO, String.format("Message %d", i), Collections.singletonMap("Some key", "Some Value")))
            .collect(Collectors.toList());
    do {
        futures.removeIf(future -> {
            if (!future.isDone()) {
                return false;
            }
            try {
                ProducedMessage<Application, ApplicationLogEvent> producedMessage = future.get();
                LOG.info("Produced message to topic {} partition {} offset {}", producedMessage.getStream(), producedMessage.getPartition(), producedMessage.getOffset());
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Error getting future, produce failed", e);
            }
            return true;
        });
        sleep(100);
    } while (!futures.isEmpty());
}
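The loop above repeatedly removes completed futures from the list, sleeping briefly between passes, until every send is accounted for. The same drain pattern can be sketched without any Axual classes, using plain `CompletableFuture`s as hypothetical stand-ins for the producer's results:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class DrainFutures {
    /** Blocks until every future has completed, removing each as it finishes; returns the success count. */
    static int drain(List<Future<String>> futures) throws InterruptedException {
        int completed = 0;
        while (!futures.isEmpty()) {
            Iterator<Future<String>> it = futures.iterator();
            while (it.hasNext()) {
                Future<String> f = it.next();
                if (!f.isDone()) {
                    continue; // not finished yet; check again on the next pass
                }
                it.remove();
                try {
                    f.get();      // a real application would log the produced message here
                    completed++;
                } catch (ExecutionException e) {
                    // a real application would log the failure and possibly retry here
                }
            }
            Thread.sleep(10); // back off briefly before polling again
        }
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            futures.add(CompletableFuture.completedFuture("msg " + i));
        }
        System.out.println(drain(futures)); // prints 3
    }
}
```

This polling style keeps the main thread in control of error handling; an alternative is attaching callbacks to each future, at the cost of handling results on the producer's I/O thread.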
The LogEventSpecificProducer class used in the code above to produce messages:
public class LogEventSpecificProducer implements AutoCloseable {
    private final Producer<Application, ApplicationLogEvent> producer;
    private final Application application;

    public LogEventSpecificProducer(
            final AxualClient axualClient,
            final SpecificAvroProducerConfig<Application, ApplicationLogEvent> producerConfig,
            final String applicationName,
            final String applicationVersion,
            final String applicationOwner) {
        producer = axualClient.buildProducer(producerConfig);
        this.application = Application.newBuilder()
                .setName(applicationName)
                .setVersion(applicationVersion)
                .setOwner(applicationOwner)
                .build();
    }

    public Future<ProducedMessage<Application, ApplicationLogEvent>> produce(
            final String applicationName,
            final long timestamp,
            final ApplicationLogLevel logLevel,
            final CharSequence logMessage,
            final Map<CharSequence, CharSequence> context) {
        Application key = Application.newBuilder()
                .setName(applicationName)
                .setVersion("1.9.9")
                .setOwner("none")
                .build();
        ApplicationLogEvent value = ApplicationLogEvent.newBuilder()
                .setTimestamp(timestamp)
                .setSource(this.application)
                .setLevel(logLevel)
                .setMessage(logMessage)
                .setContext(context)
                .build();
        ProducerMessage<Application, ApplicationLogEvent> message = ProducerMessage.<Application, ApplicationLogEvent>newBuilder()
                .setStream("applicationlogevents")
                .setKey(key)
                .setValue(value)
                .build();
        return producer.produce(message);
    }

    @Override
    public void close() {
        producer.close();
    }
}
When all of the above steps have been done correctly, start your producer app. With the logging level for the io.axual package set to INFO, your app should log:
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Creating producerConfig
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Creating client and LogSpecificEventProducer
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Claiming worker
  Producer : io.axual.client.producer.avro.GenericAvroProducer@cac736f
  Sink : io.axual.client.config.SpecificAvroProducerConfig@5e265ba4
  DeliveryStrategy : AT_LEAST_ONCE
  OrderingStrategy : KEEPING_ORDER
  MessageBufferSize : 1000
  messageBufferWaitTimeout : 100
  blockedMessageInsert : true
[main] INFO io.axual.client.producer.generic.ProducerWorker - Creating new ProducerWorker
[main] INFO io.axual.client.producer.generic.ProducerWorker - Creating a new Axual producer with properties: {...}
[main] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - TTL updated to: 5000 (was: 0)
[main] INFO io.axual.discovery.client.fetcher.DiscoveryLoader - Fetched discovery properties: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Received new DiscoveryResult for SwitchingProducer: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.discovery.DiscoverySubscriber - Switching SwitchingProducer from null to clusterA
[main] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Creating new backing producer with Discovery API result: DiscoveryResult: {...}
[main] INFO io.axual.client.proxy.switching.producer.ProducerSwitcher - Creating a new producer with properties: {...}
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: ...
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'acl.principal.builder' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'cluster' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'instance' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'topic.pattern' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'endpoint' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'tenant' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'group.id.pattern' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'app.id' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'ttl' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'distributor.distance' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'enable.value.headers' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'app.version' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'rest.proxy.url' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'environment' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'distributor.timeout' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'system' was supplied but isn't a known config.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'group.id.resolver' was supplied but isn't a known config.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.3.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: fc1aaa116b661c8a
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1585830850920
[main] INFO io.axual.client.proxy.switching.generic.BaseClientProxySwitcher - Created new backing producer
[main] INFO io.axual.client.producer.generic.ProducerWorker - Created a new Axual producer
[main] INFO io.axual.client.producer.generic.GenericProducer - Created producer of type AT_LEAST_ONCE/KEEPING_ORDER.
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - Starting ProducerWorker thread ProducerWorker0
[kafka-producer-network-thread | producer-logeventproducer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-logeventproducer-1] Cluster ID: JICqHTqLTuaJ9-TObXa4sQ
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Produced message to topic applicationlogevents partition 3 offset 7
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Releasing producer io.axual.client.producer.avro.GenericAvroProducer@cac736f
[main] INFO io.axual.client.producer.generic.ProducerWorker - ProducerWorker cancel requested
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - ProducerWorker cancelled, flushing 0 messages in buffer
[ProducerWorker0] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-logeventproducer-1] Closing the Kafka producer with timeoutMillis = 86399913600000 ms.
[ProducerWorker0] INFO io.axual.client.producer.generic.ProducerWorker - Exiting ProducerWorker thread ProducerWorker0
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Thread join took PT0.012S
[main] INFO io.axual.client.AxualClient - Closing AxualClient logeventproducer
[main] INFO io.axual.client.producer.generic.ProducerWorkerManager - Shutting down ProducerWorkerManager, cancelling all workers
[main] INFO io.axual.client.example.axualclient.avro.ClientAvroProducerApp - Done!
[Janitor shutdown thread] INFO io.axual.client.janitor.Janitor - Janitor shutdown activated, cleaning up
This is all the coding required to produce records successfully.
Next Step: Consuming Data Using A Java Consumer Application
In the next step you will create a Java consumer application to consume the data you just produced.
Proceed to Consuming Data (Java Axual Client)