Producing Using Java Spring Kafka

Creating A Producer Application

This shows typical uses of Axual Platform combined with Spring Boot.

Before continuing, please check you fulfill all the prerequisites .

Create Spring Boot project using your IDE or Spring Initializr.

Building The Application

To enable Axual client functionality within a Spring Boot application, include the following dependencies in pom.xml.

For full executable Spring kafka Avro producer example, please refer examples repository.
<!-- Axual client proxy-->
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client-proxy</artifactId>
    <version>5.8.1</version>
</dependency>
<!-- Enables SpringBoot Kafka functionality -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- SpringBoot starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Optionally when using Avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>

In order to create a producer application, you need to provide the configuration for the producer in resources/application.yaml file.

axual:
  endpoint: "http://127.0.0.1:8081"
  tenant: "axual"
  environment: "local"
  stream: "avro-applicationlog"

spring:
  kafka:
    producer:
      client-id: io.axual.example.client.avro.producer
      acks: -1
      batch-size: 16834
      buffer-memory: 33554432
      properties:
        linger.ms: 100
      ssl:
        # Using JKS
        key-store-location: client-certs/axual.client.keystore.jks
        key-store-password: notsecret
        key-password: notsecret
        trust-store-location: client-certs/axual.client.truststore.jks
        trust-store-password: notsecret
Check your care package for the truststore and keystore files, see also Step 3

Create AxualKafkaProducerFactory<K, V> bean class by extending DefaultKafkaProducerFactory<K, V>.

public class AxualKafkaProducerFactory<K, V> extends DefaultKafkaProducerFactory<K, V> {

    public AxualKafkaProducerFactory(Map<String, Object> configs) {
        super(configs);
    }

    @Override
    protected Producer<K, V> createKafkaProducer() {
        return new AxualProducer<>(getConfigurationProperties());
    }
}

Define producer beans configuration.

@Configuration
public class SpringAvroProducer {

    @Value("${axual.endpoint}")
    private String endpoint;

    @Value("${axual.tenant}")
    private String tenant;

    @Value("${axual.environment}")
    private String environment;

    private final KafkaProperties kafkaProperties;

    @Autowired
    public SpringAvroProducer(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());

        /*
         * The Proxy chain config determines what added features you want when connecting to Axual.
         *
         * SWITCHING PROXY: Enables automatic failover to a different Kafka cluster. Team Speed can
         * trigger a switch when performing Kafka cluster maintenance. By passing the CommonConfig.ENDPOINT
         * config, the producer discovers the Kafka broker coordinates automatically.
         *
         * RESOLVING PROXY: Enables resolution of topic and group id names to actual names in Kafka
         * cluster. This allows multi-tenancy and multi-environment support on a single Kafka cluster.
         *
         * LINEAGE PROXY: Adds useful metadata in message headers to allow tracing of messages through Axual.
         *
         * LOGGING PROXY: Can be activated anywhere in the chain to log the state.
         */
        props.put(AxualProducerConfig.CHAIN_CONFIG, ProxyChain.newBuilder()
                .append(ProxyTypeRegistry.SWITCHING_PROXY_ID)
                .append(ProxyTypeRegistry.RESOLVING_PROXY_ID)
                .append(ProxyTypeRegistry.HEADER_PROXY_ID)
                .append(ProxyTypeRegistry.LINEAGE_PROXY_ID)
                .build());

        props.put(CommonConfig.APPLICATION_ID, kafkaProperties.getProducer().getClientId());
        props.put(CommonConfig.APPLICATION_VERSION, "1.0.0");
        props.put(CommonConfig.TENANT, tenant);
        props.put(CommonConfig.ENVIRONMENT, environment);

