Handling Axual-specific headers
The Problem
When Axual distributes messages between clusters, it adds an Axual-Copy-Flags header to each message to prevent distribution loops. This header uses a bitmask to track which distributor levels have already processed the message.
Why This Causes Issues
When you use Kafka Streams, KSML, or Kafka Connect to process messages:
-
Your application consumes messages (including all their headers)
-
Your application produces new messages, often preserving the original headers
-
These output messages still contain the original
Axual-Copy-Flagsheader -
When the distributor processes these messages, it sees the copy flags are already set
-
The distributor skips these messages, thinking they were "already distributed"
Result: Your processed messages are never distributed to other clusters.
Additional Headers
Besides Axual-Copy-Flags, Axual also adds lineage headers for tracing and debugging purposes. While these don’t break distribution, they can:
-
Become stale/incorrect after intermediate processing
-
Accumulate and affect performance in high-throughput scenarios
-
Cause confusion in lineage tracking
The Solution: Header Cleaning
To solve this problem, Axual provides an interceptor that automatically removes all Axual headers from your messages. The interceptor can be configured on Kafka producers and consumers, and in Kafka Streams applications. It will remove the following headers:
-
Axual-Copy-Flags -
Axual-Message-Id -
Axual-Producer-Id -
Axual-Producer-Version -
Axual-Intermediate-Id -
Axual-Intermediate-Version -
Axual-Serialization-Time -
Axual-Deserialization-Time -
Axual-System -
Axual-Instance -
Axual-Cluster -
Axual-Tenant -
Axual-Environment
Using the cleaning interceptor
Include the dependency
Include the dependency into your Maven project:
<dependency>
<groupId>io.axual.utils</groupId>
<artifactId>axual-header-cleaning</artifactId>
<scope>runtime</scope>
<version>1.0.0</version>
</dependency>
| Check the releases for the correct/latest version number; they can be found at https://mvnrepository.com/artifact/io.axual.utils/axual-header-cleaning |
Configure the interceptor for Kafka producer and consumer
In your producer or consumer configuration, add the following to enable interceptors, or alternatively add the class to an existing list of interceptors.
interceptor.classes=io.axual.utils.headers.cleaning.AxualHeaderCleaningInterceptor
Configure the interceptor for Kafka Streams internal producers or consumers
In your producer/consumer configuration, add the following configuration to enable interceptors, or add the class to an existing list of interceptors.
producer.interceptor.classes=io.axual.utils.headers.cleaning.AxualHeaderCleaningInterceptor
consumer.interceptor.classes=io.axual.utils.headers.cleaning.AxualHeaderCleaningInterceptor