Producing Using Java Spring Kafka
Creating A Producer Application
This shows typical uses of Axual Platform combined with Spring Boot.
Before continuing, please check you fulfill all the Prerequisites.
Create Spring Boot project using your IDE or Spring Initializr.
Building The Application
To enable Axual client functionality within a Spring Boot application, include the following dependencies in pom.xml
.
For full executable Spring kafka Avro producer example, please refer examples repository. |
<!-- Axual client proxy-->
<dependency>
<groupId>io.axual.client</groupId>
<artifactId>axual-client-proxy</artifactId>
<version>5.8.1</version>
</dependency>
<!-- Enables SpringBoot Kafka functionality -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- SpringBoot starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Optionally when using Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.1</version>
</dependency>
In order to create a producer application, you need to provide the configuration for the producer in resources/application.yaml
file.
axual:
endpoint: "http://127.0.0.1:8081"
tenant: "axual"
environment: "local"
stream: "avro-applicationlog"
spring:
kafka:
producer:
client-id: io.axual.example.client.avro.producer
acks: -1
batch-size: 16834
buffer-memory: 33554432
properties:
linger.ms: 100
ssl:
# Using JKS
key-store-location: client-certs/axual.client.keystore.jks
key-store-password: notsecret
key-password: notsecret
trust-store-location: client-certs/axual.client.truststore.jks
trust-store-password: notsecret
Check your essentials-package for the truststore and keystore files, see also Security |
Create AxualKafkaProducerFactory<K, V>
bean class by extending DefaultKafkaProducerFactory<K, V>
.
public class AxualKafkaProducerFactory<K, V> extends DefaultKafkaProducerFactory<K, V> {
public AxualKafkaProducerFactory(Map<String, Object> configs) {
super(configs);
}
@Override
protected Producer<K, V> createKafkaProducer() {
return new AxualProducer<>(getConfigurationProperties());
}
}
Define producer beans configuration.
@Configuration
public class SpringAvroProducer {
@Value("${axual.endpoint}")
private String endpoint;
@Value("${axual.tenant}")
private String tenant;
@Value("${axual.environment}")
private String environment;
private final KafkaProperties kafkaProperties;
@Autowired
public SpringAvroProducer(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
/*
* The Proxy chain config determines what added features you want when connecting to Axual.
*
* SWITCHING PROXY: Enables automatic failover to a different Kafka cluster. Team Speed can
* trigger a switch when performing Kafka cluster maintenance. By passing the CommonConfig.ENDPOINT
* config, the producer discovers the Kafka broker coordinates automatically.
*
* RESOLVING PROXY: Enables resolution of topic and group id names to actual names in Kafka
* cluster. This allows multi-tenancy and multi-environment support on a single Kafka cluster.
*
* LINEAGE PROXY: Adds useful metadata in message headers to allow tracing of messages through Axual.
*
* LOGGING PROXY: Can be activated anywhere in the chain to log the state.
*/
props.put(AxualProducerConfig.CHAIN_CONFIG, ProxyChain.newBuilder()
.append(ProxyTypeRegistry.SWITCHING_PROXY_ID)
.append(ProxyTypeRegistry.RESOLVING_PROXY_ID)
.append(ProxyTypeRegistry.HEADER_PROXY_ID)
.append(ProxyTypeRegistry.LINEAGE_PROXY_ID)
.build());
props.put(CommonConfig.APPLICATION_ID, kafkaProperties.getProducer().getClientId());
props.put(CommonConfig.APPLICATION_VERSION, "1.0.0");
props.put(CommonConfig.TENANT, tenant);
props.put(CommonConfig.ENVIRONMENT, environment);
props.put(CommonConfig.ENDPOINT, endpoint);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class.getName());
return props;
}
@Bean
public ProducerFactory<Application, ApplicationLogEvent> producerFactory() {
return new AxualKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Update your SpringBootApplication class to start sending records.
@SpringBootApplication
public class AvroProducerApplication implements CommandLineRunner {
private static final Logger LOG = LoggerFactory.getLogger(AvroProducerApplication.class);
@Value("${axual.stream}")
private String stream;
private KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate;
private static final Application SOURCE = Application.newBuilder()
.setName("Axual ❤️ Spring")
.setVersion("1.0.0")
.setOwner("none")
.build();
public static void main(String[] args) {
SpringApplication.run(AvroProducerApplication.class, args);
}
@Autowired
public AvroProducerApplication(KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void run(String... args) {
// every message in kafka consist of key and value, let's build it.
Application key = Application.newBuilder()
.setName("axual speaks spring")
.setVersion("1.0.0")
.setOwner("none")
.build();
ApplicationLogEvent value = ApplicationLogEvent.newBuilder()
.setTimestamp(System.currentTimeMillis())
.setSource(SOURCE)
.setLevel(ApplicationLogLevel.INFO)
.setMessage("Sending a message!")
.setContext(Collections.singletonMap("Some key", "Some Value"))
.build();
kafkaTemplate.setProducerListener(new ProducerListener<Application, ApplicationLogEvent>() {
@Override
public void onSuccess(ProducerRecord<Application, ApplicationLogEvent> producerRecord,
RecordMetadata recordMetadata) {
// Do NOT perform time-consuming network or DB operations here. This method is
// called within the producer sender thread so this method should complete super fast.
// If you need to perform expensive network or DB operations store the metadata
// in a hash map and perform the operation in a separate thread.
long now = System.currentTimeMillis();
LOG.info("Successfully produced message {} on partition {}, offset {} in {} milliseconds",
producerRecord, recordMetadata.partition(),
recordMetadata.offset(), now - recordMetadata.timestamp());
}
@Override
public void onError(ProducerRecord<Application, ApplicationLogEvent> producerRecord,
RecordMetadata recordMetadata,
Exception exception) {
// When a failure occurs, information is available in ExecutionException.
LOG.error("Error occurred while producing message {}", producerRecord, exception);
}
});
kafkaTemplate.send(stream, key, value);
}
}
- When all of the above steps have been done correctly, start your producer app. With the logging level
INFO
, Your app should log
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.5.4) 2022-03-04 14:51:57.723 INFO 57887 --- [ main] i.a.c.examples.AvroProducerApplication : Starting AvroProducerApplication using Java 11.0.2 on MacBook-Pro.local with PID 57887 (/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes started by shalabh in /Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples) 2022-03-04 14:51:57.728 INFO 57887 --- [ main] i.a.c.examples.AvroProducerApplication : No active profile set, falling back to default profiles: default 2022-03-04 14:51:58.877 INFO 57887 --- [ main] i.a.c.examples.AvroProducerApplication : Started AvroProducerApplication in 1.635 seconds (JVM running for 4.01) 2022-03-04 14:51:59.812 INFO 57887 --- [ main] i.a.d.client.fetcher.DiscoveryLoader : TTL updated to: 5000 (was: 0) 2022-03-04 14:51:59.812 INFO 57887 --- [ main] i.a.d.client.fetcher.DiscoveryLoader : Fetched discovery properties: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual} 2022-03-04 14:51:59.812 INFO 57887 --- [ main] i.a.c.p.s.discovery.DiscoverySubscriber : Received new DiscoveryResult for SwitchingProducer: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual} 2022-03-04 14:51:59.813 INFO 57887 --- [ main] i.a.c.p.s.discovery.DiscoverySubscriber : Switching SwitchingProducer from null to clusterA 2022-03-04 14:51:59.813 INFO 57887 --- [ main] i.a.c.p.s.g.BaseClientProxySwitcher : Creating new backing producer with Discovery API result: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual} 2022-03-04 14:51:59.813 INFO 57887 --- [ main] i.a.c.p.s.producer.ProducerSwitcher : Creating a new producer with properties: {acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, reconnect.backoff.ms=1000, topic.pattern={tenant}-{instance}-{environment}-{topic}, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, retry.backoff.ms=1000, buffer.memory=33554432, key.serializer=io.axual.serde.avro.SpecificAvroSerializer, ssl.keystore.type=JKS, schema.registry.url=http://localhost:8082, endpoint=http://127.0.0.1:8081, ssl.key.password=[hidden], ssl.truststore.password=[hidden], max.in.flight.requests.per.connection=5, tenant=axual, client.id=producer-io.axual.example.client.avro.producer-1, ssl.endpoint.identification.algorithm=, group.id.pattern={tenant}-{instance}-{environment}-{group}, ssl.protocol=TLSv1.3, ssl.enabled.protocols=[TLSv1.2, TLSv1.3], acks=-1, batch.size=16834, ssl.keystore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes/client-certs/axual.client.keystore.jks, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, app.id=io.axual.example.client.avro.producer, ttl=5000, distributor.distance=2, ssl.truststore.type=JKS, enable.value.headers=false, app.version=1.0.0, security.protocol=SSL, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, retries=0, environment=local, value.serializer=io.axual.serde.avro.SpecificAvroSerializer, ssl.truststore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes/client-certs/axual.client.truststore.jks, system=platform-test-standalone, distributor.timeout=20000, ssl.keystore.password=[hidden], group.id.resolver=io.axual.common.resolver.GroupPatternResolver, internal.auto.downgrade.txn.commit=true, linger.ms=100} 2022-03-04 14:51:59.849 INFO 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: ... 2022-03-04 14:51:59.861 INFO 57887 --- [ main] i.c.k.s.KafkaAvroSerializerConfig : KafkaAvroSerializerConfig values: ... 2022-03-04 14:51:59.959 INFO 57887 --- [ main] i.c.k.s.KafkaAvroSerializerConfig : KafkaAvroSerializerConfig values: ... 2022-03-04 14:51:59.960 INFO 57887 --- [ main] i.c.k.s.KafkaAvroSerializerConfig : KafkaAvroSerializerConfig values: ... 2022-03-04 14:52:00.001 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'acl.principal.builder' was supplied but isn't a known config. 2022-03-04 14:52:00.002 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'cluster' was supplied but isn't a known config. 2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'instance' was supplied but isn't a known config. 2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'topic.pattern' was supplied but isn't a known config. 2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'schema.registry.url' was supplied but isn't a known config. 2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'endpoint' was supplied but isn't a known config. 2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'tenant' was supplied but isn't a known config. 2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'group.id.pattern' was supplied but isn't a known config. 2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'transactional.id.resolver' was supplied but isn't a known config. 2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'app.id' was supplied but isn't a known config. 2022-03-04 14:52:00.004 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'ttl' was supplied but isn't a known config. 2022-03-04 14:52:00.004 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'distributor.distance' was supplied but isn't a known config. 2022-03-04 14:52:00.004 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'enable.value.headers' was supplied but isn't a known config. 2022-03-04 14:52:00.004 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'app.version' was supplied but isn't a known config. 2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'transactional.id.pattern' was supplied but isn't a known config. 2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'environment' was supplied but isn't a known config. 2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'system' was supplied but isn't a known config. 2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'distributor.timeout' was supplied but isn't a known config. 2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'group.id.resolver' was supplied but isn't a known config. 2022-03-04 14:52:00.007 INFO 57887 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1 2022-03-04 14:52:00.007 INFO 57887 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457 2022-03-04 14:52:00.007 INFO 57887 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1646401920005 2022-03-04 14:52:00.009 INFO 57887 --- [ main] i.a.c.p.s.g.BaseClientProxySwitcher : Created new backing producer 2022-03-04 14:52:00.476 INFO 57887 --- [avro.producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-io.axual.example.client.avro.producer-1] Cluster ID: ttKdVMqWRd-uWOHsliPu4Q 2022-03-04 14:52:00.584 INFO 57887 --- [ionShutdownHook] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-io.axual.example.client.avro.producer-1] Closing the Kafka producer with timeoutMillis = 86399913600000 ms. 2022-03-04 14:52:00.787 INFO 57887 --- [avro.producer-1] i.a.c.examples.AvroProducerApplication : Successfully produced message ProducerRecord(topic=avro-applicationlog, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key={"name": "axual speaks spring", "version": "1.0.0", "owner": "none"}, value={"timestamp": 1646401918900, "source": {"name": "Axual ❤️ Spring", "version": "1.0.0", "owner": "none"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Sending a message!"}, timestamp=null) on partition 6, offset 0 in 311 milliseconds 2022-03-04 14:52:00.801 INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed 2022-03-04 14:52:00.801 INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter 2022-03-04 14:52:00.801 INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed 2022-03-04 14:52:00.802 INFO 57887 --- [ionShutdownHook] o.a.kafka.common.utils.AppInfoParser : App info kafka.producer for producer-io.axual.example.client.avro.producer-1 unregistered Process finished with exit code 0
This is all the coding required to make a successful produce happen.
Next Step: Consuming Data Using A Java Consumer Application
In the next step you will create a Java consumer application to use the data you just produced.
Proceed to Consuming Data (Java Spring Client)