Producing Using Java Spring Kafka

Creating A Producer Application

This shows typical uses of Axual Platform combined with Spring Boot.

Before continuing, please check you fulfill all the Prerequisites.

Create a Spring Boot project using your IDE or Spring Initializr.

Building The Application

To enable Axual client functionality within a Spring Boot application, include the following dependencies in pom.xml.

For a full executable Spring Kafka Avro producer example, please refer to the examples repository.
<!-- Axual client proxy (version pinned explicitly to 5.8.1) -->
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client-proxy</artifactId>
    <version>5.8.1</version>
</dependency>
<!-- Enables Spring Boot Kafka functionality. -->
<!-- NOTE(review): no version is declared here; presumably it is inherited from the Spring Boot parent/BOM. Confirm for your build. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Spring Boot starter. -->
<!-- NOTE(review): version also omitted; presumably managed by the Spring Boot parent/BOM. Confirm. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Optional: only needed when producing Avro records (version pinned explicitly to 1.9.1) -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>

In order to create a producer application, you need to provide the configuration for the producer in the resources/application.yaml file.

# Axual-specific settings. Each value is injected into the Java code through
# @Value("${axual.<name>}") placeholders.
axual:
  # Passed to the producer as CommonConfig.ENDPOINT.
  endpoint: "http://127.0.0.1:8081"
  # Passed to the producer as CommonConfig.TENANT.
  tenant: "axual"
  # Passed to the producer as CommonConfig.ENVIRONMENT.
  environment: "local"
  # Name of the stream the application sends its record to.
  stream: "avro-applicationlog"

# Spring Kafka producer settings. The Java configuration copies these into the
# producer config map via KafkaProperties.buildProducerProperties().
spring:
  kafka:
    producer:
      # Also reused by the Java configuration as CommonConfig.APPLICATION_ID.
      client-id: io.axual.example.client.avro.producer
      acks: -1
      # NOTE(review): 16834 looks like a transposition of 16384 (a power of two) - confirm the intended value.
      batch-size: 16834
      buffer-memory: 33554432
      properties:
        linger.ms: 100
      ssl:
        # Using JKS
        # NOTE(review): the passwords below are example values; replace them with the ones for your own stores.
        key-store-location: client-certs/axual.client.keystore.jks
        key-store-password: notsecret
        key-password: notsecret
        trust-store-location: client-certs/axual.client.truststore.jks
        trust-store-password: notsecret
Check your essentials package for the truststore and keystore files; see also Security.

Create an AxualKafkaProducerFactory<K, V> bean class by extending DefaultKafkaProducerFactory<K, V>.

/**
 * Producer factory that creates {@code AxualProducer} instances in place of the
 * producer the parent {@code DefaultKafkaProducerFactory} would create.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
public class AxualKafkaProducerFactory<K, V> extends DefaultKafkaProducerFactory<K, V> {

    /**
     * Creates the factory.
     *
     * @param configs producer configuration, handed unchanged to the parent factory
     */
    public AxualKafkaProducerFactory(Map<String, Object> configs) {
        super(configs);
    }

    /**
     * Creates a new {@code AxualProducer} from this factory's configuration properties.
     *
     * @return the newly created producer
     */
    @Override
    protected Producer<K, V> createKafkaProducer() {
        final Map<String, Object> producerSettings = getConfigurationProperties();
        return new AxualProducer<>(producerSettings);
    }
}

Define producer beans configuration.

/**
 * Spring configuration for the Avro producer example.
 *
 * <p>Exposes three beans: the producer configuration map, an
 * {@code AxualKafkaProducerFactory} built from that map, and a {@code KafkaTemplate}
 * built on that factory.
 */
@Configuration
public class SpringAvroProducer {

    /** Value of the {@code axual.endpoint} property. */
    @Value("${axual.endpoint}")
    private String endpoint;

    /** Value of the {@code axual.tenant} property. */
    @Value("${axual.tenant}")
    private String tenant;

    /** Value of the {@code axual.environment} property. */
    @Value("${axual.environment}")
    private String environment;

    /** Spring Boot Kafka properties; the source of the base producer settings. */
    private final KafkaProperties kafkaProperties;

