Axual Kafka Proxy

Overview

The Kafka Proxy implementation provides a simple way to migrate from a pure Kafka implementation to the Axual platform. It enables full-fledged Axual platform functionality with minimal code changes, limited to replacing some key Kafka components and imports with their Axual counterparts.
To achieve full Axual support, the developer needs to provide the necessary proxies through configuration.

Proxy Chaining

The functionality provided by the Axual client can be plugged into the Producer or Consumer configuration under the key CHAIN_CONFIG (package io.axual.client.proxy.axual.producer or io.axual.client.proxy.axual.consumer respectively).
The value of this configuration entry is an io.axual.client.proxy.generic.registry.ProxyChain object. ProxyChain provides a builder to which the user appends the id of every Axual proxy needed; the proxy ids are defined in io.axual.client.proxy.generic.registry.ProxyTypeRegistry. The available proxies are summarized below.
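
To make the idea of chaining concrete, the following is a minimal, self-contained sketch of how a list of proxy ids could be turned into nested wrappers around a backing client. It does not use the Axual library: the Client interface, the buildChain method and the string ids are hypothetical stand-ins, and the ordering (first appended id becomes the outermost layer) is an assumption made for illustration.

```java
import java.util.List;

public class ProxyChainSketch {

    /** Hypothetical stand-in for a Kafka client; it only reports how a call was handled. */
    interface Client {
        String send(String topic);
    }

    /**
     * Wraps the backing client once per proxy id.
     * Assumption for this sketch: the first id in the list ends up as the outermost layer.
     */
    static Client buildChain(List<String> proxyIds, Client backing) {
        Client current = backing;
        // Walk the ids back to front so that the first id wraps everything else.
        for (int i = proxyIds.size() - 1; i >= 0; i--) {
            String id = proxyIds.get(i);
            Client inner = current;
            current = topic -> id + "(" + inner.send(topic) + ")";
        }
        return current;
    }

    public static void main(String[] args) {
        Client kafka = topic -> "kafka:" + topic;
        Client chained = buildChain(List.of("switching", "resolving", "header"), kafka);
        // Prints switching(resolving(header(kafka:orders)))
        System.out.println(chained.send("orders"));
    }
}
```

Each layer only knows about the layer directly beneath it, which is why proxies can be added or left out through configuration without changing application code.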

Header Proxy

Enabled by providing the Header Proxy id (io.axual.client.proxy.generic.registry.HEADER_PROXY_ID).
Adds the Axual platform headers, which are used internally. On the Axual platform this proxy should be enabled.

Logging Proxy

Enabled by providing the Logging Proxy id (io.axual.client.proxy.generic.registry.LOGGING_PROXY_ID).
Writes intermediate calls to a log, which is helpful for debugging.

Resolving Proxy

Enabled by providing the Resolving Proxy id (io.axual.client.proxy.generic.registry.RESOLVING_PROXY_ID).
This proxy facilitates multi-tenancy and multiple environments.
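
As an illustration of what resolving can mean in practice, the sketch below maps a logical topic name to a tenant- and environment-specific name. The pattern "{tenant}-{environment}-{topic}", the class and the resolve method are assumptions chosen for this example, not the Axual implementation; the actual naming pattern is determined by the platform.

```java
public class ResolvingSketch {

    /**
     * Fills a naming pattern with tenant, environment and logical topic name.
     * The placeholder syntax is hypothetical and only serves this sketch.
     */
    static String resolve(String pattern, String tenant, String environment, String topic) {
        return pattern
                .replace("{tenant}", tenant)
                .replace("{environment}", environment)
                .replace("{topic}", topic);
    }

    public static void main(String[] args) {
        String pattern = "{tenant}-{environment}-{topic}";
        // The application keeps using "orders"; the resolved name differs per environment.
        System.out.println(resolve(pattern, "acme", "dev", "orders"));  // acme-dev-orders
        System.out.println(resolve(pattern, "acme", "prod", "orders")); // acme-prod-orders
    }
}
```

With a mapping of this kind, the same application code can run for different tenants and environments while only the TENANT and ENVIRONMENT configuration values change.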

Switching Proxy

Enabled by providing the Switching Proxy id (io.axual.client.proxy.generic.registry.SWITCHING_PROXY_ID).
This adds the switching functionality that supports an active-active Axual configuration. When a cluster goes down, the proxy handles this internally by creating a new Producer or Consumer pointed at a cluster that is available.
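
The behaviour described above can be sketched roughly as follows. This is not the Axual implementation: Client, SwitchingClient, the activeCluster supplier and the factory function are hypothetical names, and checking the active cluster on every call is a simplification made for the example.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class SwitchingSketch {

    /** Hypothetical stand-in for a producer or consumer. */
    interface Client {
        String send(String record);
    }

    /** Recreates its delegate whenever the active cluster changes. */
    static class SwitchingClient implements Client {
        private final Supplier<String> activeCluster;   // e.g. backed by a discovery lookup
        private final Function<String, Client> factory; // creates a client for a given cluster
        private String currentCluster;
        private Client delegate;

        SwitchingClient(Supplier<String> activeCluster, Function<String, Client> factory) {
            this.activeCluster = activeCluster;
            this.factory = factory;
        }

        @Override
        public String send(String record) {
            String target = activeCluster.get();
            if (!target.equals(currentCluster)) {
                // The active cluster changed (or this is the first call): build a new delegate.
                currentCluster = target;
                delegate = factory.apply(target);
            }
            return delegate.send(record);
        }
    }

    public static void main(String[] args) {
        String[] active = {"cluster-a"};
        Client client = new SwitchingClient(
                () -> active[0],
                cluster -> record -> cluster + ":" + record);

        System.out.println(client.send("r1")); // cluster-a:r1
        active[0] = "cluster-b";               // simulate cluster-a going down
        System.out.println(client.send("r2")); // cluster-b:r2
    }
}
```

The calling code holds a single client reference throughout; the switch to another cluster happens behind that reference.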

Examples

Consumer Example

Map<String, Object> config = new HashMap<>(); (1)

config.put(CHAIN_CONFIG, ProxyChain.newBuilder() (2)
        .append(SWITCHING_PROXY_ID)
        .append(RESOLVING_PROXY_ID)
        .append(HEADER_PROXY_ID)
        .build());
config.put(CommonConfig.APPLICATION_ID, APPLICATION_ID); (3)
config.put(CommonConfig.APPLICATION_VERSION, APPLICATION_VERSION);
config.put(CommonConfig.TENANT, TENANT);
config.put(CommonConfig.ENVIRONMENT, ENVIRONMENT);

config.put(BOOTSTRAP_SERVERS_CONFIG, ENDPOINT); (4)

config.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL); (5)
config.put(SslConfigs..., ...);

config.put(KEY_DESERIALIZER_CLASS_CONFIG, new AxualDeserializer<Application>()); (6)
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, new AxualDeserializer<ApplicationLogEvent>());
config.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); (7)

AxualConsumer consumer = new AxualConsumer(config); (8)
1 Create a configuration map as you would on a Kafka Consumer.
2 Add the desired Axual Proxies.
3 Add the rest of the required Axual properties.
4 Add the Discovery API endpoint. This will be used to obtain the bootstrap servers.
5 Add the necessary SSL configurations.
6 Add the deserializers for key and value.
7 Add any other Kafka Consumer Configuration entry necessary.
8 Initialize the consumer the same way you would create a Kafka Consumer.

Producer Example

Map<String, Object> config = new HashMap<>(); (1)

config.put(CHAIN_CONFIG, ProxyChain.newBuilder() (2)
        .append(SWITCHING_PROXY_ID)
        .append(RESOLVING_PROXY_ID)
        .append(HEADER_PROXY_ID)
        .build());
config.put(CommonConfig.APPLICATION_ID, APPLICATION_ID);  (3)
config.put(CommonConfig.APPLICATION_VERSION, APPLICATION_VERSION);
config.put(CommonConfig.TENANT, TENANT);
config.put(CommonConfig.ENVIRONMENT, ENVIRONMENT);

config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, ENDPOINT); (4)

config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL); (5)
config.put(SslConfigs..., ...);

config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, WrappedSerializerInstance.class.getCanonicalName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, WrappedSerializerInstance.class.getCanonicalName()); (6)
config.put(ProducerConfig.ACKS_CONFIG, "-1"); (7)
config.put(ProducerConfig...., ...);

Producer<Application, ApplicationLogEvent> producer = new AxualProducer<>(config); (8)
1 Create a configuration map as you would on a Kafka Producer.
2 Add the desired Axual Proxies.
3 Add the rest of the required Axual properties.
4 Add the Discovery API endpoint. This will be used to obtain the bootstrap servers.
5 Add the necessary SSL configurations.
6 Add the serializers for key and value.
7 Add any other Kafka Producer Configuration entry necessary.
8 Initialize the producer the same way you would create a Kafka Producer.