Distribution Model

For efficiency reasons, messages are not distributed from every cluster to every other cluster. Instead, the concept of logical “levels” is used. On every cluster, multiple distributor instances are running, one per level. Every distributor will distribute to other instances on the same level. The levels are laid out like a binary tree, with Kafka clusters as the leaf nodes; the root and intermediate nodes are only conceptual and do not correspond directly with physical components.

See the example below: in this case there are four Kafka clusters, and the binary tree has two logical levels. The levels (3 and 1 in this case) are somewhat arbitrary but are used in the distribution algorithm explained below.

[Figure: logical distribution tree with four Kafka clusters as leaves and two levels, 1 and 3]

Copy flags and algorithm

The main goal of the distribution algorithm is to prevent message distribution loops: each message should be distributed to each cluster exactly once. To achieve this, a custom header is added with so-called “copy flags”, a bit mask in which each bit indicates whether distribution has already happened for that level. On any cluster, when a message comes in, each distributor examines its own copy flag bit and all the lower-order bits. If any of these flags is set to 1, the distributor decides that distribution has already happened for its level and does not distribute again. Otherwise it copies the message and sets its own bit in the copy flags of the copy.

Distributors distribute either to a single cluster or to a subtree (see the picture above); in the latter case the target cluster can be any leaf of the subtree. This allows the operator to switch to another target cluster in the subtree if the original target becomes unavailable or is decommissioned.

Fresh messages coming in might not have the header; in this case the copy flags will be considered to be all zeroes.
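
The check each distributor performs can be sketched in a few lines of Python. This is only an illustration of the rule described above, not the actual distributor code: the header name `x-copy-flags` and the big-endian byte encoding are assumptions made for the example, as are the function names.

```python
# Hypothetical header name and encoding; the real header is not named in this document.
COPY_FLAGS_HEADER = "x-copy-flags"


def read_copy_flags(headers: dict) -> int:
    """Return the copy flags from a header dict; a missing header counts as all zeroes."""
    raw = headers.get(COPY_FLAGS_HEADER)
    return int.from_bytes(raw, "big") if raw is not None else 0


def level_mask(level: int) -> int:
    """Mask covering the distributor's own bit and all lower-order bits."""
    return (1 << level) - 1


def should_distribute(copy_flags: int, level: int) -> bool:
    """A distributor has work to do only if every bit under its mask is 0."""
    return (copy_flags & level_mask(level)) == 0


def mark_distributed(copy_flags: int, level: int) -> int:
    """Set the distributor's own bit before the copy is sent on."""
    return copy_flags | (1 << (level - 1))
```

With this encoding, level 1 owns bit 0001 and level 3 owns bit 0100, which matches the masks 0001 and 0111 used in the walkthrough that follows.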

[Figure: four clusters, each running a level 1 and a level 3 distributor, with copy flags shown per hop]

Consider the clusters above, corresponding to the logical tree shown earlier; each cluster has two distributors running, one for level 1 and one for level 3. Assuming that a message arrives at cluster 1, the following sequence of events happens:

  1. Both distributors on cluster 1 pick up the message and assume the copy flags to be 0000. Each masks the flags with its own mask (0001 for level 1, 0111 for level 3) and arrives at zero, meaning that it has work to do. The level 1 distributor copies the message to cluster 2, setting the flags to 0001; the level 3 distributor copies it to cluster 3, setting the flags to 0100.

  2. On cluster 2, both distributors see the message arrive with copy flags 0001. The level 1 distributor masks this with 0001 and arrives at a non-zero result, indicating that no work is to be done. Similarly, the level 3 distributor masks 0001 with 0111 and also arrives at a non-zero result. Neither distributor copies the message.

  3. In the meantime, on cluster 3, both distributors see the message arrive with copy flags 0100. The level 3 distributor masks this with 0111, arrives at a non-zero result and does not copy further. The level 1 distributor masks 0100 with 0001 and arrives at zero, so it copies the message to cluster 4, adding its own bit to the copy flags and setting them to 0101.

  4. Both distributors on cluster 4 see the message arrive with copy flags 0101. The level 3 and level 1 distributors mask this with 0111 and 0001 respectively, and both arrive at a non-zero result. No further copying happens.
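
The sequence above can be replayed with a small simulation. It is a sketch under stated assumptions: the `TARGETS` table is a hypothetical wiring for the four-cluster example (each level 3 distributor could equally target the other leaf of the opposite subtree), and `distribute` is an illustrative name, not part of the product.

```python
from collections import deque

# Hypothetical wiring: for each cluster, the target cluster of its
# level 1 and level 3 distributor ({cluster: {level: target}}).
TARGETS = {
    1: {1: 2, 3: 3},
    2: {1: 1, 3: 3},
    3: {1: 4, 3: 1},
    4: {1: 3, 3: 1},
}


def distribute(origin: int, targets: dict = TARGETS) -> dict:
    """Return {cluster: copy flags on arrival} for a fresh message produced at origin."""
    arrivals = {}
    queue = deque([(origin, 0)])  # a fresh message has copy flags 0000
    while queue:
        cluster, flags = queue.popleft()
        if cluster in arrivals:
            raise RuntimeError(f"cluster {cluster} received the message twice")
        arrivals[cluster] = flags
        for level, target in targets[cluster].items():
            mask = (1 << level) - 1  # own bit plus all lower-order bits
            if (flags & mask) == 0:
                # Copy the message and set this distributor's own bit.
                queue.append((target, flags | (1 << (level - 1))))
    return arrivals


for cluster, flags in sorted(distribute(1).items()):
    print(f"cluster {cluster}: copy flags {flags:04b}")
```

Starting from any of the four clusters, every cluster receives the message once and the simulation terminates, which is the loop-prevention property the copy flags are meant to provide.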

Unbalanced trees

It is also possible to have an unbalanced tree, as pictured in the diagram below.

[Figure: unbalanced distribution tree with five Kafka clusters and levels 1, 2 and 3]

The algorithm works the same in this case, with more distributors present.

It’s important to be aware that on any cluster (leaf node) in the diagram there is one distributor running for every level above that cluster in the distribution tree. In the diagram above, Cluster 2 and Cluster 3 each have three distributor instances running: level 1, level 2 and level 3. Cluster 1 has level 2 and level 3 distributors running, and Cluster 4 and Cluster 5 each have level 1 and level 3 distributors.
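
The rule "one distributor per level above the cluster" can be checked mechanically. The sketch below encodes a tree shape inferred from the distributor lists in the text (the diagram itself is not reproduced here, so the exact shape is an assumption) and collects, for each leaf, the levels of its ancestors. The tuple encoding and the function name are illustrative only.

```python
# Assumed tree shape: an inner node is (level, left, right), a leaf is a cluster name.
UNBALANCED_TREE = (
    3,
    (2, "Cluster 1", (1, "Cluster 2", "Cluster 3")),
    (1, "Cluster 4", "Cluster 5"),
)


def distributor_levels(tree, ancestors=()):
    """Map each cluster to the sorted levels of the nodes above it."""
    if isinstance(tree, str):
        return {tree: sorted(ancestors)}
    level, left, right = tree
    result = {}
    for child in (left, right):
        result.update(distributor_levels(child, ancestors + (level,)))
    return result


for cluster, levels in distributor_levels(UNBALANCED_TREE).items():
    print(cluster, "runs distributors for levels", levels)
```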

Distributing offsets

A special case is the distribution of consumer offsets, stored in the __consumer_offsets topic. Consider the topic partition below; a consumer has been following along with the topic, committing message batches, and is now at offset 35. Due to the topic’s retention settings, messages 1 through 24 have been deleted from disk.

[Figure: topic partition with messages 1 through 24 deleted and the consumer at offset 35]

Suppose a new cluster is brought online. The distributor for the topic would replicate messages 25 through 35 to the new cluster, but naive offset distribution would send offset 35, which does not refer to the same message on the target cluster (since that cluster never saw messages 1 through 24 at all).

The solution is to follow the offset commits and, for the latest commit, retrieve the timestamp of the corresponding message. This timestamp is then distributed (to the new cluster in this case). There are two components involved. On the originating cluster, the offsetdistributor is responsible for reading the last committed offset, translating it to a timestamp (by looking up the corresponding message in the topic partition) and distributing it. The result is sent as a map, with topic/partition/consumer ID as key and the last committed timestamp as value. On the receiving cluster, the offsetcommitter reads the timestamp and tells Kafka to set the partition offset to the message with this timestamp.

To guarantee at-least-once delivery, we need to allow for the time needed for processing and offset distribution. For that reason we don’t distribute the exact timestamp, but keep a configurable margin (default 60 seconds). This means that when a cluster switch is effected, the consumer group on the new cluster may see some messages that were already processed on the “old” cluster.

[Figure: offset distribution by timestamp, with margin]
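
The translation from offset to timestamp and back can be sketched as follows. This is a simplified model, not the offsetdistributor or offsetcommitter code: logs are plain Python structures, all function and variable names are hypothetical, and the target lookup assumes timestamps are non-decreasing within a partition. The lookup rule (first message whose timestamp is at or after the given one) mirrors what Kafka's `offsetsForTimes` consumer call provides.

```python
from bisect import bisect_left

MARGIN_MS = 60_000  # default margin mentioned in the text (60 seconds)


def timestamp_to_distribute(source_log: dict, committed_offset: int,
                            margin_ms: int = MARGIN_MS) -> int:
    """Source side: timestamp of the committed message, minus the safety margin."""
    return source_log[committed_offset] - margin_ms


def offset_for_timestamp(target_log: list, timestamp_ms: int):
    """Target side: first offset whose timestamp is >= timestamp_ms, or None."""
    timestamps = [ts for _, ts in target_log]
    index = bisect_left(timestamps, timestamp_ms)
    return target_log[index][0] if index < len(target_log) else None


# Source cluster still holds offsets 25..35, one message every 20 seconds.
base = 1_700_000_000_000
source_log = {offset: base + (offset - 25) * 20_000 for offset in range(25, 36)}
# The new cluster received the same messages, but numbered from offset 0.
target_log = [(i, base + i * 20_000) for i in range(11)]

# Map keyed by topic/partition/consumer ID, as described above.
payload = {("mytopic", 0, "my-consumer-group"): timestamp_to_distribute(source_log, 35)}
new_offset = offset_for_timestamp(target_log, payload[("mytopic", 0, "my-consumer-group")])
```

In this example the consumer group resumes at target offset 7, which corresponds to source offset 32, so a few messages that were already processed on the old cluster are delivered again. That is the at-least-once trade-off the margin is designed to make.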

To make sure offsets are distributed everywhere, and to prevent distribution “loops”, copyflags are used as described before. One important difference is that on the __consumer_offsets topic access to the headers is limited; therefore the copy flags are stored in the message metadata.

Schema Distribution model

Internally, the schema registry uses a Kafka topic to store schema registrations. Schemas are added based on subject name, and are stored internally with a monotonically increasing ID if the schema is not present yet. If the schema is already present (presumably for a different subject), then the new subject is simply mapped to the existing ID. In the case of Axual, there is an extra layer of indirection, since the topic names tenants see in the UI are internally mapped to technical names; for example, topic name mytopic is internally tenant-instance-environment-mytopic.
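
The ID assignment rule can be illustrated with a toy in-memory model. It is not the schema registry's API or storage format; `ToyRegistry` and its methods are hypothetical and only show why two subjects registering the same schema end up sharing one ID.

```python
class ToyRegistry:
    """Toy model: one global, monotonically increasing ID per distinct schema."""

    def __init__(self):
        self.id_by_schema = {}     # schema text -> ID
        self.ids_by_subject = {}   # subject -> list of schema IDs

    def register(self, subject: str, schema: str) -> int:
        # Reuse the existing ID if the schema is known, else take the next ID.
        schema_id = self.id_by_schema.setdefault(schema, len(self.id_by_schema) + 1)
        versions = self.ids_by_subject.setdefault(subject, [])
        if schema_id not in versions:
            versions.append(schema_id)
        return schema_id


registry = ToyRegistry()
first = registry.register("tenant-instance-environment-mytopic", '{"type": "string"}')
second = registry.register("tenant-instance-environment-othertopic", '{"type": "string"}')
third = registry.register("tenant-instance-environment-othertopic", '{"type": "long"}')
```

Here `first` and `second` refer to the same ID because the schema text is identical, while `third` receives the next ID.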

All of this results in the constraint that schema registrations can only be updated on the primary cluster of the instance; this is the cluster where the instance API is running. This is the only cluster where the schema distributor is running, so there is no distribution tree like other distributors, and no copy flags are used.

Instead, the schema distributor is configured with its own internal subject format, and with the subject format of each target cluster. When a schema registration is distributed, the distributor handles subject translation. For example, on one cluster the subject naming might be structured as [tenant|instance|environment]-topic while on another cluster it might simply be [tenant|environment]-topic, while users of the system just see the logical name [topic]. The schema used should still be the same and have the same ID in both clusters.
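
Subject translation between two naming formats might look like the sketch below. The `{tenant}-{instance}-{environment}-{topic}` placeholder syntax is an assumption made for this example and is not necessarily how the schema distributor is configured. The sketch also assumes that tenant, instance and environment names contain no hyphens, while the topic name may.

```python
import re


def translate_subject(subject: str, source_pattern: str, target_pattern: str) -> str:
    """Parse a subject with source_pattern and re-render it with target_pattern."""
    regex_parts = []
    # re.split with a capture group alternates literal text and field names.
    for index, part in enumerate(re.split(r"\{(\w+)\}", source_pattern)):
        if index % 2 == 0:
            regex_parts.append(re.escape(part))
        elif part == "topic":
            regex_parts.append(r"(?P<topic>.+)")  # topic names may contain hyphens
        else:
            regex_parts.append(rf"(?P<{part}>[^-]+)")
    match = re.fullmatch("".join(regex_parts), subject)
    if match is None:
        raise ValueError(f"{subject!r} does not match {source_pattern!r}")
    # Fields missing from the target pattern (here: instance) are simply dropped.
    return target_pattern.format(**match.groupdict())


translated = translate_subject(
    "acme-prod1-dev-mytopic",
    "{tenant}-{instance}-{environment}-{topic}",
    "{tenant}-{environment}-{topic}",
)
```

Only the subject name changes in this step; the schema and its ID are carried over unchanged, in line with the constraint described above.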

Figure 1. Schema Distribution with Cluster 1 as primary cluster