A distributed message queue implemented in C, modelling a very simplified version of Kafka. This distributed system uses a custom application-layer network protocol called DMQP and Apache ZooKeeper for distributed consensus, partition discovery, and metadata.
- Partitions: Topics, Sharding, Replication
- Distributed: Consensus, Metadata, Locking, Leader Election
- Networking: DMQP (Custom Application-Layer Protocol)
- Reliability: Ordering, Atomicity, Security, Fault Tolerance
Partitions are nodes that store queues and host DMQP servers. Once partitions boot, they are registered into the distributed system's metadata via ZooKeeper. They are initially marked as free (unused) partitions. This facilitates partition discovery.
When a client creates a new topic, free partitions are allocated for the topic. For instance, if a client creates a new topic with 2 shards and a replication factor of 3, 6 partitions are allocated in total. Each shard is allocated 3 partitions. Different shards store different data, but replicas of the same shard store the same data. Sharding and partitioning provide horizontal scaling and fault tolerance.
Each shard has a partition that is elected as a leader. A write must be replicated by the entire shard's replica set before the request is fully serviced. A read can be serviced by any partition in the shard's replica set.
Data is distributed between shards using Round-Robin.
Here's a walkthrough of a write to a topic:
- User sends a write request to the client with a Topic ID
- Client queries ZooKeeper for the IP addresses and ports of the topic's shards. The client is only interested in the partitions elected as leaders of their shards
- Client applies Round-Robin to distribute data evenly and prevent hotspots
- Data is persisted by the partition both in-memory and on disk
Users can interface with this distributed system using a client defined in
include/api.h.
This system uses the following ZNodes in ZooKeeper:
PERSISTENT: /partitions
PERSISTENT: /partitions/lock
EPHEMERAL & SEQUENTIAL: /partitions/lock/lock-{sequence_id}
PERSISTENT: /partitions/free-count
EPHEMERAL & SEQUENTIAL: /partitions/partition-{sequence_id}
PERSISTENT: /topics
PERSISTENT: /topics/{topic_name}
PERSISTENT: /topics/{topic_name}/sequence-id
PERSISTENT: /topics/{topic_name}/sequence-id/lock
EPHEMERAL & SEQUENTIAL: /topics/{topic_name}/sequence-id/lock/lock-{sequence_id}
PERSISTENT: /topics/{topic_name}/shards
PERSISTENT & SEQUENTIAL: /topics/{topic_name}/shards/shard-{sequence_id}
PERSISTENT: /topics/{topic_name}/shards/shard-{sequence_id}/partitions
PERSISTENT & SEQUENTIAL: /topics/{topic_name}/shards/shard-{sequence_id}/partitions/partition-{sequence_id}
The /partitions parent ZNode is a global list of partitions. The
/partitions/free-count ZNode stores the number of free partitions. Each
partition has an ephemeral, sequential ZNode named
/partitions/partition-{sequence_id}. It stores the IP address and port of the
partition and the topic it is allocated to in the following format:
{partition_ip_addr}:{partition_port} // free or unallocated to topic
{partition_ip_addr}:{partition_port};/topics/{topic_name}/shards/shard-{sequence_id} // allocated to topic shard
The /topics/{topic_name}/sequence-id ZNode stores an integer, representing the
next expected sequence ID of an incoming write to that topic. This facilitates
ordering, which you can read about in the section below.
The /topics/{topic_name}/shards/shard-{sequence_id}/partitions/partition-{sequence_id}
ZNode stores metadata regarding each partition of the given shard. The
partition with the lowest sequence ID is the leader of the replica set. The
ZNode acts as a pointer to a child of the /partitions parent ZNode: it
stores that partition's path.
When a leader node crashes, the node with the next sequence ID is notified
via the watch it set on the leader's ZNode. This works because each allocated
partition sets a watch on the ZNode with the preceding sequence ID, forming a
queue of potential leaders. When a node is notified, it becomes the new
leader.
For instance, let's say the goal is to atomically get and increment the topic's
sequence ID. To do so, a client must create the following sequential, ephemeral
ZNode: /topics/{topic_name}/sequence-id/lock/lock-{sequence_id}.
If the client has the lowest sequence ID of all clients trying to obtain the
lock, it has successfully acquired it. Otherwise, it sets a watch on the ZNode
with the preceding sequence ID. When a client finishes its critical section, it
deletes its ZNode, triggering the watch on the next ZNode in the queue so its
owner can acquire the lock.
This distributed system uses a custom application-layer protocol called DMQP. DMQP stands for Distributed Message Queue Protocol. DMQP uses TCP with persistent connections, leveraging keepalive to preserve resources only for active queue producers and consumers while maximizing throughput. Socket operations in DMQP have a timeout of 30 seconds each.
A DMQP message uses the following format:
+------------------------------------------+
| Sequence ID (4 bytes) |
+------------------------------------------+
| Length (4 bytes) |
+------------------------------------------+
| Method (2 bytes) | Status Code (2 bytes) |
+------------------------------------------+
| Payload (Max 1MB) |
+------------------------------------------+
The Sequence ID header contains the unique sequence number of a queue entry.
This is used for message ordering.
The Length header states the size of the payload in bytes.
The Method header can be one of the following:
DMQP_PUSH
DMQP_POP
DMQP_PEEK_SEQUENCE_ID
DMQP_RESPONSE
DMQP_PUSH and DMQP_POP are self-explanatory. DMQP_PEEK_SEQUENCE_ID returns
the sequence ID of the queue's head entry. DMQP_RESPONSE is specified if the
message is a response to a request.
The Status Code header holds a Unix errno value.
The Payload contains data to be pushed onto the queue.
All queue operations at the partition level are protected by mutex locks. This guarantees ordering and atomicity at the partition level. All ZooKeeper operations used by the client are atomic.
Each topic has an expected sequence ID for its next message stored in the
/topics/{topic_name}/sequence-id ZNode. When a partition receives a DMQP
message, it expects it to contain the same sequence ID stored in the ZNode.
All network messages are encrypted using TLS. Data is encrypted prior to any persistence on disk.
Shards are replicated, providing redundancy. Therefore, if one replica fails, there are others that can still service requests.
Furthermore, all writes are simultaneously persisted to disk (write-through). When a partition boots, it looks for any available disk logs to recover. This provides crash recovery, since if an entire replica set fails, restarted nodes can recover all data persisted to disk.
- Docker
Enter the Docker container:
# run `chmod +x ./scripts/docker.sh` if permission denied
./scripts/docker.sh # add flag --attach to attach to running container
Compile and start a partition:
make
./partition/partition
- Question: What happens when a leader crashes, but comes back?
- Split-Brain Problem
- Write-Through -> High Latency
- Synchronous Replication -> High Latency
These are features that would be great for this project but that I do not intend to implement: I wanted to keep the project simple while learning what I set out to learn about distributed systems.
- update_topic() (upscaling + downscaling) and delete_topic() (both operations should be disabled if the topic is not empty)
- Consumer Grouping to Reduce
- Compaction / Retention Policy (Queues May Grow Indefinitely)
- Caching Metadata from ZooKeeper
- Security: Authentication
