Axual Distribution model
Distribution model
For efficiency reasons, messages are not distributed from every cluster to every other cluster. Instead, the concept of logical “levels” is used. On every cluster, multiple distributor instances are running, one per level. Every distributor will distribute to other instances on the same level. The levels are laid out like a binary tree, with Kafka clusters as the leaf nodes; the root and intermediate nodes are only conceptual and do not correspond directly with physical components.
See the example below: in this case there are four Kafka clusters, and the binary tree has two logical levels. The levels (3 and 1 in this case) are somewhat arbitrary but are used in the distribution algorithm explained below.
Diagram: a distribution tree with a Level 3 root and two Level 1 nodes; Cluster 1 and Cluster 2 sit under the first Level 1 node, Cluster 3 and Cluster 4 under the second. Dashed arrows show distribution paths from Cluster 1 to Cluster 2 and Cluster 3, and from Cluster 3 to Cluster 4.
Copy flags and algorithm
The main goal of the distribution algorithm is to prevent message distribution loops; each message should be distributed to each cluster exactly once. To achieve this, a custom header is added with so-called “copy flags”: a bit mask in which each bit indicates whether distribution already happened for that level. On any cluster, when a message comes in, each distributor examines its own copy flag bit and all the lower order bits. If any of those bits is set to 1, the distributor concludes that distribution already happened for its level and does not distribute again. Distributors distribute either to a single cluster or to a subtree (see the picture above); in the latter case the target cluster can be any leaf of the subtree. This allows the operator to switch to another target cluster in the subtree if the original target becomes unavailable or is decommissioned.
Fresh messages coming in might not have the header; in this case the copy flags will be considered to be all zeroes.
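As a rough sketch of that check (class and method names are illustrative, not the actual distributor code), a level-n distributor owns bit n-1 and masks the incoming flags with all bits up to and including its own:

```java
// Hypothetical sketch of the copy-flag check; not the actual Axual distributor API.
public class CopyFlagCheck {

    /** Bit owned by a distributor at the given level (level 1 -> 0b001, level 3 -> 0b100). */
    static int ownBit(int level) {
        return 1 << (level - 1);
    }

    /** Mask covering the distributor's own bit and all lower-order bits (level 3 -> 0b111). */
    static int mask(int level) {
        return (1 << level) - 1;
    }

    /** A distributor only copies when none of the bits in its mask are set yet. */
    static boolean shouldDistribute(int level, int copyFlags) {
        return (copyFlags & mask(level)) == 0;
    }

    /** Flags written on the outgoing copy: the incoming flags plus the distributor's own bit. */
    static int outgoingFlags(int level, int copyFlags) {
        return copyFlags | ownBit(level);
    }
}
```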
Diagram: Clusters 1 through 4, each running two distributor components: a Level 1 distributor with mask 0001 and a Level 3 distributor with mask 0111.
Consider the clusters above, corresponding to the logical tree shown earlier; each cluster has two distributors running, one for level 1 and one for level 3. Assuming that a message arrives at cluster 1, the following sequence of events happens:
- Both distributors on cluster 1 pick up the message, assuming the copy flags to be 0000. Both will mask all zeroes with their respective masks and arrive at zero, meaning that they have work to do. The level 1 distributor will copy to cluster 2, setting the flags to 0001; the level 3 distributor will copy to cluster 3 and set the flags to 0100.
- On cluster 2, both distributors see the message arrive with copy flags 0001. The level 1 distributor will mask this with mask 0001 and arrive at a non-zero result, indicating no work to be done. Similarly, the level 3 distributor will mask 0001 with 0111 and also arrive at a non-zero result. Neither distributor copies the message.
- In the meantime, on cluster 3 both distributors see a message arrive with copy flags 0100. The level 3 distributor masks this with 0111, arrives at a non-zero result and does not copy further. The level 1 distributor masks 0100 with 0001, arrives at zero and copies the message to cluster 4, adding its own bit to the copy flags and setting them to 0101.
- Both distributors on cluster 4 see a message arrive with copy flags 0101. They mask this with 0111 and 0001 respectively, and both arrive at a non-zero result. No further copying happens.
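The same walk-through can be traced numerically with a small standalone sketch; the constants mirror the masks shown in the component diagram above:

```java
// Numeric trace of the four-cluster example above; purely illustrative.
public class CopyFlagTrace {
    public static void main(String[] args) {
        int l1Mask = 0b0001, l1Bit = 0b0001;  // level 1 distributor
        int l3Mask = 0b0111, l3Bit = 0b0100;  // level 3 distributor

        int atCluster1 = 0b0000;              // fresh message, no copy-flag header
        int toCluster2 = atCluster1 | l1Bit;  // level 1 copies: flags become 0001
        int toCluster3 = atCluster1 | l3Bit;  // level 3 copies: flags become 0100

        // Cluster 2: both masks give a non-zero result, so nothing is copied.
        System.out.println((toCluster2 & l1Mask) != 0); // true
        System.out.println((toCluster2 & l3Mask) != 0); // true

        // Cluster 3: level 3 stops (0100 & 0111 != 0), level 1 copies onward.
        System.out.println((toCluster3 & l3Mask) != 0); // true
        int toCluster4 = (toCluster3 & l1Mask) == 0 ? toCluster3 | l1Bit : toCluster3; // 0101

        // Cluster 4: both distributors see 0101 and stay silent.
        System.out.println((toCluster4 & l1Mask) != 0 && (toCluster4 & l3Mask) != 0); // true
    }
}
```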
Unbalanced trees
It is also possible to have an unbalanced tree, as pictured in the diagram below.
Diagram: an unbalanced tree with a Level 3 root. One branch holds a Level 2 node, which in turn contains Cluster 1 and a Level 1 node with Cluster 2 and Cluster 3; the other branch is a Level 1 node containing Cluster 4 and Cluster 5.
The algorithm works the same in this case, with more distributors present.
It is important to be aware that on any cluster (leaf node) in the diagram there is one distributor running for every level above that cluster in the distribution tree. In the diagram above, Cluster 2 and Cluster 3 would have 3 distributor instances running: level 1, level 2 and level 3. Cluster 1 would have level 2 and level 3 distributors running, and Cluster 4 and Cluster 5 would have levels 1 and 3.
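That rule can be illustrated with a small sketch: each leaf runs one distributor per ancestor level. The tree layout follows the unbalanced diagram above; the node names and the parent map are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch: derive which distributor levels run on each cluster
// by walking from the leaf up to the root of the (unbalanced) tree above.
public class DistributorLevels {

    // Parent relation of the unbalanced tree; node names are made up.
    static final Map<String, String> PARENT = Map.of(
            "Cluster 1", "Level 2",
            "Cluster 2", "Level 1a",
            "Cluster 3", "Level 1a",
            "Cluster 4", "Level 1b",
            "Cluster 5", "Level 1b",
            "Level 1a", "Level 2",
            "Level 2", "Level 3",
            "Level 1b", "Level 3");

    // Level number of each intermediate node.
    static final Map<String, Integer> LEVEL = Map.of(
            "Level 1a", 1, "Level 1b", 1, "Level 2", 2, "Level 3", 3);

    static List<Integer> distributorLevels(String cluster) {
        List<Integer> levels = new ArrayList<>();
        for (String node = PARENT.get(cluster); node != null; node = PARENT.get(node)) {
            levels.add(LEVEL.get(node));
        }
        return levels; // one distributor instance per entry
    }

    public static void main(String[] args) {
        for (String c : List.of("Cluster 1", "Cluster 2", "Cluster 4")) {
            System.out.println(c + " -> levels " + distributorLevels(c));
        }
        // Cluster 1 -> [2, 3], Cluster 2 -> [1, 2, 3], Cluster 4 -> [1, 3]
    }
}
```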
Distributing offsets
A special case is the distribution of consumer offsets, stored in the
__consumer_offsets topic. Consider the topic partition below; a
consumer has been following along with the topic, committing message
batches, and is now at offset 35. Due to the topic’s retention settings,
messages 1 through 24 have been deleted from disk.
Diagram: a topic partition with messages 1, 6, 14, 21, 25, 30 and 35, carrying timestamps t1 through t7; the committed offset (35) points at the last message, and the messages before offset 25 are marked as deleted by retention.
Suppose a new cluster is brought online; the distributor for the topic would then replicate messages 25 through 35 to the new cluster, but offset distribution would send offset 35, which does not exist on the target cluster (since it never saw messages 1-24 at all).
The solution is to follow the offset commits, and for the latest commit retrieve the timestamp of the corresponding message. This timestamp is then distributed (to the new cluster in this case). There are two components involved: on the originating cluster the offset distributor is responsible for reading the last committed offset, translating that to a timestamp (by looking up the corresponding message in the topic partition) and distributing it. This is sent as a map, with topic/partition/consumer ID as key and the last committed timestamp as value. On the receiving cluster, the offset committer reads the timestamp and tells Kafka to set the partition offset to the message with this timestamp.
To guarantee at-least-once delivery we need to allow for the time needed for processing and offset distribution. For that reason we don’t distribute the exact timestamp, but keep a configurable margin (default 60 seconds). This means that when a cluster switch is effected, the consumer group on the new cluster may see some messages that were already processed on the “old” cluster.
Diagram: on Cluster A, the OffsetDistributor reads the last committed offset (35), reads the timestamp of the corresponding message (t7), offsets the timestamp with the margin, builds a map of consumer group/topic/partition to timestamp, and sends it. On Cluster B, the OffsetCommitter receives the last committed timestamps and sets the offset to the message at or before the last timestamp.
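For illustration, here is a minimal sketch of the receiving side using a plain Kafka consumer client. The real offset committer is a separate component; the class, method and configuration shown here are assumptions, not its actual implementation.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: translate a distributed timestamp into a local offset
// on the target cluster and commit it for the consumer group.
public class TimestampOffsetCommitter {

    public static void commitFromTimestamp(String bootstrapServers, String consumerGroup,
                                            TopicPartition partition, long distributedTimestamp) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", consumerGroup);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // The distributed timestamp already includes the configured margin, so
            // starting at the first message at or after it gives at-least-once delivery.
            Map<TopicPartition, OffsetAndTimestamp> found =
                    consumer.offsetsForTimes(Map.of(partition, distributedTimestamp));
            OffsetAndTimestamp match = found.get(partition);
            if (match == null) {
                return; // no message at or after this timestamp on the local cluster
            }

            // Commit the translated offset for the consumer group on this cluster.
            consumer.commitSync(Map.of(partition, new OffsetAndMetadata(match.offset())));
        }
    }
}
```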
To make sure offsets are distributed everywhere, and to prevent distribution “loops”, copy flags are used as described before. One important difference is that access to the headers of the __consumer_offsets topic is limited; therefore the copy flags are stored in the message metadata.
Schema Distribution model
Internally the schema registry uses a Kafka topic to store schema registrations. Schemas are added based on subject name, and are stored internally with a monotonically increasing ID if the schema is not present yet. If the schema is already present (presumably for a different subject), then the new subject is simply mapped to the existing ID. In the case of Axual, there is an extra layer of indirection since the topic names tenants see in the UI are internally mapped to technical names; topic name mytopic internally becomes tenant-instance-environment-mytopic.
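A toy illustration of that ID behaviour (an in-memory stand-in, not the schema registry’s actual storage model): identical schemas registered under different subjects end up sharing one ID.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the registry's ID behaviour, not its actual storage model:
// identical schemas registered under different subjects share one ID.
public class ToySchemaStore {
    private final Map<String, Integer> idBySchema = new HashMap<>();   // schema text -> ID
    private final Map<String, Integer> idBySubject = new HashMap<>();  // subject -> ID
    private int nextId = 1;

    /** Register a schema under a subject, reusing the ID if the schema is already known. */
    public int register(String subject, String schemaText) {
        int id = idBySchema.computeIfAbsent(schemaText, s -> nextId++);
        idBySubject.put(subject, id);
        return id;
    }

    public static void main(String[] args) {
        ToySchemaStore store = new ToySchemaStore();
        String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"fields\":[]}";
        int a = store.register("tenant-instance-test-mytopic", schema);
        int b = store.register("tenant-instance-prod-mytopic", schema);
        System.out.println(a == b); // true: same schema, same ID, two subjects
    }
}
```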
All of this results in the constraint that schema registrations can only be updated on the primary cluster of the instance; this is the cluster where the instance API is running. This is the only cluster where the schema distributor is running, so there is no distribution tree like other distributors, and no copy flags are used.
Instead, the schema distributor is configured with its own internal subject format and with the subject format of every target cluster. When a schema registration is distributed, the distributor handles the subject translation. For example, on one cluster the subject naming might be structured as [tenant|instance|environment]-topic while on another cluster it might simply be [tenant|environment]-topic, while users of the system just see the logical name [topic]. The schema used should still be the same and have the same ID in both clusters.
"Cluster 1" #PaleGreen--> "Cluster 2" "Cluster 1" --> "Cluster 3" "Cluster 1" --> "Cluster 4"