Distribution Model
Axual Distribution model
For efficiency reasons, messages are not distributed from every cluster to every other cluster. Instead, the concept of logical “levels” is used. On every cluster, multiple distributor instances are running, one per level. Every distributor will distribute to other instances on the same level. The levels are laid out like a binary tree, with Kafka clusters as the leaf nodes; the root and intermediate nodes are only conceptual and do not correspond directly with physical components.
See the example below: in this case there are four Kafka clusters, and the binary tree has two logical levels. The levels (3 and 1 in this case) are somewhat arbitrary but are used in the distribution algorithm explained below.
Copy flags and algorithm
The main goal of the distribution algorithm is to prevent message distribution loops; each message should be distributed to each cluster exactly once. To achieve this, a custom header is added with so-called “copy flags”: a bit mask in which each bit indicates whether distribution has already happened for that level. When a message comes in on a cluster, each distributor examines its own copy flag bit and all lower-order bits. If any of those bits is set to 1, the distributor concludes that distribution has already happened for its level and does not distribute again. Distributors distribute either to a single cluster or to a subtree (see the picture above); in the latter case the target cluster can be any leaf of the subtree. This allows the operator to switch to another target cluster in the subtree if the original target becomes unavailable or is decommissioned.
Fresh messages coming in might not have the header; in this case the copy flags will be considered to be all zeroes.
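The per-distributor decision can be sketched as follows. This is a minimal illustration, assuming that a distributor at level N owns bit N-1 of the copy flags (consistent with the example values 0001 for level 1 and 0100 for level 3); the actual header name and encoding are Axual-internal.

```java
// Sketch of the copy-flag check described above (illustrative only; the
// actual header layout is Axual-internal and may differ).
public final class CopyFlagCheck {

    /** Bit owned by a distributor at the given level: level 1 -> 0b0001, level 3 -> 0b0100. */
    static int ownBit(int level) {
        return 1 << (level - 1);
    }

    /** Mask of the distributor's own bit plus all lower-order bits: level 3 -> 0b0111. */
    static int levelMask(int level) {
        return (1 << level) - 1;
    }

    /** A distributor copies the message only if none of the masked bits are set. */
    static boolean shouldDistribute(int copyFlags, int level) {
        return (copyFlags & levelMask(level)) == 0;
    }

    /** Flags attached to the copied message: the incoming flags plus the distributor's own bit. */
    static int flagsAfterCopy(int copyFlags, int level) {
        return copyFlags | ownBit(level);
    }

    public static void main(String[] args) {
        int fresh = 0b0000;                               // fresh message: no header, all zeroes
        System.out.println(shouldDistribute(fresh, 1));   // true  -> the level 1 distributor copies
        System.out.println(flagsAfterCopy(fresh, 1));     // 1 (0b0001)
        System.out.println(shouldDistribute(0b0001, 3));  // false -> 0001 & 0111 != 0, already done
    }
}
```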
Consider the clusters above, corresponding to the logical tree shown earlier; each cluster has two distributors running, one for level 1 and one for level 3. Assuming that a message arrives at cluster 1, the following sequence of events happens (a small simulation of this sequence is sketched after the list):
- Both distributors on cluster 1 pick up the message, assuming the copy flags to be 0000. Both will mask all zeroes with their respective masks and arrive at zero, meaning that they have work to do. The level 1 distributor will copy to cluster 2, setting the flags to 0001; the level 3 distributor will copy to cluster 3 and set the flags to 0100.
- On cluster 2, both distributors see the message arrive with copy flags 0001. The level 1 distributor will mask this with mask 0001 and arrive at a non-zero result, indicating no work to be done. Similarly, the level 3 distributor will mask 0001 with 0111 and also arrive at a non-zero result. Neither distributor copies the message.
- In the meantime, on cluster 3, both distributors see a message arrive with copy flags 0100. The level 3 distributor masks this with 0111, arrives at a non-zero result and does not copy further. The level 1 distributor masks 0100 with 0001, arrives at zero and copies the message to cluster 4, adding its own bit to the copy flags and setting them to 0101.
- Both distributors on cluster 4 see a message arrive with copy flags 0101. They mask this with 0100 and 0001 respectively, and both arrive at a non-zero result. No further copying happens.
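The sequence above can be reproduced with a small simulation. The distributor layout below (which levels run on which cluster, and their target clusters) is hard-coded to mirror the four-cluster example; it is illustrative, not Axual's actual configuration.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simulation of the four-cluster walkthrough above (illustrative topology only).
public class DistributionWalkthrough {

    record Distributor(int level, int targetCluster) {
        int ownBit()    { return 1 << (level - 1); }  // level 1 -> 0001, level 3 -> 0100
        int levelMask() { return (1 << level) - 1; }  // level 3 -> 0111
    }

    record Message(int cluster, int copyFlags) {}

    public static void main(String[] args) {
        // Level 1 pairs the clusters within each subtree (1 <-> 2, 3 <-> 4);
        // level 3 links the two subtrees.
        Map<Integer, List<Distributor>> distributors = Map.of(
                1, List.of(new Distributor(1, 2), new Distributor(3, 3)),
                2, List.of(new Distributor(1, 1), new Distributor(3, 3)),
                3, List.of(new Distributor(1, 4), new Distributor(3, 1)),
                4, List.of(new Distributor(1, 3), new Distributor(3, 1)));

        // A fresh message arrives on cluster 1 with no copy-flag header (treated as 0000).
        Queue<Message> inFlight = new ArrayDeque<>(List.of(new Message(1, 0b0000)));

        while (!inFlight.isEmpty()) {
            Message msg = inFlight.poll();
            for (Distributor d : distributors.get(msg.cluster())) {
                if ((msg.copyFlags() & d.levelMask()) == 0) {     // no masked bit set: work to do
                    int newFlags = msg.copyFlags() | d.ownBit();  // mark this level as done
                    System.out.printf("cluster %d, level %d: copy to cluster %d with flags %04d%n",
                            msg.cluster(), d.level(), d.targetCluster(),
                            Integer.parseInt(Integer.toBinaryString(newFlags)));
                    inFlight.add(new Message(d.targetCluster(), newFlags));
                }
            }
        }
        // Prints three copies in total: 1->2 (0001), 1->3 (0100), 3->4 (0101).
    }
}
```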
Unbalanced trees
It is also possible to have an unbalanced tree, as pictured in the diagram below.
The algorithm works the same in this case, with more distributors present.
It is important to be aware that on any cluster (leaf node) in the diagram there is one distributor running for every level above that cluster in the distribution tree. In the diagram above, Cluster 2 and Cluster 3 would have three distributor instances running: level 1, level 2 and level 3. Cluster 1 would have level 2 and level 3 distributors running, and Cluster 4 and Cluster 5 would have levels 1 and 3.
Distributing offsets
A special case is the distribution of consumer offsets, stored in the __consumer_offsets topic. Consider the topic partition below; a consumer has been following along with the topic, committing message batches, and is now at offset 35. Due to the topic’s retention settings, messages 1 through 24 have been deleted from disk.
Suppose a new cluster is brought online; the distributor for the topic would then replicate messages 25 through 35 to the new cluster, but offset distribution would send offset 35, which does not exist on the target cluster (since it never saw messages 1-24 at all).
The solution is to follow the offset commits, and for the latest commit retrieve the timestamp of the corresponding message. This timestamp is then distributed (to the new cluster in this case). There are two components involved: on the originating cluster the offsetdistributor is responsible for reading the last committed offset, translating that to a timestamp (by looking up the corresponding message in the topic partition) and distributing this. This is sent as a map, with topic/partition/consumer ID as key and the last committed timestamp as value. On the receiving cluster, the offsetcommitter reads the timestamp and tells Kafka to set the partition offset to the message with this timestamp.
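The originating-cluster step can be sketched with the standard Kafka clients. The group ID, topic and bootstrap address below are placeholders, and the real offsetdistributor is an Axual component that differs in detail; this is only an illustration of the offset-to-timestamp translation.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

// Sketch: read the last committed offset for a group and translate it to the
// timestamp of the corresponding message. Names are placeholders for illustration.
public class OffsetToTimestamp {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "origin-cluster:9092");

        TopicPartition tp = new TopicPartition("mytopic", 0);

        // 1. Look up the last committed offset of the consumer group.
        long committedOffset;
        try (Admin admin = Admin.create(props)) {
            committedOffset = admin.listConsumerGroupOffsets("my-consumer-group")
                    .partitionsToOffsetAndMetadata().get()
                    .get(tp).offset();                            // e.g. 35 in the example above
        }

        // 2. Fetch the last processed record (committed offset minus one) to get its timestamp.
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(tp));
            consumer.seek(tp, Math.max(0, committedOffset - 1));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                // This timestamp, keyed by topic/partition/consumer ID, is what gets distributed.
                System.out.printf("%s-%d for %s: timestamp %d%n",
                        tp.topic(), tp.partition(), "my-consumer-group", record.timestamp());
                break;
            }
        }
    }
}
```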
To guarantee at-least-once delivery, we need to allow for the time needed for processing and offset distribution. For that reason we don’t distribute the exact timestamp, but keep a configurable margin (default 60 seconds). This means that when a cluster switch is effected, the consumer group on the new cluster may see some messages that were already processed on the “old” cluster.
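The receiving-cluster step can be sketched in the same way: translate the distributed timestamp (minus the margin) back to an offset on the target cluster and commit it for the group. Again, the cluster address, topic, group name and margin handling are placeholders; the real offsetcommitter is an Axual component.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Map;
import java.util.Properties;

// Sketch: on the target cluster, turn a distributed timestamp back into an
// offset (applying the safety margin) and commit it for the consumer group.
public class TimestampToOffset {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "target-cluster:9092");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("mytopic", 0);
        long distributedTimestamp = 1_700_000_000_000L;   // received from the offsetdistributor
        long marginMs = 60_000L;                          // configurable margin, 60s by default

        // 1. Find the earliest offset at or after (timestamp - margin) on this cluster.
        long targetOffset;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            OffsetAndTimestamp result = consumer
                    .offsetsForTimes(Map.of(tp, distributedTimestamp - marginMs))
                    .get(tp);                             // null if no such message; ignored here
            targetOffset = result.offset();
        }

        // 2. Commit that offset for the consumer group, so a switched-over consumer resumes
        //    from there (possibly re-reading messages that fall within the margin).
        try (Admin admin = Admin.create(props)) {
            admin.alterConsumerGroupOffsets("my-consumer-group",
                    Map.of(tp, new OffsetAndMetadata(targetOffset))).all().get();
        }
    }
}
```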
To make sure offsets are distributed everywhere, and to prevent distribution “loops”, copy flags are used as described before. One important difference is that on the __consumer_offsets topic access to the headers is limited; therefore the copy flags are stored in the message metadata.
Schema Distribution model
Internally the schema registry uses a Kafka topic to store schema registrations. Schemas are added based on subject name, and are stored internally with a monotonically increasing ID if the schema is not present yet. If the schema is already present (presumably for a different subject), then the new subject is simply mapped to the existing ID. In the case of Axual, there is an extra layer of indirection, since the topic names tenants see in the UI are internally mapped to technical names; topic name mytopic internally becomes tenant-instance-environment-mytopic.
All of this results in the constraint that schema registrations can only be updated on the primary cluster of the instance; this is the cluster where the instance API is running. It is also the only cluster where the schema distributor is running, so unlike the other distributors there is no distribution tree, and no copy flags are used.
Instead, the schema distributor is configured with its own internal subject format and with the subject format of each target cluster. When a schema registration is distributed, the distributor handles the subject translation. For example, on one cluster the subject naming might be structured as [tenant|instance|environment]-topic while on another cluster it might simply be [tenant|environment]-topic, while users of the system just see the logical name [topic]. The schema used should still be the same and have the same ID on both clusters.
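A minimal sketch of such subject translation is shown below. The bracket-per-field format strings, context values and method names are hypothetical, chosen only to illustrate the idea; the actual configuration keys and format syntax are Axual-internal.

```java
import java.util.Map;

// Sketch of subject translation between clusters with different subject formats
// (hypothetical format syntax and values, for illustration only).
public class SubjectTranslation {

    /** Expand a subject format such as "[tenant]-[instance]-[environment]-[topic]". */
    static String subjectFor(String format, Map<String, String> context) {
        String subject = format;
        for (Map.Entry<String, String> entry : context.entrySet()) {
            subject = subject.replace("[" + entry.getKey() + "]", entry.getValue());
        }
        return subject;
    }

    public static void main(String[] args) {
        // Logical view: the tenant only sees the topic name "mytopic".
        Map<String, String> context = Map.of(
                "tenant", "acme",
                "instance", "prod",
                "environment", "production",
                "topic", "mytopic");

        // Per-cluster subject formats, as configured on the schema distributor.
        String sourceFormat = "[tenant]-[instance]-[environment]-[topic]";
        String targetFormat = "[tenant]-[environment]-[topic]";

        // The same schema (and schema ID) is registered under both subjects.
        System.out.println(subjectFor(sourceFormat, context)); // acme-prod-production-mytopic
        System.out.println(subjectFor(targetFormat, context)); // acme-production-mytopic
    }
}
```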