Axual Streams

AxualStreams<K, V> (Class)

Package: io.axual.streams

AxualStreams provides a wrapper around Kafka Streams.

Known Limitations

When using Axual Streams, there are a few things to consider before starting a Topology.

Using KTable

To build a KTable from a stream, a changelog stream must first be created (its name should follow the convention <application_name>-<store_name>-changelog). In a plain Kafka Streams application this changelog stream is created under the hood; on the Axual platform, however, registration of streams is restricted to data owners and handled via the Self Service.
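Because the changelog stream has to be registered up front, it helps to derive its name in advance. A minimal sketch of the naming convention (the application and store names below are made-up examples):

```java
public class ChangelogTopicName {

    // Derives the changelog topic name following the
    // <application_name>-<store_name>-changelog convention.
    static String changelogTopic(String applicationName, String storeName) {
        return applicationName + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Hypothetical application and store names, for illustration only.
        System.out.println(changelogTopic("aggregator-app", "aggregation-store"));
        // aggregator-app-aggregation-store-changelog
    }
}
```

The resulting name is the stream that would need to be registered via the Self Service before the topology is started.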

Generate topology

To get started with AxualStreams, the first step is to create a Topology object that can be plugged into a StreamRunner. The following example shows a simple TopologyFactory that aggregates input messages by key.

A method that creates the TopologyFactory can be written as follows:

private TopologyFactory buildTopologyFactory(final AxualSerde<String> keySerde,
                                             final AxualSerde<String> valueSerde) {  (1)
    final Reducer<String> reducer = (value1, value2) -> value1 + ":" + value2;

    return builder -> {
        builder
                .stream(FROM_TOPIC, Consumed.with(keySerde, valueSerde))  (2)
                .peek((key, value) -> log.info("Incoming message <k, v>: <{}, {}>", key, value))  (3)
                .selectKey((key, value) -> {
                    final String[] parts = key.split(":");
                    if (parts.length != 3) {
                        return key;
                    }
                    return parts[0] + ":" + parts[1];
                })  (4)
                .groupByKey(Grouped.with(keySerde, valueSerde))
                .reduce(reducer, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                        .withKeySerde(keySerde)
                        .withValueSerde(valueSerde))
                .toStream()
                .peek((key, value) -> log.info("Outgoing message: key: {} value: {}", key, value))
                .to(TO_TOPIC, Produced.with(keySerde, valueSerde));  (5)
        return builder.build();
    };
}
1 Provide the serdes used to read the input stream and write the output stream
2 The input topic to consume from
3 Log every incoming message
4 Re-key each message: keys of the form a:b:c are shortened to a:b; all other keys are kept as-is
5 Write the aggregated result to the output topic
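To make the re-keying and reduction concrete, the following standalone sketch mimics what the topology does to a batch of records, using plain Java collections instead of Kafka Streams (the records are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AggregationSketch {

    // Mirrors the selectKey step: keys of the form "a:b:c" become "a:b",
    // any other key is returned unchanged.
    static String rekey(String key) {
        final String[] parts = key.split(":");
        return parts.length == 3 ? parts[0] + ":" + parts[1] : key;
    }

    public static void main(String[] args) {
        // Illustrative input records: {key, value} pairs.
        final String[][] records = {
                {"eu:nl:1", "a"},
                {"eu:nl:2", "b"},
                {"us:ca:1", "c"}
        };

        // Mirrors groupByKey + reduce with (v1, v2) -> v1 + ":" + v2.
        final Map<String, String> store = new LinkedHashMap<>();
        for (String[] record : records) {
            store.merge(rekey(record[0]), record[1], (v1, v2) -> v1 + ":" + v2);
        }

        System.out.println(store); // {eu:nl=a:b, us:ca=c}
    }
}
```

The two records sharing the prefix eu:nl end up reduced into a single value, which is exactly what the state store named STORE_NAME holds in the real topology.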

Start the streaming application

With the topology defined, all that is left is to plug it into the Axual StreamRunner:

try (final AxualSerde<String> keySerde = new AxualSerde<>(configs, true);
     final AxualSerde<String> valueSerde = new AxualSerde<>(configs, false)) {
    final StreamRunnerConfig aggregatorConfig = StreamRunnerConfig.builder()
            .setDeliveryStrategy(AT_LEAST_ONCE)  (1)
            .setUncaughtExceptionHandler(new DefaultHandlerFactory())  (2)
            .setDefaultKeySerde(Serdes.String())
            .setDefaultValueSerde(Serdes.String())
            .setTopologyFactory(buildTopologyFactory(keySerde, valueSerde))  (3)
            .build();

    StreamRunner aggregator = streamsClient.buildStreamRunner(aggregatorConfig);  (4)
    aggregator.start();  (5)
} catch (Exception e) {
    ...
}
1 Set the delivery strategy, as defined for the producer DeliveryStrategy.
2 Provide an uncaught exception handler, one of the mandatory configurations.
3 Build the topology factory and plug it into the StreamRunnerConfig.
4 Build the StreamRunner object.
5 Start the streaming application.