Distributed Message Queue

A distributed message queue implemented in C, modeling a simplified version of Kafka. The system uses a custom application-layer network protocol called DMQP, and relies on Apache ZooKeeper for distributed consensus, partition discovery, and metadata storage.

Features

  • Partitions: Topics, Sharding, Replication
  • Distributed: Consensus, Metadata, Locking, Leader Election
  • Networking: DMQP (Custom Application-Layer Protocol)
  • Reliability: Ordering, Atomicity, Security, Fault Tolerance

Design

Architecture

[Figure: architecture diagram of the distributed system]

Partitions are nodes that store queues and host DMQP servers. Once partitions boot, they are registered into the distributed system's metadata via ZooKeeper. They are initially marked as free (unused) partitions. This facilitates partition discovery.

When a client creates a new topic, free partitions are allocated for the topic. For instance, if a client creates a new topic with 2 shards and a replication factor of 3, 6 partitions are allocated in total. Each shard is allocated 3 partitions. Different shards store different data, but replicas of the same shard store the same data. Sharding and partitioning provide horizontal scaling and fault tolerance.

Each shard has one partition that is elected leader. A write must be replicated across the shard's entire replica set before the request is considered fully serviced. A read can be serviced by any partition in the replica set.

Data is distributed between shards using Round-Robin.

Here's a walkthrough of a write to a topic:

  1. User sends a write request to the client with a Topic ID
  2. Client queries ZooKeeper for the IP addresses and ports of the topic's shards. The client is only interested in partitions that are elected leaders for that shard
  3. Client applies Round-Robin to distribute data evenly and prevent hotspots
  4. Data is persisted by the partition both in-memory and on disk

Users can interface with this distributed system using a client defined in include/api.h.

Distributed Metadata

This system uses the following ZNodes in ZooKeeper:

PERSISTENT: /partitions
PERSISTENT: /partitions/lock
EPHEMERAL & SEQUENTIAL: /partitions/lock/lock-{sequence_id}
PERSISTENT: /partitions/free-count
EPHEMERAL & SEQUENTIAL: /partitions/partition-{sequence_id}

PERSISTENT: /topics
PERSISTENT: /topics/{topic_name}

PERSISTENT: /topics/{topic_name}/sequence-id
PERSISTENT: /topics/{topic_name}/sequence-id/lock
EPHEMERAL & SEQUENTIAL: /topics/{topic_name}/sequence-id/lock/lock-{sequence_id}

PERSISTENT: /topics/{topic_name}/shards
PERSISTENT & SEQUENTIAL: /topics/{topic_name}/shards/shard-{sequence_id}
PERSISTENT: /topics/{topic_name}/shards/shard-{sequence_id}/partitions
PERSISTENT & SEQUENTIAL: /topics/{topic_name}/shards/shard-{sequence_id}/partitions/partition-{sequence_id}

The /partitions parent ZNode is a global list of partitions. The /partitions/free-count ZNode stores the number of free partitions. Each partition has an ephemeral, sequential ZNode named /partitions/partition-{sequence_id}, which stores the partition's IP address and port, plus the topic it is allocated to, in the following format:

{partition_ip_addr}:{partition_port} // free or unallocated to topic
{partition_ip_addr}:{partition_port};/topics/{topic_name}/shards/shard-{sequence_id} // allocated to topic shard

The /topics/{topic_name}/sequence-id ZNode stores an integer, representing the next expected sequence ID of an incoming write to that topic. This facilitates ordering, which you can read about in the section below.

The /topics/{topic_name}/shards/shard-{sequence_id}/partitions/partition-{sequence_id} ZNode stores metadata about each partition of the given shard. The partition with the lowest sequence ID is the leader of the replica set. The ZNode acts as a pointer to a child of the /partitions parent ZNode by storing that partition's path.

Leader Election

When a leader node crashes, the node with the next sequence ID is notified via a watch it set on the leader's ZNode. This is achieved by ensuring that each allocated partition sets a watch on the ZNode with the preceding sequence ID, creating a queue of potential leaders. When a node is notified, it becomes the new leader.

Distributed Locking

Distributed locks are built from sequential, ephemeral ZNodes. For instance, suppose the goal is to atomically get and increment a topic's sequence ID. To do so, a client must create the following sequential, ephemeral ZNode: /topics/{topic_name}/sequence-id/lock/lock-{sequence_id}.

If the client has the lowest sequence ID of all clients trying to obtain the lock, it has successfully acquired it. Otherwise, it sets a watch on the ZNode with the preceding sequence ID. When a client finishes its critical section, it deletes its ZNode, triggering the watch on the next ZNode in the queue so that client acquires the lock.

Networking

This distributed system uses a custom application-layer protocol called DMQP. DMQP stands for Distributed Message Queue Protocol. DMQP uses TCP with persistent connections, leveraging keepalive to preserve resources only for active queue producers and consumers while maximizing throughput. Socket operations in DMQP have a timeout of 30 seconds each.

A DMQP message uses the following format:

+------------------------------------------+
|           Sequence ID (4 bytes)          |
+------------------------------------------+
|             Length (4 bytes)             |
+------------------------------------------+
| Method (2 bytes) | Status Code (2 bytes) |
+------------------------------------------+
|             Payload (Max 1MB)            |
+------------------------------------------+

The Sequence ID header contains the unique sequence number of a queue entry. This is used for message ordering.

The Length header states the size of the payload in bytes.

The Method header can be one of the following:

DMQP_PUSH
DMQP_POP
DMQP_PEEK_SEQUENCE_ID
DMQP_RESPONSE

DMQP_PUSH and DMQP_POP are self-explanatory. DMQP_PEEK_SEQUENCE_ID returns the sequence ID of the queue's head entry. DMQP_RESPONSE is specified if the message is a response to a request.

The Status Code header is a Unix errno.

The Payload contains data to be pushed onto the queue.

Reliability

Ordering & Atomicity

All queue operations at the partition level are protected by mutex locks. This guarantees ordering and atomicity at the partition level. All ZooKeeper operations used by the client are atomic.

Each topic has an expected sequence ID for its next message stored in the /topics/{topic_name}/sequence-id ZNode. When a partition receives a DMQP message, it expects it to contain the same sequence ID stored in the ZNode.

Security

All network messages are encrypted using TLS. Data is encrypted prior to any persistence on disk.

Fault Tolerance

Shards are replicated, providing redundancy. Therefore, if one replica fails, there are others that can still service requests.

Furthermore, all writes are simultaneously persisted to disk (write-through). When a partition boots, it looks for any available disk logs to recover. This provides crash recovery, since if an entire replica set fails, restarted nodes can recover all data persisted to disk.

Quick Start

Requirements

  • Docker

Enter the Docker container:

# run `chmod +x ./scripts/docker.sh` if permission denied
./scripts/docker.sh # add flag --attach to attach to running container

Compile and start a partition:

make
./partition/partition

Backlog

  • Question: What happens when a leader crashes, but comes back?
  • Split-Brain Problem
  • Write-Through -> High Latency
  • Synchronous Replication -> High Latency

Recommended Future Features

These are features that would suit this project well but that I do not intend to implement; I wanted to keep the project simple while focusing on the distributed-systems concepts I set out to learn.

  • update_topic() (upscaling + downscaling) and delete_topic() (both operations should be disabled if topics not empty)
  • Consumer Grouping to Reduce
  • Compaction / Retention Policy (Queues May Grow Indefinitely)
  • Caching Metadata from ZooKeeper
  • Security: Authentication
