Axual Streams
AxualStreams<K, V> (Class)
Package: io.axual.streams
AxualStreams provides a wrapper around Kafka Streams.
Known Limitations
When using Axual Streams, there are some things to consider before starting a Topology.
Using KTable
To make a KTable from a stream, we first need to create a changelog stream. Its name should follow
the convention <application_name>-<store_name>-changelog.
In a plain Kafka Streams application this changelog stream is created under the hood. On the
Axual platform, however, registration of streams is restricted to data owners and handled via
Self Service, so the changelog stream has to be registered there before the Topology is started.
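As a small illustration of the naming convention above, the changelog stream name can be derived from the application name and the store name. This is a minimal sketch in plain Java: the class ChangelogNames and its method are hypothetical helpers for this example and are not part of the Axual or Kafka Streams API.

```java
// Hypothetical helper, not part of the Axual or Kafka Streams API.
final class ChangelogNames {
    private ChangelogNames() {
    }

    /**
     * Builds a changelog stream name following the convention
     * <application_name>-<store_name>-changelog.
     */
    static String changelogName(final String applicationName, final String storeName) {
        if (applicationName == null || applicationName.isEmpty()) {
            throw new IllegalArgumentException("applicationName must not be empty");
        }
        if (storeName == null || storeName.isEmpty()) {
            throw new IllegalArgumentException("storeName must not be empty");
        }
        return applicationName + "-" + storeName + "-changelog";
    }
}
```

For example, ChangelogNames.changelogName("aggregator", "totals") returns "aggregator-totals-changelog", which would be the stream name to register before starting the application. Both names in this example are made up.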
Generate topology
To get started with AxualStreams, the first step is to create a Topology that can be plugged
into a StreamRunner.
The following example shows a simple, indicative TopologyFactory that aggregates input messages
by key.
A method that creates the TopologyFactory can be written as follows:
private TopologyFactory buildTopologyFactory(final AxualSerde<String> keySerde,
                                             final AxualSerde<String> valueSerde) { // (1)
    final Reducer<String> reducer = (value1, value2) -> value1 + ":" + value2;
    return builder -> {
        builder
            .stream(FROM_TOPIC, Consumed.with(keySerde, valueSerde)) // (2)
            .peek((key, value) -> log.info("Incoming message <k, v>: <{}, {}>", key, value)) // (3)
            .selectKey((key, value) -> {
                final String[] parts = key.split(":");
                if (parts.length != 3) {
                    return key;
                }
                return parts[0] + ":" + parts[1];
            }) // (4)
            .groupByKey(Grouped.with(keySerde, valueSerde))
            .reduce(reducer, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                .withKeySerde(keySerde)
                .withValueSerde(valueSerde))
            .toStream()
            .peek((key, value) -> log.info("Outgoing message: key: {} value: {}", key, value))
            .to(TO_TOPIC, Produced.with(keySerde, valueSerde)); // (5)
        return builder.build();
    };
}
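1 | Provide the key and value serdes used to read, store, and write the messages |
2 | Consume from the input topic |
3 | Log every incoming message |
4 | Re-key: a key with three colon-separated parts is reduced to its first two parts; any other key is kept as-is |
5 | Write the aggregated result to the output stream |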
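To see what the re-keying and reduce steps of the topology above do to a handful of messages, the same logic can be traced in plain Java without a running cluster. This is only a sketch: the class AggregationSketch, its methods, and the sample keys are hypothetical, and a map stands in for the state store. It returns only the final aggregate per key, whereas the real topology forwards updates downstream as messages arrive.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for the topology logic; it does not use Kafka Streams.
final class AggregationSketch {
    private AggregationSketch() {
    }

    // Same combination rule as the reducer in buildTopologyFactory.
    static final BinaryOperator<String> REDUCER = (value1, value2) -> value1 + ":" + value2;

    // Same rule as the selectKey step: keys with exactly three
    // colon-separated parts keep their first two, others are unchanged.
    static String rekey(final String key) {
        final String[] parts = key.split(":");
        if (parts.length != 3) {
            return key;
        }
        return parts[0] + ":" + parts[1];
    }

    // Applies rekey and then reduces values per new key, in arrival order.
    // Each record is a two-element array: {key, value}.
    static Map<String, String> aggregate(final String[][] records) {
        final Map<String, String> store = new LinkedHashMap<>();
        for (final String[] record : records) {
            // First value for a key is stored as-is; later values are combined.
            store.merge(rekey(record[0]), record[1], REDUCER);
        }
        return store;
    }
}
```

With the made-up input records ("nl:ams:1", "a"), ("nl:ams:2", "b") and ("be:bru", "c"), the first two keys both collapse to "nl:ams" and their values are combined into "a:b", while "be:bru" keeps its key and its single value "c".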
Start the streaming application
With the topology defined, all that is left is to plug it into the Axual StreamRunner:
try (final AxualSerde<String> keySerde = new AxualSerde<>(configs, true);
     final AxualSerde<String> valueSerde = new AxualSerde<>(configs, false)) {
    final StreamRunnerConfig aggregatorConfig = StreamRunnerConfig.builder()
        .setDeliveryStrategy(AT_LEAST_ONCE) // (1)
        .setUncaughtExceptionHandler(new DefaultHandlerFactory()) // (2)
        .setDefaultKeySerde(Serdes.String())
        .setDefaultValueSerde(Serdes.String())
        .setTopologyFactory(buildTopologyFactory(keySerde, valueSerde)) // (3)
        .build();
    StreamRunner aggregator = streamsClient.buildStreamRunner(aggregatorConfig); // (4)
    aggregator.start(); // (5)
} catch (Exception e) {
    ...
}
1 | Set the delivery strategy, as defined in the producer DeliveryStrategy. |
2 | Provide the other mandatory configs. |
3 | Build the TopologyFactory and plug it into the StreamRunnerConfig object. |
4 | Build the StreamRunner object. |
5 | Start the streaming application. |