        props.put(CommonConfig.ENDPOINT, endpoint);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL);

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class.getName());

        return props;
    }

    @Bean
    public ProducerFactory<Application, ApplicationLogEvent> producerFactory() {
        return new AxualKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Update your SpringBootApplication class to start sending records.

@SpringBootApplication
public class AvroProducerApplication implements CommandLineRunner {

    private static final Logger LOG = LoggerFactory.getLogger(AvroProducerApplication.class);

    @Value("${axual.stream}")
    private String stream;

    private KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate;

    private static final Application SOURCE = Application.newBuilder()
            .setName("Axual ❤️ Spring")
            .setVersion("1.0.0")
            .setOwner("none")
            .build();

    public static void main(String[] args) {
        SpringApplication.run(AvroProducerApplication.class, args);
    }

    @Autowired
    public AvroProducerApplication(KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void run(String... args) {
        // every message in kafka consist of key and value, let's build it.
        Application key = Application.newBuilder()
                .setName("axual speaks spring")
                .setVersion("1.0.0")
                .setOwner("none")
                .build();

        ApplicationLogEvent value = ApplicationLogEvent.newBuilder()
                .setTimestamp(System.currentTimeMillis())
                .setSource(SOURCE)
                .setLevel(ApplicationLogLevel.INFO)
                .setMessage("Sending a message!")
                .setContext(Collections.singletonMap("Some key", "Some Value"))
                .build();

        kafkaTemplate.setProducerListener(new ProducerListener<Application, ApplicationLogEvent>() {
            @Override
            public void onSuccess(ProducerRecord<Application, ApplicationLogEvent> producerRecord,
                                  RecordMetadata recordMetadata) {
                // Do NOT perform time-consuming network or DB operations here. This method is
                // called within the producer sender thread so this method should complete super fast.
                // If you need to perform expensive network or DB operations store the metadata
                // in a hash map and perform the operation in a separate thread.
                long now = System.currentTimeMillis();
                LOG.info("Successfully produced message {} on partition {}, offset {} in {} milliseconds",
                        producerRecord, recordMetadata.partition(),
                        recordMetadata.offset(), now - recordMetadata.timestamp());
            }

            @Override
            public void onError(ProducerRecord<Application, ApplicationLogEvent> producerRecord,
                                RecordMetadata recordMetadata,
                                Exception exception) {
                // When a failure occurs, information is available in ExecutionException.
                LOG.error("Error occurred while producing message {}", producerRecord, exception);
            }
        });

        kafkaTemplate.send(stream, key, value);
    }
}
When all of the above steps have been done correctly, start your producer app. With the logging level INFO, Your app should log
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.4)

2022-03-04 14:51:57.723  INFO 57887 --- [           main] i.a.c.examples.AvroProducerApplication   : Starting AvroProducerApplication using Java 11.0.2 on MacBook-Pro.local with PID 57887 (/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes started by shalabh in /Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples)
2022-03-04 14:51:57.728  INFO 57887 --- [           main] i.a.c.examples.AvroProducerApplication   : No active profile set, falling back to default profiles: default
2022-03-04 14:51:58.877  INFO 57887 --- [           main] i.a.c.examples.AvroProducerApplication   : Started AvroProducerApplication in 1.635 seconds (JVM running for 4.01)
2022-03-04 14:51:59.812  INFO 57887 --- [           main] i.a.d.client.fetcher.DiscoveryLoader     : TTL updated to: 5000 (was: 0)
2022-03-04 14:51:59.812  INFO 57887 --- [           main] i.a.d.client.fetcher.DiscoveryLoader     : Fetched discovery properties: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.812  INFO 57887 --- [           main] i.a.c.p.s.discovery.DiscoverySubscriber  : Received new DiscoveryResult for SwitchingProducer: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.813  INFO 57887 --- [           main] i.a.c.p.s.discovery.DiscoverySubscriber  : Switching SwitchingProducer from null to clusterA
2022-03-04 14:51:59.813  INFO 57887 --- [           main] i.a.c.p.s.g.BaseClientProxySwitcher      : Creating new backing producer with Discovery API result: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.813  INFO 57887 --- [           main] i.a.c.p.s.producer.ProducerSwitcher      : Creating a new producer with properties: {acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, reconnect.backoff.ms=1000, topic.pattern={tenant}-{instance}-{environment}-{topic}, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, retry.backoff.ms=1000, buffer.memory=33554432, key.serializer=io.axual.serde.avro.SpecificAvroSerializer, ssl.keystore.type=JKS, schema.registry.url=http://localhost:8082, endpoint=http://127.0.0.1:8081, ssl.key.password=[hidden], ssl.truststore.password=[hidden], max.in.flight.requests.per.connection=5, tenant=axual, client.id=producer-io.axual.example.client.avro.producer-1, ssl.endpoint.identification.algorithm=, group.id.pattern={tenant}-{instance}-{environment}-{group}, ssl.protocol=TLSv1.3, ssl.enabled.protocols=[TLSv1.2, TLSv1.3], acks=-1, batch.size=16834, ssl.keystore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes/client-certs/axual.client.keystore.jks, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, app.id=io.axual.example.client.avro.producer, ttl=5000, distributor.distance=2, ssl.truststore.type=JKS, enable.value.headers=false, app.version=1.0.0, security.protocol=SSL, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, retries=0, environment=local, value.serializer=io.axual.serde.avro.SpecificAvroSerializer, ssl.truststore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes/client-certs/axual.client.truststore.jks, system=platform-test-standalone, distributor.timeout=20000, ssl.keystore.password=[hidden], group.id.resolver=io.axual.common.resolver.GroupPatternResolver, internal.auto.downgrade.txn.commit=true, linger.ms=100}
2022-03-04 14:51:59.849  INFO 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: ...
	2022-03-04 14:51:59.861  INFO 57887 --- [           main] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: ...
2022-03-04 14:51:59.959  INFO 57887 --- [           main] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: ...

2022-03-04 14:51:59.960  INFO 57887 --- [           main] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: ...
2022-03-04 14:52:00.001  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'acl.principal.builder' was supplied but isn't a known config.
2022-03-04 14:52:00.002  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'cluster' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'instance' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'topic.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'schema.registry.url' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'endpoint' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'tenant' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'group.id.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'transactional.id.resolver' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'app.id' was supplied but isn't a known config.
2022-03-04 14:52:00.004  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'ttl' was supplied but isn't a known config.
2022-03-04 14:52:00.004  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'distributor.distance' was supplied but isn't a known config.
2022-03-04 14:52:00.004  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'enable.value.headers' was supplied but isn't a known config.
2022-03-04 14:52:00.004  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'app.version' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'transactional.id.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'environment' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'system' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'distributor.timeout' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'group.id.resolver' was supplied but isn't a known config.
2022-03-04 14:52:00.007  INFO 57887 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2022-03-04 14:52:00.007  INFO 57887 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2022-03-04 14:52:00.007  INFO 57887 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1646401920005
2022-03-04 14:52:00.009  INFO 57887 --- [           main] i.a.c.p.s.g.BaseClientProxySwitcher      : Created new backing producer
2022-03-04 14:52:00.476  INFO 57887 --- [avro.producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-io.axual.example.client.avro.producer-1] Cluster ID: ttKdVMqWRd-uWOHsliPu4Q
2022-03-04 14:52:00.584  INFO 57887 --- [ionShutdownHook] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-io.axual.example.client.avro.producer-1] Closing the Kafka producer with timeoutMillis = 86399913600000 ms.
2022-03-04 14:52:00.787  INFO 57887 --- [avro.producer-1] i.a.c.examples.AvroProducerApplication   : Successfully produced message ProducerRecord(topic=avro-applicationlog, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key={"name": "axual speaks spring", "version": "1.0.0", "owner": "none"}, value={"timestamp": 1646401918900, "source": {"name": "Axual ❤️ Spring", "version": "1.0.0", "owner": "none"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Sending a message!"}, timestamp=null) on partition 6, offset 0 in 311 milliseconds
2022-03-04 14:52:00.801  INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2022-03-04 14:52:00.801  INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-03-04 14:52:00.801  INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2022-03-04 14:52:00.802  INFO 57887 --- [ionShutdownHook] o.a.kafka.common.utils.AppInfoParser     : App info kafka.producer for producer-io.axual.example.client.avro.producer-1 unregistered

Process finished with exit code 0

This is all the coding required to make a successful produce happen.

Next Step: Consuming Data Using A Java Consumer Application

In the next step you will create a Java consumer application to use the data you just produced.