    /**
     * @param kafkaProperties Kafka properties supplied by Spring
     */
    @Autowired
    public SpringAvroProducer(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    /**
     * Assembles the producer configuration: the Spring-provided producer properties,
     * extended with the Axual proxy chain, application identity, endpoint, security
     * protocol and Avro serializers.
     *
     * @return a mutable map holding the complete producer configuration
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>(kafkaProperties.buildProducerProperties());

        /*
         * The proxy chain selects the extra features applied when connecting to Axual:
         *
         * - SWITCHING: automatic failover to a different Kafka cluster. Team Speed can trigger a
         *   switch during Kafka cluster maintenance. Because CommonConfig.ENDPOINT is supplied,
         *   the producer discovers the Kafka broker coordinates automatically.
         * - RESOLVING: maps topic and group id names to their actual names in the Kafka cluster,
         *   which allows multi-tenancy and multi-environment support on a single cluster.
         * - HEADER: NOTE(review): part of the chain but not described by the original
         *   documentation; presumably handles message headers. Confirm against the Axual docs.
         * - LINEAGE: adds metadata to message headers so messages can be traced through Axual.
         *
         * A LOGGING proxy (not used here) can be added anywhere in the chain to log the state.
         */
        settings.put(AxualProducerConfig.CHAIN_CONFIG, ProxyChain.newBuilder()
                .append(ProxyTypeRegistry.SWITCHING_PROXY_ID)
                .append(ProxyTypeRegistry.RESOLVING_PROXY_ID)
                .append(ProxyTypeRegistry.HEADER_PROXY_ID)
                .append(ProxyTypeRegistry.LINEAGE_PROXY_ID)
                .build());

        // Application identity: the configured producer client id doubles as the application id.
        final String applicationId = kafkaProperties.getProducer().getClientId();
        settings.put(CommonConfig.APPLICATION_ID, applicationId);
        settings.put(CommonConfig.APPLICATION_VERSION, "1.0.0");
        settings.put(CommonConfig.TENANT, tenant);
        settings.put(CommonConfig.ENVIRONMENT, environment);

        // Connection: endpoint from the axual.endpoint property, secured with SSL.
        settings.put(CommonConfig.ENDPOINT, endpoint);
        settings.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL);

