Step 3: Producing data
Prerequisites
A computer equipped with:
JDK 8 or above - https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Maven 3.x or above - https://maven.apache.org/download.cgi
Your favorite IDE
Please see the instructions below on how to configure Maven on your machine. If you haven’t received your credentials yet, please request them through
Create A Producer Application
When you have completed this step, you will have set up a producer application that is producing some randomly generated data in String format to the stream you have configured in step 1.
Start by including the dependency on the Axual client library as you would do with any dependency.
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client</artifactId>
    <version>5.2.0</version>
</dependency>
In order to create a very basic producer application, this is all you need. The next step is to create the configuration for the AxualClient and also for the Producer that will be used.
private static ClientConfig getClientConfig() {
    return ClientConfig.newBuilder()
            // This is the app ID as it is configured in the self service
            .setApplicationId(APP_ID)
            .setApplicationVersion(APPLICATION_VERSION)
            // The endpoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL etc...)
            .setEndpoint(ENDPOINT)
            // The environment corresponding to the shortname of the env in the self service
            .setEnvironment(ENV)
            // The tenant you are part of
            .setTenant(TENANT)
            // The ssl configuration for your application. The certificate used should have a DN
            // matching the DN that was configured in self service
            .setSslConfig(SslConfig.newBuilder()
                    // Disable CN and subject-alternative-name verification for the broker.
                    // Generally not necessary in production, just in test and local deployments.
                    .setEnableHostnameVerification(false)
                    .setKeyPassword(new PasswordConfig(KEY_PASSWORD))
                    .setKeystorePassword(new PasswordConfig(KEYSTORE_PASSWORD))
                    .setTruststorePassword(new PasswordConfig(TRUSTSTORE_PASSWORD))
                    .setKeystoreLocation(KEYSTORE_LOCATION)
                    .setTruststoreLocation(TRUSTSTORE_LOCATION)
                    .build())
            .build();
}
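The SSL comments above note that the certificate's DN must match the DN configured in self service. A quick way to see which DNs a keystore actually contains is to list them with the JDK's own KeyStore API. The following is a minimal sketch that uses only the standard library; the class name KeystoreDnCheck and its command-line arguments are hypothetical and not part of the Axual client. The DN is printed in RFC 2253 form, which may order or format attributes differently from what self service shows.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the Axual client library.
final class KeystoreDnCheck {

    // Returns the subject DN of every X.509 certificate found in the keystore.
    static List<String> subjectDns(KeyStore keyStore) throws GeneralSecurityException {
        List<String> dns = new ArrayList<>();
        for (String alias : Collections.list(keyStore.aliases())) {
            Certificate certificate = keyStore.getCertificate(alias);
            if (certificate instanceof X509Certificate) {
                dns.add(((X509Certificate) certificate).getSubjectX500Principal().getName());
            }
        }
        return dns;
    }

    // Loads a keystore file of the given type ("JKS" or "PKCS12") from disk.
    static KeyStore load(String path, String type, char[] password)
            throws GeneralSecurityException, IOException {
        KeyStore keyStore = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            keyStore.load(in, password);
        }
        return keyStore;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: KeystoreDnCheck <keystore path> <JKS|PKCS12> <keystore password>");
            return;
        }
        KeyStore keyStore = load(args[0], args[1], args[2].toCharArray());
        // Print one DN per line so it can be compared with the self service entry.
        subjectDns(keyStore).forEach(System.out::println);
    }
}
```

Run it against the file referenced by KEYSTORE_LOCATION and compare the output with the DN registered for your application.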
private static ProducerConfig<String, String> getBaseProducerConfig() {
    return ProducerConfig.<String, String>builder()
            // What are the types of the key and values, what serializers do we use.
            // In this example we are producing key and values of type String, as such
            // we are using StringSerializers
            .setKeySerializerClassName(StringSerializer.class.getCanonicalName())
            .setValueSerializerClassName(StringSerializer.class.getCanonicalName())
            // The strategy you want your producer to apply while sending records.
            .setProducerStrategy(ProducerStrategy.AT_LEAST_ONCE_KEEPING_ORDER)
            .build();
}
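The configuration methods above refer to constants such as APP_ID, ENDPOINT and KEYSTORE_PASSWORD without showing where they come from. One possible approach, sketched here under assumptions, is to read them from environment variables so that passwords stay out of the source code. The class name ProducerSettings, the variable names (AXUAL_APP_ID, AXUAL_ENV) and the default values are all hypothetical; use whatever naming your deployment prescribes.

```java
import java.util.Map;

// Hypothetical helper, not part of the Axual client library.
final class ProducerSettings {

    // Returns the trimmed value for the given name, or the fallback when it is unset or blank.
    static String setting(Map<String, String> source, String name, String fallback) {
        String value = source.get(name);
        if (value == null || value.trim().isEmpty()) {
            return fallback;
        }
        return value.trim();
    }

    // Returns the trimmed value, or fails fast with a descriptive error when it is unset or blank.
    // Suited to values without a sensible default, such as keystore passwords.
    static String required(Map<String, String> source, String name) {
        String value = setting(source, name, null);
        if (value == null) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        // Variable names and defaults below are assumptions made for this illustration.
        String appId = setting(env, "AXUAL_APP_ID", "my.example.producer");
        String environment = setting(env, "AXUAL_ENV", "local");
        System.out.println("Application ID: " + appId);
        System.out.println("Environment: " + environment);
    }
}
```

The resulting values can then be assigned to the constants used in getClientConfig().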
With these configurations, we can instantiate an AxualClient and a Producer and use those to start sending records.
public static void main(String[] args) {
    // Both the AxualClient and the Producer are AutoCloseable, so can be used in a try-with-resources block.
    try (AxualClient axualClient = new AxualClient(getClientConfig());
         Producer<String, String> producer = axualClient.buildProducer(getBaseProducerConfig())) {
        // Create a very basic producer message. The Stream is also given in the producer message
        // and corresponds to the same stream in the self service.
        ProducerMessage<String, String> producerMessage = ProducerMessage.<String, String>newBuilder()
                .setKey("this is a key of my message")
                .setValue("this is the value of my message")
                .setStream(STREAM)
                .build();
        // And produce the message.
        // Here we also call .get(), to make the call synchronous
        // and wait for the produce to finish before the try-with-resources block
        // closes the producer and the axualClient.
        try {
            producer.produce(producerMessage).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("Failed to produce the message", e);
        }
    }
}
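The message above uses a fixed key and value. To send randomly generated String data, as described at the start of this step, the key and value could be generated with the JDK alone. This is a minimal sketch; the class name RandomPayloads, the alphabet and the value length are arbitrary choices made for illustration, not something the Axual client prescribes.

```java
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of the Axual client library.
final class RandomPayloads {

    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789";

    // A random UUID in its canonical 36-character text form, usable as a message key.
    static String randomKey() {
        return UUID.randomUUID().toString();
    }

    // A random string of the requested length drawn from ALPHABET, usable as a message value.
    static String randomValue(int length) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        StringBuilder builder = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            builder.append(ALPHABET.charAt(random.nextInt(ALPHABET.length())));
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        // Print a few generated key/value pairs.
        for (int i = 0; i < 3; i++) {
            System.out.println(randomKey() + " -> " + randomValue(16));
        }
    }
}
```

In the producer, the results of randomKey() and randomValue(16) would be passed to setKey and setValue instead of the fixed strings.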
The produce method returns a Future, on which you can perform all the usual Future operations. Optionally, a ProduceCallback may be passed to the produce method as well; it will be invoked on errors or on a successful produce. Here is a very simple ProduceCallback example:
// Explicit type arguments are used here because the diamond operator
// on anonymous classes requires Java 9 or above.
private static final ProduceCallback<String, String> produceCallback = new ProduceCallback<String, String>() {
    @Override
    public void onComplete(ProducedMessage<String, String> producedMessage) {
        log.info("Produced a message on stream {} and partition {} with offset {}",
                producedMessage.getStream(), producedMessage.getPartition(), producedMessage.getOffset());
    }

    @Override
    public void onError(ProducerMessage<String, String> producerMessage, ExecutionException e) {
        log.warn("Something went wrong while trying to produce the message");
        // Further handling of the error
    }
};
which can be used in the produce method as follows:
producer.produce(producerMessage, produceCallback);
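As an alternative to a callback, the Future returned by produce can be awaited with a timeout, so that a slow or failed produce does not block the application indefinitely. The sketch below uses only java.util.concurrent and a CompletableFuture as a stand-in for the Future returned by produce; the helper name awaitResult and the timeout values are assumptions, and the result type of the real Future is not shown in this guide.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Axual client library.
final class FutureAwait {

    // Waits for the future up to the given timeout.
    // Returns the result, or an empty Optional on timeout, failure or interruption.
    static <T> Optional<T> awaitResult(Future<T> future, long timeout, TimeUnit unit) {
        try {
            return Optional.ofNullable(future.get(timeout, unit));
        } catch (TimeoutException e) {
            // Give up on the pending operation when it takes too long.
            future.cancel(true);
            System.err.println("Timed out waiting for the result");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for the result");
        } catch (ExecutionException e) {
            System.err.println("Operation failed: " + e.getCause());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for the Future returned by producer.produce(producerMessage).
        Future<String> completed = CompletableFuture.completedFuture("produced");
        System.out.println(awaitResult(completed, 5, TimeUnit.SECONDS).isPresent());

        // A future that never completes runs into the timeout.
        Future<String> pending = new CompletableFuture<>();
        System.out.println(awaitResult(pending, 100, TimeUnit.MILLISECONDS).isPresent());
    }
}
```

Restoring the interrupt flag in the InterruptedException branch keeps the thread's interrupted state visible to the calling code.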
When all of the above steps have been done correctly, start your producer app. With the logging level for the io.axual package set to INFO, your app will produce logging that looks like this:
INFO ProducerWorkerManager:60 - Claiming worker Producer : io.axual.client.producer.generic.GenericProducer@3bbc39f8 Sink : io.axual.client.config.ProducerConfig@4ae3c1cd ProducerStrategy : AT_LEAST_ONCE_KEEPING_ORDER MessageBufferSize : 1000 messageBufferWaitTimeout : 0 blockedMessageInsert : true
INFO ProducerWorker:43 - Creating new ProducerWorker
INFO ProducerWorker:115 - Starting ProducerWorker thread ProducerWorker0
INFO ProducerWorker:105 - Creating a new Axual producer with properties: {...}
INFO GenericProducer:39 - Created producer of type AT_LEAST_ONCE_KEEPING_ORDER.
INFO BaseConfig:16 - Parsing io.axual.client.proxy.axual.producer.AxualProducerConfig configuration
INFO BaseConfig:19 - Parsed io.axual.client.proxy.axual.producer.AxualProducerConfig configuration
INFO ReloadingPropertiesProvider:63 - TTL updated to: 600000 (was: 0)
INFO DiscoverySubscriber:62 - Received new DiscoveryResult: DiscoveryResult {...}
INFO BaseConfig:16 - Parsing io.axual.client.proxy.switching.producer.SwitchingProducerConfig configuration
INFO BaseConfig:19 - Parsed io.axual.client.proxy.switching.producer.SwitchingProducerConfig configuration
INFO BaseProxySwitcher:48 - Creating new backing producer with Discovery API result: DiscoveryResult {...}
INFO ProducerSwitcher:36 - Creating a new producer with properties: {...}
INFO BaseConfig:16 - Parsing io.axual.client.proxy.multitenant.producer.MultitenantProducerConfig configuration
INFO BaseConfig:19 - Parsed io.axual.client.proxy.multitenant.producer.MultitenantProducerConfig configuration
INFO BaseConfig:16 - Parsing io.axual.client.proxy.header.producer.HeaderProducerConfig configuration
INFO BaseConfig:19 - Parsed io.axual.client.proxy.header.producer.HeaderProducerConfig configuration
INFO ValueHeaderBlobSerializerConfig:279 - ValueHeaderBlobSerializerConfig values: enable.value.header = false
INFO ValueHeaderBlobSerializerConfig:279 - ValueHeaderBlobSerializerConfig values: enable.value.header = false
INFO BaseProxySwitcher:50 - Created new backing producer
INFO ProducerWorker:107 - Created a new Axual producer
INFO AxualTestProducer:26 - Produced a message on stream general-test and partition 1 with offset 8
INFO ProducerWorkerManager:94 - Releasing producer io.axual.client.producer.generic.GenericProducer@3bbc39f8
INFO ProducerWorker:143 - ProducerWorker cancel requested
INFO Register:83 - Cleaning up registry
INFO ProducerWorker:129 - ProducerWorker cancelled, flushing 0 messages in buffer
INFO ProducerWorkerManager:94 - Releasing producer io.axual.client.producer.generic.GenericProducer@3bbc39f8
INFO ProducerWorkerManager:117 - Shutting down ProducerWorkerManager, cancelling all workers
INFO ProducerWorker:139 - Exiting ProducerWorker thread ProducerWorker0
The logging generated by the AxualTestProducer class comes from the newly created class using the client library. The other logging is all generated by the Axual Client Library itself.
This is all the coding required to make a successful produce happen. You are now ready to move on to Step 4: Consuming data.
Setting up your Maven repository and credentials
Please make sure to include the following repository in your pom.xml, and make sure you have configured the right credentials for that repository in the servers section of your Maven settings.xml, as indicated below.
<repositories>
    <repository>
        <id>axual-artifacts</id>
        <name>Axual Artifacts</name>
        <url>https://dev.axual.io/nexus/repository/axual-artifacts/</url>
    </repository>
</repositories>
<server>
    <id>axual-artifacts</id>
    <username>YOUR_USERNAME</username>
    <password>YOUR_PASSWORD</password>
</server>