Axual Kafka Proxy
Overview
The Kafka Proxy implementation provides a simple way to migrate from a pure Kafka implementation to
the Axual platform. It enables full Axual platform functionality with minimal code changes: the
effort is limited to replacing a few key Kafka components and imports with their Axual
counterparts.
To get full Axual support, the developer provides the necessary proxies through configuration.
Proxy Chaining
The functionality provided by the Axual Client is plugged into the Producer or Consumer
configuration under the key CHAIN_CONFIG (in package io.axual.client.proxy.axual.producer or
io.axual.client.proxy.axual.consumer, respectively).
The value of this configuration is an io.axual.client.proxy.generic.registry.ProxyChain object.
It is built from the collection of proxy ids (io.axual.client.proxy.generic.registry.ProxyTypeRegistry)
for the Axual proxies the user needs.
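To make the chaining idea concrete, here is a minimal, self-contained sketch of how an ordered list of proxy ids could be turned into nested wrappers around a base client. It does not use the Axual classes: `Sender`, `ChainSketch`, and the string ids `"HEADER"` and `"LOGGING"` are hypothetical stand-ins, and the real `ProxyChain` may behave differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a producer: accepts a message and returns what was sent.
interface Sender {
    String send(String message);
}

// Hypothetical model of a proxy chain; this is NOT the Axual ProxyChain class.
final class ChainSketch {
    // Each (assumed) proxy id maps to a function that wraps an inner Sender.
    private static final Map<String, UnaryOperator<Sender>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("HEADER", inner -> msg -> inner.send("[header]" + msg));
        REGISTRY.put("LOGGING", inner -> msg -> {
            System.out.println("send(" + msg + ")");
            return inner.send(msg);
        });
    }

    private final List<String> ids = new ArrayList<>();

    // Records a proxy id; unknown ids are rejected early.
    ChainSketch append(String proxyId) {
        if (!REGISTRY.containsKey(proxyId)) {
            throw new IllegalArgumentException("Unknown proxy id: " + proxyId);
        }
        ids.add(proxyId);
        return this;
    }

    // Wraps from the last id to the first, so the first appended id ends up outermost.
    Sender build(Sender base) {
        Sender current = base;
        for (int i = ids.size() - 1; i >= 0; i--) {
            current = REGISTRY.get(ids.get(i)).apply(current);
        }
        return current;
    }
}
```

In this sketch, `new ChainSketch().append("LOGGING").append("HEADER").build(base)` yields a sender that logs the call first and adds the header marker second. Whether the Axual chain applies proxies in the same order is not stated in this document.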
The available proxies are summarized below.
Header Proxy
Enabled by providing the Header Proxy id (io.axual.client.proxy.generic.registry.HEADER_PROXY_ID).
Adds the Axual platform headers, which are used internally.
On the Axual platform this proxy should be enabled.
Logging Proxy
Enabled by providing the Logging Proxy id (io.axual.client.proxy.generic.registry.LOGGING_PROXY_ID).
Writes intermediate calls to a log, which is helpful for debugging.
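As an illustration of what "logging intermediate calls" can look like, the sketch below wraps an interface in a JDK dynamic proxy that records every method call before delegating. It uses only `java.lang.reflect.Proxy` from the standard library. `SimpleClient` and `LoggingWrapperSketch` are hypothetical names, and this is not how the Axual Logging Proxy is necessarily implemented.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical minimal client interface, used only for this illustration.
interface SimpleClient {
    String send(String topic, String value);
}

final class LoggingWrapperSketch {
    // Collected log lines; a real setup would use a logging framework instead.
    static final List<String> LOG = new ArrayList<>();

    // Returns a dynamic proxy that records each call, then delegates to the target.
    static <T> T withLogging(T target, Class<T> type) {
        InvocationHandler handler = (proxy, method, args) -> {
            LOG.add(method.getName() + Arrays.toString(args));
            return method.invoke(target, args);
        };
        Object wrapped = Proxy.newProxyInstance(
            type.getClassLoader(), new Class<?>[] {type}, handler);
        return type.cast(wrapped);
    }
}
```

Calling `withLogging(client, SimpleClient.class).send("t", "v")` in this sketch appends the line `send[t, v]` to `LOG` and then returns whatever the wrapped client returns.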
Resolving Proxy
Enabled by providing the Resolving Proxy id (io.axual.client.proxy.generic.registry.RESOLVING_PROXY_ID).
This proxy facilitates multi-tenancy and multiple environments.
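One common way to support several tenants and environments on shared clusters is to map a logical topic name to a tenant- and environment-specific name. The sketch below shows that idea only. The pattern `{tenant}-{environment}-{topic}` and the class `TopicResolverSketch` are assumptions made for illustration, not the resolving rules of the Axual platform.

```java
import java.util.Map;

// Hypothetical illustration of name resolving; the pattern format is an assumption.
final class TopicResolverSketch {
    private final String pattern;
    private final Map<String, String> context;

    TopicResolverSketch(String pattern, Map<String, String> context) {
        this.pattern = pattern;
        this.context = context;
    }

    // Fills in {topic} with the logical name and every {key} placeholder from the context.
    String resolve(String logicalTopic) {
        String resolved = pattern.replace("{topic}", logicalTopic);
        for (Map.Entry<String, String> entry : context.entrySet()) {
            resolved = resolved.replace("{" + entry.getKey() + "}", entry.getValue());
        }
        return resolved;
    }
}
```

With the context `tenant=acme` and `environment=dev`, this sketch resolves the logical topic `orders` to `acme-dev-orders`, so the application code can keep using the short name.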
Switching Proxy
Enabled by providing the Switching Proxy id (io.axual.client.proxy.generic.registry.SWITCHING_PROXY_ID).
Adds the switching functionality, which supports the active-active Axual configuration.
When a cluster goes down, the proxy handles this internally by creating a new Producer or
Consumer pointed at a cluster that is available.
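The failover behaviour described above can be sketched roughly as follows. This is a simplified model, not the Axual implementation: `SwitchingSketch`, the availability check, and the client factory are all hypothetical, and a real proxy would also have to close the old client and deal with offsets and in-flight records.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical failover holder; it only illustrates recreating a client on another cluster.
final class SwitchingSketch<C> {
    private final List<String> clusters;
    private final Predicate<String> isAvailable;
    private final Function<String, C> clientFactory;
    private String activeCluster;
    private C client;

    SwitchingSketch(List<String> clusters,
                    Predicate<String> isAvailable,
                    Function<String, C> clientFactory) {
        this.clusters = clusters;
        this.isAvailable = isAvailable;
        this.clientFactory = clientFactory;
    }

    // Returns the current client, creating a new one if the active cluster is unavailable.
    C client() {
        if (activeCluster == null || !isAvailable.test(activeCluster)) {
            activeCluster = clusters.stream()
                .filter(isAvailable)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("No cluster available"));
            client = clientFactory.apply(activeCluster);
        }
        return client;
    }

    String activeCluster() {
        return activeCluster;
    }
}
```

The caller keeps asking the holder for a client and does not need to know which cluster is active, which mirrors the way the switch is described as being handled internally.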
Examples
Consumer Example
Map<String, Object> config = new HashMap<>(); // (1)
config.put(CHAIN_CONFIG, ProxyChain.newBuilder() // (2)
    .append(SWITCHING_PROXY_ID)
    .append(RESOLVING_PROXY_ID)
    .append(HEADER_PROXY_ID)
    .build());
config.put(CommonConfig.APPLICATION_ID, APPLICATION_ID); // (3)
config.put(CommonConfig.APPLICATION_VERSION, APPLICATION_VERSION);
config.put(CommonConfig.TENANT, TENANT);
config.put(CommonConfig.ENVIRONMENT, ENVIRONMENT);
config.put(BOOTSTRAP_SERVERS_CONFIG, ENDPOINT); // (4)
config.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL); // (5)
config.put(SslConfigs..., ...);
config.put(KEY_DESERIALIZER_CLASS_CONFIG, new AxualDeserializer<Application>()); // (6)
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, new AxualDeserializer<ApplicationLogEvent>());
config.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); // (7)
AxualConsumer<Application, ApplicationLogEvent> consumer = new AxualConsumer<>(config); // (8)
(1) Create a configuration map as you would for a Kafka Consumer.
(2) Add the desired Axual proxies.
(3) Add the rest of the required Axual properties.
(4) Add the Discovery API endpoint. This will be used to obtain the Bootstrap servers plugin.
(5) Add the necessary SSL configurations.
(6) Add the deserializers for key and value.
(7) Add any other Kafka Consumer configuration entry necessary.
(8) Initialize the consumer the same way you would create a Kafka Consumer.
Producer Example
Map<String, Object> config = new HashMap<>(); // (1)
config.put(CHAIN_CONFIG, ProxyChain.newBuilder() // (2)
    .append(SWITCHING_PROXY_ID)
    .append(RESOLVING_PROXY_ID)
    .append(HEADER_PROXY_ID)
    .build());
config.put(CommonConfig.APPLICATION_ID, APPLICATION_ID); // (3)
config.put(CommonConfig.APPLICATION_VERSION, APPLICATION_VERSION);
config.put(CommonConfig.TENANT, TENANT);
config.put(CommonConfig.ENVIRONMENT, ENVIRONMENT);
config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, ENDPOINT); // (4)
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL); // (5)
config.put(SslConfigs..., ...);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, WrappedSerializerInstance.class.getCanonicalName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, WrappedSerializerInstance.class.getCanonicalName()); // (6)
config.put(ProducerConfig.ACKS_CONFIG, "-1"); // (7)
config.put(ProducerConfig...., ...);
// Pass the map built above; the variable is named config.
Producer<Application, ApplicationLogEvent> producer = new AxualProducer<>(config); // (8)
(1) Create a configuration map as you would for a Kafka Producer.
(2) Add the desired Axual proxies.
(3) Add the rest of the required Axual properties.
(4) Add the Discovery API endpoint. This will be used to obtain the Bootstrap servers plugin.
(5) Add the necessary SSL configurations.
(6) Add the serializers for key and value.
(7) Add any other Kafka Producer configuration entry necessary.
(8) Initialize the producer the same way you would create a Kafka Producer.