        // Key and value are both serialized with the same Avro serializer class.
        final String avroSerializerClassName = SpecificAvroSerializer.class.getName();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, avroSerializerClassName);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, avroSerializerClassName);

        return settings;
    }

    /**
     * @return producer factory built from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<Application, ApplicationLogEvent> producerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new AxualKafkaProducerFactory<>(settings);
    }

    /**
     * @return Kafka template built on {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate() {
        final ProducerFactory<Application, ApplicationLogEvent> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}

Update your SpringBootApplication class to start sending records.

@SpringBootApplication
public class AvroProducerApplication implements CommandLineRunner {

    private static final Logger LOG = LoggerFactory.getLogger(AvroProducerApplication.class);

    @Value("${axual.stream}")
    private String stream;

    private KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate;

    private static final Application SOURCE = Application.newBuilder()
            .setName("Axual ❤️ Spring")
            .setVersion("1.0.0")
            .setOwner("none")
            .build();

    public static void main(String[] args) {
        SpringApplication.run(AvroProducerApplication.class, args);
    }

    @Autowired
    public AvroProducerApplication(KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void run(String... args) {
        // every message in kafka consist of key and value, let's build it.
        Application key = Application.newBuilder()
                .setName("axual speaks spring")
                .setVersion("1.0.0")
                .setOwner("none")
                .build();

        ApplicationLogEvent value = ApplicationLogEvent.newBuilder()
                .setTimestamp(System.currentTimeMillis())
                .setSource(SOURCE)
                .setLevel(ApplicationLogLevel.INFO)
                .setMessage("Sending a message!")
                .setContext(Collections.singletonMap("Some key", "Some Value"))
                .build();

        kafkaTemplate.setProducerListener(new ProducerListener<Application, ApplicationLogEvent>() {
            @Override
            public void onSuccess(ProducerRecord<Application, ApplicationLogEvent> producerRecord,
                                  RecordMetadata recordMetadata) {
                // Do NOT perform time-consuming network or DB operations here. This method is
                // called within the producer sender thread so this method should complete super fast.
                // If you need to perform expensive network or DB operations store the metadata
                // in a hash map and perform the operation in a separate thread.
                long now = System.currentTimeMillis();
                LOG.info("Successfully produced message {} on partition {}, offset {} in {} milliseconds",
                        producerRecord, recordMetadata.partition(),
                        recordMetadata.offset(), now - recordMetadata.timestamp());
            }

            @Override
            public void onError(ProducerRecord<Application, ApplicationLogEvent> producerRecord,
                                RecordMetadata recordMetadata,
                                Exception exception) {
                // When a failure occurs, information is available in ExecutionException.
                LOG.error("Error occurred while producing message {}", producerRecord, exception);
            }
        });

        kafkaTemplate.send(stream, key, value);
    }
}
When all of the above steps have been done correctly, start your producer app. With the logging level set to INFO, your app should log:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.4)

2022-03-04 14:51:57.723  INFO 57887 --- [           main] i.a.c.examples.AvroProducerApplication   : Starting AvroProducerApplication using Java 11.0.2 on MacBook-Pro.local with PID 57887 (/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes started by shalabh in /Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples)
2022-03-04 14:51:57.728  INFO 57887 --- [           main] i.a.c.examples.AvroProducerApplication   : No active profile set, falling back to default profiles: default
2022-03-04 14:51:58.877  INFO 57887 --- [           main] i.a.c.examples.AvroProducerApplication   : Started AvroProducerApplication in 1.635 seconds (JVM running for 4.01)
2022-03-04 14:51:59.812  INFO 57887 --- [           main] i.a.d.client.fetcher.DiscoveryLoader     : TTL updated to: 5000 (was: 0)
2022-03-04 14:51:59.812  INFO 57887 --- [           main] i.a.d.client.fetcher.DiscoveryLoader     : Fetched discovery properties: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.812  INFO 57887 --- [           main] i.a.c.p.s.discovery.DiscoverySubscriber  : Received new DiscoveryResult for SwitchingProducer: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.813  INFO 57887 --- [           main] i.a.c.p.s.discovery.DiscoverySubscriber  : Switching SwitchingProducer from null to clusterA
2022-03-04 14:51:59.813  INFO 57887 --- [           main] i.a.c.p.s.g.BaseClientProxySwitcher      : Creating new backing producer with Discovery API result: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.813  INFO 57887 --- [           main] i.a.c.p.s.producer.ProducerSwitcher      : Creating a new producer with properties: {acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, reconnect.backoff.ms=1000, topic.pattern={tenant}-{instance}-{environment}-{topic}, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, retry.backoff.ms=1000, buffer.memory=33554432, key.serializer=io.axual.serde.avro.SpecificAvroSerializer, ssl.keystore.type=JKS, schema.registry.url=http://localhost:8082, endpoint=http://127.0.0.1:8081, ssl.key.password=[hidden], ssl.truststore.password=[hidden], max.in.flight.requests.per.connection=5, tenant=axual, client.id=producer-io.axual.example.client.avro.producer-1, ssl.endpoint.identification.algorithm=, group.id.pattern={tenant}-{instance}-{environment}-{group}, ssl.protocol=TLSv1.3, ssl.enabled.protocols=[TLSv1.2, TLSv1.3], acks=-1, batch.size=16834, ssl.keystore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes/client-certs/axual.client.keystore.jks, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, app.id=io.axual.example.client.avro.producer, ttl=5000, distributor.distance=2, ssl.truststore.type=JKS, enable.value.headers=false, app.version=1.0.0, security.protocol=SSL, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, retries=0, environment=local, value.serializer=io.axual.serde.avro.SpecificAvroSerializer, ssl.truststore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes/client-certs/axual.client.truststore.jks, system=platform-test-standalone, distributor.timeout=20000, ssl.keystore.password=[hidden], 
group.id.resolver=io.axual.common.resolver.GroupPatternResolver, internal.auto.downgrade.txn.commit=true, linger.ms=100}
2022-03-04 14:51:59.849  INFO 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: ...
	2022-03-04 14:51:59.861  INFO 57887 --- [           main] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: ...
2022-03-04 14:51:59.959  INFO 57887 --- [           main] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: ...

2022-03-04 14:51:59.960  INFO 57887 --- [           main] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: ...
2022-03-04 14:52:00.001  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'acl.principal.builder' was supplied but isn't a known config.
2022-03-04 14:52:00.002  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'cluster' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'instance' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'topic.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'schema.registry.url' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'endpoint' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'tenant' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'group.id.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'transactional.id.resolver' was supplied but isn't a known config.
2022-03-04 14:52:00.003  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'app.id' was supplied but isn't a known config.
2022-03-04 14:52:00.004  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'ttl' was supplied but isn't a known config.
2022-03-04 14:52:00.004  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'distributor.distance' was supplied but isn't a known config.
2022-03-04 14:52:00.004  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'enable.value.headers' was supplied but isn't a known config.
2022-03-04 14:52:00.004  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'app.version' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'transactional.id.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'environment' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'system' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'distributor.timeout' was supplied but isn't a known config.
2022-03-04 14:52:00.005  WARN 57887 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'group.id.resolver' was supplied but isn't a known config.
2022-03-04 14:52:00.007  INFO 57887 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2022-03-04 14:52:00.007  INFO 57887 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2022-03-04 14:52:00.007  INFO 57887 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1646401920005
2022-03-04 14:52:00.009  INFO 57887 --- [           main] i.a.c.p.s.g.BaseClientProxySwitcher      : Created new backing producer
2022-03-04 14:52:00.476  INFO 57887 --- [avro.producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-io.axual.example.client.avro.producer-1] Cluster ID: ttKdVMqWRd-uWOHsliPu4Q
2022-03-04 14:52:00.584  INFO 57887 --- [ionShutdownHook] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-io.axual.example.client.avro.producer-1] Closing the Kafka producer with timeoutMillis = 86399913600000 ms.
2022-03-04 14:52:00.787  INFO 57887 --- [avro.producer-1] i.a.c.examples.AvroProducerApplication   : Successfully produced message ProducerRecord(topic=avro-applicationlog, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key={"name": "axual speaks spring", "version": "1.0.0", "owner": "none"}, value={"timestamp": 1646401918900, "source": {"name": "Axual ❤️ Spring", "version": "1.0.0", "owner": "none"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Sending a message!"}, timestamp=null) on partition 6, offset 0 in 311 milliseconds
2022-03-04 14:52:00.801  INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2022-03-04 14:52:00.801  INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-03-04 14:52:00.801  INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2022-03-04 14:52:00.802  INFO 57887 --- [ionShutdownHook] o.a.kafka.common.utils.AppInfoParser     : App info kafka.producer for producer-io.axual.example.client.avro.producer-1 unregistered

Process finished with exit code 0

This is all the code required to successfully produce a message.

Next Step: Consuming Data Using A Java Consumer Application

In the next step you will create a Java consumer application to use the data you just produced.