Step 3: Producing data

Prerequisites

A computer equipped with:

JDK 8 or above - https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Maven 3.x or above - https://maven.apache.org/download.cgi
Your favorite IDE

Please see the instructions below on how to configure Maven on your machine. If you haven’t received your credentials yet, please request them through

Create A Producer Application

When you have completed this step, you will have a producer application that produces randomly generated String data to the stream you configured in step 1.

Start by including the dependency on the Axual client library as you would do with any dependency.

<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client</artifactId>
    <version>5.2.0</version>
</dependency>

This dependency is all you need to create a very basic producer application. The next step is to create the configuration for the AxualClient and for the Producer that will be used.

private static ClientConfig getClientConfig() {
    return ClientConfig.newBuilder()
            // This is the app ID as it is configured in the self service
            .setApplicationId(APP_ID)
            .setApplicationVersion(APPLICATION_VERSION)
            // The endpoint of the Discovery API (used to retrieve information about bootstrap servers, schema registry, TTL, etc.)
            .setEndpoint(ENDPOINT)
            // The environment corresponding to the shortname of the env in the self service
            .setEnvironment(ENV)
            // The tenant you are part of
            .setTenant(TENANT)
            // The ssl configuration for your application. The certificate used should have a DN
            // matching the DN that was configured in self service
            .setSslConfig(SslConfig.newBuilder()
                    // Disable CN and subject-alternative-name verification for the broker.
                    // Generally not necessary in production, just in test and local deployments.
                    .setEnableHostnameVerification(false)
                    .setKeyPassword(new PasswordConfig(KEY_PASSWORD))
                    .setKeystorePassword(new PasswordConfig(KEYSTORE_PASSWORD))
                    .setTruststorePassword(new PasswordConfig(TRUSTSTORE_PASSWORD))
                    .setKeystoreLocation(KEYSTORE_LOCATION)
                    .setTruststoreLocation(TRUSTSTORE_LOCATION)
                    .build())
            .build();
}

private static ProducerConfig<String, String> getBaseProducerConfig() {
    return ProducerConfig.<String, String>builder()
            // What are the types of the key and values, what serializers do we use.
            // In this example we are producing key and values of type String, as such
            // we are using StringSerializers
            .setKeySerializerClassName(StringSerializer.class.getCanonicalName())
            .setValueSerializerClassName(StringSerializer.class.getCanonicalName())
            // The strategy you want your producer to apply while sending records.
            .setProducerStrategy(ProducerStrategy.AT_LEAST_ONCE_KEEPING_ORDER)
            .build();
}
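
The constants used in the two methods above (APP_ID, ENDPOINT, KEY_PASSWORD and so on) are not defined in this guide. Below is a minimal sketch of one way to define them, assuming they are plain Strings: each value is read from an environment variable, with a fallback. The class name ProducerSettings, the environment variable names and every fallback value are hypothetical placeholders; replace them with the values from self service and with the locations of your own certificate files.

```java
import java.util.Map;

// Hypothetical holder for the constants referenced by the configuration methods.
final class ProducerSettings {
    private ProducerSettings() {
    }

    // Returns the value of the named variable, or the fallback when it is unset or empty.
    static String envOrDefault(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    private static final Map<String, String> ENVIRONMENT = System.getenv();

    // All variable names and fallback values below are placeholders, not real Axual settings.
    static final String APP_ID = envOrDefault(ENVIRONMENT, "AXUAL_APP_ID", "my.example.producer");
    static final String APPLICATION_VERSION = envOrDefault(ENVIRONMENT, "AXUAL_APP_VERSION", "0.0.1");
    static final String ENDPOINT = envOrDefault(ENVIRONMENT, "AXUAL_ENDPOINT", "https://discovery.example.com");
    static final String ENV = envOrDefault(ENVIRONMENT, "AXUAL_ENV", "dev");
    static final String TENANT = envOrDefault(ENVIRONMENT, "AXUAL_TENANT", "mytenant");
    static final String STREAM = envOrDefault(ENVIRONMENT, "AXUAL_STREAM", "my-stream");
    static final String KEYSTORE_LOCATION = envOrDefault(ENVIRONMENT, "AXUAL_KEYSTORE", "/path/to/keystore.jks");
    static final String TRUSTSTORE_LOCATION = envOrDefault(ENVIRONMENT, "AXUAL_TRUSTSTORE", "/path/to/truststore.jks");

    // Passwords fall back to an empty string so that no secret ends up in source control.
    static final String KEY_PASSWORD = envOrDefault(ENVIRONMENT, "AXUAL_KEY_PASSWORD", "");
    static final String KEYSTORE_PASSWORD = envOrDefault(ENVIRONMENT, "AXUAL_KEYSTORE_PASSWORD", "");
    static final String TRUSTSTORE_PASSWORD = envOrDefault(ENVIRONMENT, "AXUAL_TRUSTSTORE_PASSWORD", "");

    public static void main(String[] args) {
        System.out.println("Producing to stream " + STREAM + " as application " + APP_ID);
    }
}
```

Copying these fields into the producer class leaves getClientConfig() unchanged. Reading the passwords from the environment is only one option; any secret store your organisation uses would serve the same purpose.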

With these configurations, we can instantiate an AxualClient and a Producer and use those to start sending records.

public static void main(String[] args) {
    // Both the AxualClient and the Producer are AutoCloseable, so can be used in a try-with-resources block.
    try (AxualClient axualClient = new AxualClient(getClientConfig());
         Producer<String, String> producer = axualClient.buildProducer(getBaseProducerConfig())) {
        // Create a very basic producer message. The Stream is also given in the producer message
        // and corresponds to the same stream in the self service.
        ProducerMessage<String, String> producerMessage = ProducerMessage.<String, String>newBuilder()
                .setKey("this is a key of my message")
                .setValue("this is the value of my message")
                .setStream(STREAM)
                .build();
        // And produce the message.
        // Here we also call .get(), to make the call synchronous
        // and wait for the produce to finish before the try-with-resources block
        // closes the producer and the axualClient.
        try {
            producer.produce(producerMessage).get();
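        } catch (InterruptedException | ExecutionException e) {
            // Log a message together with the exception, so the stack trace is preserved.
            log.error("Producing the message failed", e);
        }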
    }
}
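
The introduction mentions randomly generated String data, while the example above sends a fixed key and value. Below is a minimal sketch of generating random keys and values using only the JDK. The class and method names are hypothetical and are not part of the Axual client library.

```java
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper that generates random String keys and values.
final class RandomPayloads {
    private RandomPayloads() {
    }

    // A random UUID in its canonical 36-character text form, used as the message key.
    static String randomKey() {
        return UUID.randomUUID().toString();
    }

    // A random value consisting of `length` lowercase ASCII letters.
    static String randomValue(int length) {
        StringBuilder builder = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            builder.append((char) ('a' + ThreadLocalRandom.current().nextInt(26)));
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        // Print a few random key/value pairs.
        for (int i = 0; i < 3; i++) {
            System.out.println(randomKey() + " -> " + randomValue(16));
        }
    }
}
```

The results can be passed to setKey and setValue of the ProducerMessage builder shown above, for example inside a loop that produces several messages.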

The produce method returns a Future, on which you can perform all the usual Future operations. Optionally, you can also pass a ProduceCallback to the produce method; it is invoked when a produce succeeds or fails. Here is a very simple ProduceCallback example:

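// The type arguments are spelled out because the diamond operator on anonymous classes requires Java 9 or above.
private static final ProduceCallback<String, String> produceCallback = new ProduceCallback<String, String>() {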
    @Override
    public void onComplete(ProducedMessage<String, String> producedMessage) {
        log.info("Produced a message on stream {} and partition {} with offset {}",
                producedMessage.getStream(), producedMessage.getPartition(), producedMessage.getOffset());
    }
    @Override
    public void onError(ProducerMessage<String, String> producerMessage, ExecutionException e) {
        log.warn("Something went wrong while trying to produce the message");
        // Further handling of the error
    }
};

The callback can then be passed to the produce method as follows:

producer.produce(producerMessage, produceCallback);
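
Returning to the Future that produce returns: one of the usual Future operations is waiting with a timeout instead of blocking indefinitely with get(). Below is a minimal sketch using only java.util.concurrent. The CompletableFuture instances stand in for the Future returned by produce, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that waits for a Future with an upper bound on the waiting time.
final class FutureTimeoutSketch {
    private FutureTimeoutSketch() {
    }

    // Returns true when the future completes successfully within timeoutMillis,
    // and false when it fails, times out, or the waiting thread is interrupted.
    static <T> boolean awaitResult(Future<T> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // The result did not arrive in time.
            return false;
        } catch (ExecutionException e) {
            // The operation itself failed; e.getCause() holds the original error.
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the Future returned by produce.
        Future<String> completed = CompletableFuture.completedFuture("produced");
        Future<String> neverCompleted = new CompletableFuture<>();

        System.out.println(awaitResult(completed, 1000));     // prints true
        System.out.println(awaitResult(neverCompleted, 100)); // prints false
    }
}
```

In the producer example, the same helper could wrap the result of producer.produce(producerMessage), so that a slow or unreachable cluster does not block the application forever.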

When all of the above steps have been completed correctly, start your producer app. With the logging level for the io.axual package set to INFO, your app will produce logging that looks like this:

INFO  ProducerWorkerManager:60 - Claiming worker	Producer                 : io.axual.client.producer.generic.GenericProducer@3bbc39f8	Sink                     : io.axual.client.config.ProducerConfig@4ae3c1cd	ProducerStrategy         : AT_LEAST_ONCE_KEEPING_ORDER	MessageBufferSize        : 1000	messageBufferWaitTimeout : 0	blockedMessageInsert     : true
INFO  ProducerWorker:43 - Creating new ProducerWorker
INFO  ProducerWorker:115 - Starting ProducerWorker thread ProducerWorker0
INFO  ProducerWorker:105 - Creating a new Axual producer with properties: {...}
INFO  GenericProducer:39 - Created producer of type AT_LEAST_ONCE_KEEPING_ORDER.
INFO  BaseConfig:16 - Parsing io.axual.client.proxy.axual.producer.AxualProducerConfig configuration
INFO  BaseConfig:19 - Parsed io.axual.client.proxy.axual.producer.AxualProducerConfig configuration
INFO  ReloadingPropertiesProvider:63 - TTL updated to: 600000 (was: 0)
INFO  DiscoverySubscriber:62 - Received new DiscoveryResult: DiscoveryResult {...}
INFO  BaseConfig:16 - Parsing io.axual.client.proxy.switching.producer.SwitchingProducerConfig configuration
INFO  BaseConfig:19 - Parsed io.axual.client.proxy.switching.producer.SwitchingProducerConfig configuration
INFO  BaseProxySwitcher:48 - Creating new backing producer with Discovery API result: DiscoveryResult {...}
INFO  ProducerSwitcher:36 - Creating a new producer with properties: {...}
INFO  BaseConfig:16 - Parsing io.axual.client.proxy.multitenant.producer.MultitenantProducerConfig configuration
INFO  BaseConfig:19 - Parsed io.axual.client.proxy.multitenant.producer.MultitenantProducerConfig configuration
INFO  BaseConfig:16 - Parsing io.axual.client.proxy.header.producer.HeaderProducerConfig configuration
INFO  BaseConfig:19 - Parsed io.axual.client.proxy.header.producer.HeaderProducerConfig configuration
INFO  ValueHeaderBlobSerializerConfig:279 - ValueHeaderBlobSerializerConfig values:
	enable.value.header = false

INFO  ValueHeaderBlobSerializerConfig:279 - ValueHeaderBlobSerializerConfig values:
	enable.value.header = false

INFO  BaseProxySwitcher:50 - Created new backing producer
INFO  ProducerWorker:107 - Created a new Axual producer
INFO  AxualTestProducer:26 - Produced a message on stream general-test and partition 1 with offset 8
INFO  ProducerWorkerManager:94 - Releasing producer io.axual.client.producer.generic.GenericProducer@3bbc39f8
INFO  ProducerWorker:143 - ProducerWorker cancel requested
INFO  Register:83 - Cleaning up registry
INFO  ProducerWorker:129 - ProducerWorker cancelled, flushing 0 messages in buffer
INFO  ProducerWorkerManager:94 - Releasing producer io.axual.client.producer.generic.GenericProducer@3bbc39f8
INFO  ProducerWorkerManager:117 - Shutting down ProducerWorkerManager, cancelling all workers
INFO  ProducerWorker:139 - Exiting ProducerWorker thread ProducerWorker0

The log line from the AxualTestProducer class comes from the newly created application class (it is the message logged in the ProduceCallback). All other lines are generated by the Axual client library itself.

This is all the code required to produce a message successfully. You are now ready to move on to step 4, Consuming Data.

Setting up your Maven repository and credentials

Include the following repository in your pom.xml, and configure the credentials for that repository as indicated below.

<repositories>
    <repository>
        <id>axual-artifacts</id>
        <name>Axual Artifacts</name>
        <url>https://dev.axual.io/nexus/repository/axual-artifacts/</url>
    </repository>
</repositories>

The server entry below does not go in the pom.xml. It belongs in the servers section of your Maven settings.xml (typically ~/.m2/settings.xml), and its id must match the repository id above.

<server>
    <id>axual-artifacts</id>
    <username>YOUR_USERNAME</username>
    <password>YOUR_PASSWORD</password>
</server>