Producing Using Java Spring Kafka
Creating A Producer Application
This page shows a typical use of the Axual Platform combined with Spring Boot.
Before continuing, please check that you fulfill all the prerequisites.
Create a Spring Boot project using your IDE or Spring Initializr.
Building The Application
To enable Axual client functionality within a Spring Boot application, include the following dependencies in pom.xml.
For a full executable Spring Kafka Avro producer example, please refer to the examples repository.
<!-- Axual client proxy -->
<dependency>
    <groupId>io.axual.client</groupId>
    <artifactId>axual-client-proxy</artifactId>
    <version>5.8.1</version>
</dependency>
<!-- Enables SpringBoot Kafka functionality -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- SpringBoot starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Optionally when using Avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>
To create a producer application, provide the producer configuration in the resources/application.yaml file.
axual:
  endpoint: "http://127.0.0.1:8081"
  tenant: "axual"
  environment: "local"
  stream: "avro-applicationlog"
spring:
  kafka:
    producer:
      client-id: io.axual.example.client.avro.producer
      acks: -1
      batch-size: 16834
      buffer-memory: 33554432
      properties:
        linger.ms: 100
      ssl:
        # Using JKS
        key-store-location: client-certs/axual.client.keystore.jks
        key-store-password: notsecret
        key-password: notsecret
        trust-store-location: client-certs/axual.client.truststore.jks
        trust-store-password: notsecret
Check your care package for the truststore and keystore files; see also Step 3.
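The Spring Boot keys under spring.kafka.producer end up as standard Kafka producer property names, which is how they appear in the startup log later on this page (acks, batch.size, buffer.memory, linger.ms). The following is a minimal sketch of that correspondence using only the JDK. The class ProducerPropertyNamesSketch and its methods are hypothetical and exist only for illustration; in the real application Spring Boot performs this translation, and values are kept as strings here for simplicity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class, not part of Spring Boot or the Axual client.
final class ProducerPropertyNamesSketch {

    // Spring Boot key (relative to spring.kafka.producer) -> Kafka producer property name.
    private static final Map<String, String> NAMES = new LinkedHashMap<>();

    static {
        NAMES.put("acks", "acks");
        NAMES.put("batch-size", "batch.size");
        NAMES.put("buffer-memory", "buffer.memory");
        // Entries under "properties" are passed through with their name unchanged.
        NAMES.put("properties.linger.ms", "linger.ms");
    }

    // Returns the Kafka property name for a Spring Boot key known to this sketch.
    static String toKafkaName(String springKey) {
        String name = NAMES.get(springKey);
        if (name == null) {
            throw new IllegalArgumentException("Key not covered by this sketch: " + springKey);
        }
        return name;
    }

    // Translates a map of Spring Boot style keys into Kafka style keys.
    static Map<String, String> translate(Map<String, String> springStyle) {
        Map<String, String> kafkaStyle = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : springStyle.entrySet()) {
            kafkaStyle.put(toKafkaName(entry.getKey()), entry.getValue());
        }
        return kafkaStyle;
    }
}
```

Feeding in the values from application.yaml (for example batch-size with value 16834) yields a map keyed by batch.size, matching the names logged when the producer is created.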
Create an AxualKafkaProducerFactory<K, V> bean class by extending DefaultKafkaProducerFactory<K, V>.
public class AxualKafkaProducerFactory<K, V> extends DefaultKafkaProducerFactory<K, V> {

    public AxualKafkaProducerFactory(Map<String, Object> configs) {
        super(configs);
    }

    @Override
    protected Producer<K, V> createKafkaProducer() {
        return new AxualProducer<>(getConfigurationProperties());
    }
}
Define the producer bean configuration.
@Configuration
public class SpringAvroProducer {

    @Value("${axual.endpoint}")
    private String endpoint;

    @Value("${axual.tenant}")
    private String tenant;

    @Value("${axual.environment}")
    private String environment;

    private final KafkaProperties kafkaProperties;

    @Autowired
    public SpringAvroProducer(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
        /*
         * The Proxy chain config determines what added features you want when connecting to Axual.
         *
         * SWITCHING PROXY: Enables automatic failover to a different Kafka cluster. Team Speed can
         * trigger a switch when performing Kafka cluster maintenance. By passing the CommonConfig.ENDPOINT
         * config, the producer discovers the Kafka broker coordinates automatically.
         *
         * RESOLVING PROXY: Enables resolution of topic and group id names to actual names in Kafka
         * cluster. This allows multi-tenancy and multi-environment support on a single Kafka cluster.
         *
         * LINEAGE PROXY: Adds useful metadata in message headers to allow tracing of messages through Axual.
         *
         * LOGGING PROXY: Can be activated anywhere in the chain to log the state.
         */
        props.put(AxualProducerConfig.CHAIN_CONFIG, ProxyChain.newBuilder()
                .append(ProxyTypeRegistry.SWITCHING_PROXY_ID)
                .append(ProxyTypeRegistry.RESOLVING_PROXY_ID)
                .append(ProxyTypeRegistry.HEADER_PROXY_ID)
                .append(ProxyTypeRegistry.LINEAGE_PROXY_ID)
                .build());
        props.put(CommonConfig.APPLICATION_ID, kafkaProperties.getProducer().getClientId());
        props.put(CommonConfig.APPLICATION_VERSION, "1.0.0");
        props.put(CommonConfig.TENANT, tenant);
        props.put(CommonConfig.ENVIRONMENT, environment);
        props.put(CommonConfig.ENDPOINT, endpoint);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class.getName());
        return props;
    }

    @Bean
    public ProducerFactory<Application, ApplicationLogEvent> producerFactory() {
        return new AxualKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
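The RESOLVING PROXY comment in the configuration above describes mapping logical topic and group id names to the actual names on the Kafka cluster. As a rough illustration only, the sketch below fills a placeholder pattern of the form {tenant}-{instance}-{environment}-{topic}, which is the topic.pattern value visible in the discovery output further down this page. The class NamePatternSketch and its resolve method are hypothetical and are not the Axual resolver; the name actually used on the cluster is determined by the platform.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration class, not part of the Axual client.
final class NamePatternSketch {

    // Matches placeholders such as {tenant} and captures the name between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^}]+)\\}");

    // Replaces every {placeholder} in the pattern with its value from the context map.
    static String resolve(String pattern, Map<String, String> context) {
        Matcher matcher = PLACEHOLDER.matcher(pattern);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = context.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + matcher.group(1));
            }
            // quoteReplacement keeps '$' and '\' in values from being treated specially.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

With tenant "axual", instance "example", environment "local" and topic "avro-applicationlog", this sketch returns "axual-example-local-avro-applicationlog". Because tenant and environment are part of the resolved name, the same logical stream name can be used by several tenants and environments on one cluster.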
Update your SpringBootApplication class to start sending records.
@SpringBootApplication
public class AvroProducerApplication implements CommandLineRunner {

    private static final Logger LOG = LoggerFactory.getLogger(AvroProducerApplication.class);

    @Value("${axual.stream}")
    private String stream;

    private KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate;

    private static final Application SOURCE = Application.newBuilder()
            .setName("Axual ❤️ Spring")
            .setVersion("1.0.0")
            .setOwner("none")
            .build();

    public static void main(String[] args) {
        SpringApplication.run(AvroProducerApplication.class, args);
    }

    @Autowired
    public AvroProducerApplication(KafkaTemplate<Application, ApplicationLogEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void run(String... args) {
        // Every message in Kafka consists of a key and a value, so build both.
        Application key = Application.newBuilder()
                .setName("axual speaks spring")
                .setVersion("1.0.0")
                .setOwner("none")
                .build();
        ApplicationLogEvent value = ApplicationLogEvent.newBuilder()
                .setTimestamp(System.currentTimeMillis())
                .setSource(SOURCE)
                .setLevel(ApplicationLogLevel.INFO)
                .setMessage("Sending a message!")
                .setContext(Collections.singletonMap("Some key", "Some Value"))
                .build();

        kafkaTemplate.setProducerListener(new ProducerListener<Application, ApplicationLogEvent>() {
            @Override
            public void onSuccess(ProducerRecord<Application, ApplicationLogEvent> producerRecord,
                                  RecordMetadata recordMetadata) {
                // Do NOT perform time-consuming network or DB operations here. This method is
                // called within the producer sender thread, so it should complete very quickly.
                // If you need to perform expensive network or DB operations, store the metadata
                // in a hash map and perform the operation in a separate thread.
                long now = System.currentTimeMillis();
                LOG.info("Successfully produced message {} on partition {}, offset {} in {} milliseconds",
                        producerRecord, recordMetadata.partition(),
                        recordMetadata.offset(), now - recordMetadata.timestamp());
            }

            @Override
            public void onError(ProducerRecord<Application, ApplicationLogEvent> producerRecord,
                                RecordMetadata recordMetadata,
                                Exception exception) {
                // When a failure occurs, the cause is available in the exception argument.
                LOG.error("Error occurred while producing message {}", producerRecord, exception);
            }
        });

        kafkaTemplate.send(stream, key, value);
    }
}
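The comments in onSuccess advise keeping the callback fast and moving expensive work to another thread. The following is a minimal sketch of that hand-off using only the JDK. The class and method names (CallbackOffloadSketch, onAcknowledged, slowBookkeeping) are hypothetical, and a single-thread executor queue stands in for the hash map plus separate thread mentioned in the comment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of Spring Kafka or the Axual client.
final class CallbackOffloadSketch {

    // Dedicated worker thread, so slow work never runs on the producer sender thread.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final AtomicInteger handled = new AtomicInteger();

    // Intended to be called from the producer callback: it only enqueues a task and returns.
    void onAcknowledged(int partition, long offset) {
        worker.submit(() -> slowBookkeeping(partition, offset));
    }

    // Placeholder for an expensive network or database operation (assumption for this sketch).
    private void slowBookkeeping(int partition, long offset) {
        handled.incrementAndGet();
    }

    int handledCount() {
        return handled.get();
    }

    // Stops accepting new tasks and waits for queued ones; returns true if all finished in time.
    boolean shutdownAndWait(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In the listener above, onSuccess would call something like onAcknowledged(recordMetadata.partition(), recordMetadata.offset()) and return immediately, while shutdownAndWait would be called when the application stops.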
When all of the above steps have been done correctly, start your producer app. With the logging level INFO, your app should log:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.4)

2022-03-04 14:51:57.723 INFO 57887 --- [ main] i.a.c.examples.AvroProducerApplication : Starting AvroProducerApplication using Java 11.0.2 on MacBook-Pro.local with PID 57887 (/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes started by shalabh in /Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples)
2022-03-04 14:51:57.728 INFO 57887 --- [ main] i.a.c.examples.AvroProducerApplication : No active profile set, falling back to default profiles: default
2022-03-04 14:51:58.877 INFO 57887 --- [ main] i.a.c.examples.AvroProducerApplication : Started AvroProducerApplication in 1.635 seconds (JVM running for 4.01)
2022-03-04 14:51:59.812 INFO 57887 --- [ main] i.a.d.client.fetcher.DiscoveryLoader : TTL updated to: 5000 (was: 0)
2022-03-04 14:51:59.812 INFO 57887 --- [ main] i.a.d.client.fetcher.DiscoveryLoader : Fetched discovery properties: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.812 INFO 57887 --- [ main] i.a.c.p.s.discovery.DiscoverySubscriber : Received new DiscoveryResult for SwitchingProducer: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.813 INFO 57887 --- [ main] i.a.c.p.s.discovery.DiscoverySubscriber : Switching SwitchingProducer from null to clusterA
2022-03-04 14:51:59.813 INFO 57887 --- [ main] i.a.c.p.s.g.BaseClientProxySwitcher : Creating new backing producer with Discovery API result: DiscoveryResult: {group.id.pattern={tenant}-{instance}-{environment}-{group}, acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, topic.pattern={tenant}-{instance}-{environment}-{topic}, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, ttl=5000, distributor.distance=2, enable.value.headers=false, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, schema.registry.url=http://localhost:8082, environment=local, system=platform-test-standalone, distributor.timeout=20000, group.id.resolver=io.axual.common.resolver.GroupPatternResolver, tenant=axual}
2022-03-04 14:51:59.813 INFO 57887 --- [ main] i.a.c.p.s.producer.ProducerSwitcher : Creating a new producer with properties: {acl.principal.builder=io.axual.security.principal.BasicAclPrincipalBuilder, cluster=clusterA, instance=example, reconnect.backoff.ms=1000, topic.pattern={tenant}-{instance}-{environment}-{topic}, bootstrap.servers=localhost:8084, topic.resolver=io.axual.common.resolver.TopicPatternResolver, retry.backoff.ms=1000, buffer.memory=33554432, key.serializer=io.axual.serde.avro.SpecificAvroSerializer, ssl.keystore.type=JKS, schema.registry.url=http://localhost:8082, endpoint=http://127.0.0.1:8081, ssl.key.password=[hidden], ssl.truststore.password=[hidden], max.in.flight.requests.per.connection=5, tenant=axual, client.id=producer-io.axual.example.client.avro.producer-1, ssl.endpoint.identification.algorithm=, group.id.pattern={tenant}-{instance}-{environment}-{group}, ssl.protocol=TLSv1.3, ssl.enabled.protocols=[TLSv1.2, TLSv1.3], acks=-1, batch.size=16834, ssl.keystore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes/client-certs/axual.client.keystore.jks, transactional.id.resolver=io.axual.common.resolver.TransactionalIdPatternResolver, app.id=io.axual.example.client.avro.producer, ttl=5000, distributor.distance=2, ssl.truststore.type=JKS, enable.value.headers=false, app.version=1.0.0, security.protocol=SSL, transactional.id.pattern={tenant}-{environment}-{app.id}-{transactional.id}, retries=0, environment=local, value.serializer=io.axual.serde.avro.SpecificAvroSerializer, ssl.truststore.location=/Users/shalabh/IdeaProjects/axual-projects/clients-examples/client-java-examples/spring-examples/axual-client-spring-examples/axual-spring-kafka-avro-producer/target/classes/client-certs/axual.client.truststore.jks, system=platform-test-standalone, distributor.timeout=20000, ssl.keystore.password=[hidden], group.id.resolver=io.axual.common.resolver.GroupPatternResolver, internal.auto.downgrade.txn.commit=true, linger.ms=100}
2022-03-04 14:51:59.849 INFO 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: ...
2022-03-04 14:51:59.861 INFO 57887 --- [ main] i.c.k.s.KafkaAvroSerializerConfig : KafkaAvroSerializerConfig values: ...
2022-03-04 14:51:59.959 INFO 57887 --- [ main] i.c.k.s.KafkaAvroSerializerConfig : KafkaAvroSerializerConfig values: ...
2022-03-04 14:51:59.960 INFO 57887 --- [ main] i.c.k.s.KafkaAvroSerializerConfig : KafkaAvroSerializerConfig values: ...
2022-03-04 14:52:00.001 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'acl.principal.builder' was supplied but isn't a known config.
2022-03-04 14:52:00.002 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'cluster' was supplied but isn't a known config.
2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'instance' was supplied but isn't a known config.
2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'topic.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'schema.registry.url' was supplied but isn't a known config.
2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'endpoint' was supplied but isn't a known config.
2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'tenant' was supplied but isn't a known config.
2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'group.id.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'transactional.id.resolver' was supplied but isn't a known config.
2022-03-04 14:52:00.003 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'app.id' was supplied but isn't a known config.
2022-03-04 14:52:00.004 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'ttl' was supplied but isn't a known config.
2022-03-04 14:52:00.004 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'distributor.distance' was supplied but isn't a known config.
2022-03-04 14:52:00.004 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'enable.value.headers' was supplied but isn't a known config.
2022-03-04 14:52:00.004 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'app.version' was supplied but isn't a known config.
2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'transactional.id.pattern' was supplied but isn't a known config.
2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'environment' was supplied but isn't a known config.
2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'system' was supplied but isn't a known config.
2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'distributor.timeout' was supplied but isn't a known config.
2022-03-04 14:52:00.005 WARN 57887 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'group.id.resolver' was supplied but isn't a known config.
2022-03-04 14:52:00.007 INFO 57887 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2022-03-04 14:52:00.007 INFO 57887 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2022-03-04 14:52:00.007 INFO 57887 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1646401920005
2022-03-04 14:52:00.009 INFO 57887 --- [ main] i.a.c.p.s.g.BaseClientProxySwitcher : Created new backing producer
2022-03-04 14:52:00.476 INFO 57887 --- [avro.producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-io.axual.example.client.avro.producer-1] Cluster ID: ttKdVMqWRd-uWOHsliPu4Q
2022-03-04 14:52:00.584 INFO 57887 --- [ionShutdownHook] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-io.axual.example.client.avro.producer-1] Closing the Kafka producer with timeoutMillis = 86399913600000 ms.
2022-03-04 14:52:00.787 INFO 57887 --- [avro.producer-1] i.a.c.examples.AvroProducerApplication : Successfully produced message ProducerRecord(topic=avro-applicationlog, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key={"name": "axual speaks spring", "version": "1.0.0", "owner": "none"}, value={"timestamp": 1646401918900, "source": {"name": "Axual ❤️ Spring", "version": "1.0.0", "owner": "none"}, "context": {"Some key": "Some Value"}, "level": "INFO", "message": "Sending a message!"}, timestamp=null) on partition 6, offset 0 in 311 milliseconds
2022-03-04 14:52:00.801 INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2022-03-04 14:52:00.801 INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-03-04 14:52:00.801 INFO 57887 --- [ionShutdownHook] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2022-03-04 14:52:00.802 INFO 57887 --- [ionShutdownHook] o.a.kafka.common.utils.AppInfoParser : App info kafka.producer for producer-io.axual.example.client.avro.producer-1 unregistered

Process finished with exit code 0
This is all the code required to produce a message successfully.
Next Step: Consuming Data Using A Java Consumer Application
In the next step you will create a Java consumer application to use the data you just produced.
Proceed to Step 6: Consuming Data (Java Spring Client)