
Cluster: plan-based gateway, phased startup/shutdown, SWIM membership #31

Merged
farhan-syah merged 11 commits into main from cluster
Apr 15, 2026

@farhan-syah
Contributor

Summary

  • Replace SQL-string forwarding with a plan-based gateway that routes serialisable PhysicalPlan envelopes over QUIC; remove the legacy LocalForwarder / ClusterForwarder / pgwire forward.rs path.
  • Rework server bootstrap around a gate-based StartupSequencer and a phased ShutdownBus with per-phase drain budgets, making WAL failures fatal and ensuring the gateway is installed before listeners bind.
  • Split the monolithic rpc_codec.rs into focused modules and make all PhysicalPlan sub-types wire-serialisable (serde + zerompk + PartialEq), adding the ExecuteRequest / ExecuteResponse envelope and extending the protocol OpCode.
  • Add an in-place clear_and_install_from repair method across security / event registries so the catalog sanity checker can reconcile diverged in-memory state without restarting listeners.
  • Introduce a SWIM failure-detection and membership module (nodedb-cluster/src/swim/): incarnation counters, Alive/Suspect/Dead/Left state machine, pure merge logic, and MessagePack wire codec for Ping / PingReq / Ack / Nack.

Test plan

  • `cargo fmt --all --check`
  • `cargo clippy --all-targets -- -D warnings`
  • `cargo nextest run --all-features`
  • Manual: 3-node cluster startup, rolling restart, and induced-fault membership convergence

The monolithic rpc_codec.rs (955 lines) is replaced with a module
directory subdivided by message category: raft_msgs, raft_rpc,
cluster_mgmt, vshard, execute, header, and discriminants.

This keeps each encode/decode concern self-contained and below the
500-line file limit, making it straightforward to extend individual
message groups without touching unrelated codec paths.

Raft loop internals (handle_rpc, join, loop_core, tick) and cluster_info
are updated to import from the new module paths.

All PhysicalPlan sub-types now derive PartialEq, serde Serialize/Deserialize,
and zerompk ToMessagePack/FromMessagePack. A new wire.rs module provides the
ExecuteRequest/ExecuteResponse envelope types used by the gateway to ship
pre-planned physical plans over QUIC instead of forwarding raw SQL strings.

Protocol changes:
- Add Status opcode (0x03) to OpCode for unauthenticated readiness checks
- Add TryFrom<u8>/From<u8> impls for serde-compatible JSON numeric encoding
- Add MessagePack derivations to shared types (Value, graph, timeseries)
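The opcode conversion described above can be sketched roughly as follows. Only `Status = 0x03` comes from the text; the `Auth` and `Query` variants, their values, and the choice of `u8` as the error type are placeholders for illustration. The `From<OpCode> for u8` direction is also an assumption about what the "From" impl covers.

```rust
use std::convert::TryFrom;

/// Hypothetical subset of the protocol opcode enum.
/// Only `Status = 0x03` is taken from the PR text.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpCode {
    Auth = 0x01,   // placeholder variant and value
    Query = 0x02,  // placeholder variant and value
    Status = 0x03, // unauthenticated readiness check (from the PR)
}

impl TryFrom<u8> for OpCode {
    /// The rejected byte is returned so callers can report it.
    type Error = u8;

    fn try_from(byte: u8) -> Result<Self, Self::Error> {
        match byte {
            0x01 => Ok(OpCode::Auth),
            0x02 => Ok(OpCode::Query),
            0x03 => Ok(OpCode::Status),
            other => Err(other),
        }
    }
}

impl From<OpCode> for u8 {
    /// Infallible direction: every opcode has a numeric wire value.
    fn from(op: OpCode) -> u8 {
        op as u8
    }
}
```

With both directions in place, a numeric encoding can round-trip an opcode through `u8` and reject unknown bytes at decode time instead of silently mapping them to a default.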

The previous startup sequencer used an atomic phase counter with watch
channels and a monolithic Sequencer struct. This is replaced by
StartupSequencer (startup_sequencer.rs) which models each phase as a
named gate — a tokio Notify pair registered before the owning subsystem
begins its work and fired immediately after it reports ready.

Benefits:
- Phase transitions are observable without polling: await_phase() is
  a zero-cost future on the gate's notifier
- Multiple concurrent subsystems can own separate gates within the same
  phase and the phase only advances when all registered gates have fired
- The GatewayGuard and startup snapshot are folded into the new health.rs
  module, removing guard.rs and snapshot.rs
- Eliminates sequencer.rs (411 lines) and guard.rs (207 lines)

SharedState is updated to hold Arc<StartupGate> so listeners installed
at different points in startup all see live phase transitions.
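As a rough illustration of the "phase advances only when all registered gates have fired" rule, here is a minimal synchronous sketch. It swaps the tokio Notify mechanism for a standard-library `Mutex` + `Condvar` so that it has no dependencies. `GateSequencer`, `Gate`, and the `Phase` variants are hypothetical names, not the types in startup_sequencer.rs.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical phase names, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Phase {
    Wal,
    Catalog,
    GatewayEnable,
}

#[derive(Default)]
struct State {
    /// phase -> (gates registered, gates fired)
    gates: HashMap<Phase, (usize, usize)>,
}

pub struct GateSequencer {
    state: Mutex<State>,
    changed: Condvar,
}

/// Handle owned by one subsystem; consumed when the subsystem is ready.
pub struct Gate {
    seq: Arc<GateSequencer>,
    phase: Phase,
}

impl GateSequencer {
    pub fn new() -> Arc<Self> {
        Arc::new(Self { state: Mutex::new(State::default()), changed: Condvar::new() })
    }

    /// Register a gate before the owning subsystem starts its work.
    pub fn register(self: &Arc<Self>, phase: Phase) -> Gate {
        let mut st = self.state.lock().unwrap();
        st.gates.entry(phase).or_insert((0, 0)).0 += 1;
        Gate { seq: Arc::clone(self), phase }
    }

    fn complete(st: &State, phase: Phase) -> bool {
        // A phase with no registered gates is treated as not yet reached.
        matches!(st.gates.get(&phase), Some(&(reg, fired)) if reg == fired)
    }

    pub fn is_complete(&self, phase: Phase) -> bool {
        Self::complete(&self.state.lock().unwrap(), phase)
    }

    /// Block (without busy-polling) until every gate of `phase` has fired.
    pub fn await_phase(&self, phase: Phase) {
        let guard = self.state.lock().unwrap();
        let _guard = self
            .changed
            .wait_while(guard, |st| !Self::complete(st, phase))
            .unwrap();
    }
}

impl Gate {
    /// Mark this subsystem ready and wake any waiters.
    pub fn fire(self) {
        let mut st = self.seq.state.lock().unwrap();
        if let Some(entry) = st.gates.get_mut(&self.phase) {
            entry.1 += 1;
        }
        drop(st);
        self.seq.changed.notify_all();
    }
}
```

In this sketch a gate registered after its phase has completed would make the phase incomplete again, which is why registration is expected to happen before the owning subsystem begins its work.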

ShutdownBus (bus.rs) replaces the flat ShutdownWatch signal with a
structured drain sequence. Subsystems register named drain tasks per
ShutdownPhase, and the bus enforces a 500ms budget per phase before
aborting laggards and logging an offender report.

The flat ShutdownWatch is preserved as the underlying signal so all
existing watch::Receiver<bool> subscribers continue to work without
change.
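The per-phase budget can be illustrated with a small standard-library sketch. `DrainBus` and the `ShutdownPhase` variants here are hypothetical stand-ins, and OS threads replace async tasks, so an overrunning task is only reported as an offender rather than aborted (an async runtime could cancel the task handle instead).

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical phase names; derive order defines drain order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ShutdownPhase {
    Listeners,
    InFlight,
    Storage,
}

type DrainFn = Box<dyn FnOnce() + Send + 'static>;

pub struct DrainBus {
    budget: Duration,
    phases: BTreeMap<ShutdownPhase, Vec<(String, DrainFn)>>,
}

impl DrainBus {
    pub fn new(budget: Duration) -> Self {
        Self { budget, phases: BTreeMap::new() }
    }

    /// Register a named drain task under a phase.
    pub fn register(&mut self, phase: ShutdownPhase, name: &str, f: impl FnOnce() + Send + 'static) {
        self.phases.entry(phase).or_default().push((name.to_string(), Box::new(f)));
    }

    /// Run phases in order; return (phase, task name) for every task that
    /// did not finish within its phase budget.
    pub fn shutdown(self) -> Vec<(ShutdownPhase, String)> {
        let budget = self.budget;
        let mut offenders = Vec::new();
        for (phase, tasks) in self.phases {
            let (tx, rx) = mpsc::channel::<usize>();
            let mut names = Vec::new();
            for (idx, (name, task)) in tasks.into_iter().enumerate() {
                let tx = tx.clone();
                names.push(name);
                thread::spawn(move || {
                    task();
                    let _ = tx.send(idx); // receiver may already have moved on
                });
            }
            drop(tx);

            let deadline = Instant::now() + budget;
            let mut done = vec![false; names.len()];
            let mut remaining = names.len();
            while remaining > 0 {
                let left = deadline.saturating_duration_since(Instant::now());
                match rx.recv_timeout(left) {
                    Ok(idx) => {
                        done[idx] = true;
                        remaining -= 1;
                    }
                    Err(_) => break, // budget exhausted or all tasks gone
                }
            }
            for (idx, name) in names.into_iter().enumerate() {
                if !done[idx] {
                    offenders.push((phase, name));
                }
            }
        }
        offenders
    }
}
```

The returned list plays the role of the offender report: each entry names the phase and the task that exceeded the budget, and later phases still run after an earlier one overruns.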

…routing

Introduces the Gateway (control/gateway/) as the single cluster-aware
execution path for all protocol handlers. Instead of serialising a raw
SQL string and re-planning it on the remote node, handlers now ship the
pre-planned PhysicalPlan via ExecuteRequest over QUIC.

Removed:
- control/forward.rs — LocalForwarder (SQL-string execution handler)
- control/cluster_forwarder.rs — thin wrapper around ForwardRequest
- server/pgwire/handler/routing/forward.rs — forward_sql / remote_leader_for_tasks
- server/pgwire/handler/retry.rs — ad-hoc retry wrapper superseded by gateway retry

All protocol handlers (pgwire, native, HTTP, RESP, ILP, WebSocket) are
updated to call gateway.execute() where available, with a local SPSC
fallback for single-node boot before the gateway is installed.
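The "gateway where available, local fallback otherwise" pattern might look something like the sketch below. Everything here is hypothetical: the `Execute` trait, the `Handler` struct, the `Echo` test executor, and the single `PointGet` plan variant are invented for illustration and do not reflect the actual gateway.execute() signature.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a pre-planned physical plan.
#[derive(Debug, Clone, PartialEq)]
pub enum PhysicalPlan {
    PointGet { key: String },
}

/// Which path served the request (for observability in this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Route {
    Gateway,
    LocalFallback,
}

/// Hypothetical executor interface shared by the gateway and the local path.
pub trait Execute: Send + Sync {
    fn execute(&self, plan: &PhysicalPlan) -> String;
}

/// Trivial executor that tags its output, used to show which path ran.
pub struct Echo(pub &'static str);

impl Execute for Echo {
    fn execute(&self, plan: &PhysicalPlan) -> String {
        format!("{}:{:?}", self.0, plan)
    }
}

pub struct Handler {
    /// Empty during early single-node boot; filled once the gateway exists.
    gateway: RwLock<Option<Arc<dyn Execute>>>,
    local: Arc<dyn Execute>,
}

impl Handler {
    pub fn new(local: Arc<dyn Execute>) -> Self {
        Self { gateway: RwLock::new(None), local }
    }

    pub fn install_gateway(&self, gateway: Arc<dyn Execute>) {
        *self.gateway.write().unwrap() = Some(gateway);
    }

    /// Ship the plan through the gateway if installed, else run it locally.
    pub fn run(&self, plan: &PhysicalPlan) -> (Route, String) {
        // Clone the Arc so the lock is not held while executing.
        let gateway = self.gateway.read().unwrap().clone();
        match gateway {
            Some(gw) => (Route::Gateway, gw.execute(plan)),
            None => (Route::LocalFallback, self.local.execute(plan)),
        }
    }
}
```

The point of the sketch is that the handler passes an already-planned value to whichever executor is present, so no SQL text is re-parsed or re-planned on the receiving side.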

CDC consume_remote and topic publish are updated to route through
gateway.execute_sql instead of the old ForwardRequest path.

…strap

main.rs is updated to construct StartupSequencer gates before each
subsystem, install the real startup gate on SharedState after open(),
and construct the Gateway + PlanCacheInvalidator before listeners bind.

Additional bootstrap changes:
- WAL validation runs before wal_gate fires; a corrupt segment is now a
  fatal startup error rather than being silently ignored
- WAL replay failure is now fatal (previously a warning followed by startup
  with empty state)
- ShutdownBus is wired with system metrics for per-phase drain telemetry
- A NODEDB_TEST_SLOW_DRAIN_TASK env hook enables drain-abort integration tests

Supporting changes:
- recovery_check module added for catalog sanity check on startup
- catalog_entry post_apply and invalidation tests updated
- system metrics expanded for startup and shutdown phase telemetry
- WAL manager gains validate_for_startup
- Error types extended for new startup and gateway error variants

All in-memory security and event registries (permissions, roles, API keys,
blacklist, credentials, RLS policies, change streams, consumer groups,
scheduler, streaming MVs, alerts, triggers, retention policies) gain a
clear_and_install_from method used by the catalog recovery sanity checker.

When the checker detects divergence between the in-memory registry and the
redb system catalog, it loads a fresh store from redb and calls
clear_and_install_from to repair in place, keeping all existing Arc
references stable so listeners need not be restarted.
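A minimal sketch of the in-place repair idea follows, assuming a registry whose contents sit behind an `RwLock` inside a shared `Arc`. `RoleRegistry`, its fields, `repair_if_diverged`, and the plain `HashMap` standing in for the store loaded from redb are all hypothetical; only the method name `clear_and_install_from` comes from the text, and its signature here is a guess.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical registry: role name -> granted permissions.
#[derive(Default)]
pub struct RoleRegistry {
    roles: RwLock<HashMap<String, Vec<String>>>,
}

impl RoleRegistry {
    pub fn insert(&self, role: &str, perms: &[&str]) {
        let perms = perms.iter().map(|p| p.to_string()).collect();
        self.roles.write().unwrap().insert(role.to_string(), perms);
    }

    pub fn get(&self, role: &str) -> Option<Vec<String>> {
        self.roles.read().unwrap().get(role).cloned()
    }

    pub fn snapshot(&self) -> HashMap<String, Vec<String>> {
        self.roles.read().unwrap().clone()
    }

    /// Replace the contents under one write lock. The registry object
    /// itself is untouched, so every existing Arc to it stays valid.
    pub fn clear_and_install_from(&self, fresh: HashMap<String, Vec<String>>) {
        let mut guard = self.roles.write().unwrap();
        guard.clear();
        guard.extend(fresh);
    }
}

/// Compare the live registry with the catalog view and repair on mismatch.
/// Returns true if a repair was performed.
pub fn repair_if_diverged(live: &RoleRegistry, catalog: &HashMap<String, Vec<String>>) -> bool {
    if live.snapshot() == *catalog {
        return false;
    }
    live.clear_and_install_from(catalog.clone());
    true
}
```

Because the swap happens inside the lock rather than by replacing the `Arc`, a listener that cloned the `Arc` at startup observes the repaired contents on its next read.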

…lan types

Data plane dispatch (text, vector), enforcement, and engine handler modules
are updated for the renamed/restructured plan types introduced by the
PhysicalPlan serialisation work. Plan builders for graph, text, and vector
native dispatch are aligned to the new type paths.

Also update .gitignore for build artefacts introduced by the new gateway
and startup modules.

…phases

New test suites cover the gateway and shutdown work introduced in this
branch:

Gateway:
- gateway_execute: basic execute roundtrip through the Gateway
- cluster_execute_request: ExecuteRequest routing over QUIC in a 3-node cluster
- {http,ilp,native,pgwire,resp}_gateway_migration: verify each protocol
  handler falls back correctly from direct dispatch to gateway routing
- listeners_gateway_smoke: all listeners accept queries after gateway enable
- listeners_typed_not_leader: non-leader nodes return NOT_LEADER via gateway
- catalog_recovery_check: diverged in-memory registry is repaired on startup

Startup gates:
- startup_gate_{pgwire,http,native,ilp,resp}: each listener blocks connections
  before GatewayEnable and accepts them immediately after
- startup_failure: node in Failed state rejects all connections

Shutdown phases:
- shutdown_{idempotent,budget,in_flight,abort_offender,event_plane}: phased
  drain correctness, per-phase budget enforcement, and offender abort

Existing tests updated for renamed harness helpers and new startup/shutdown APIs.

Implement a SWIM (Scalable Weakly-consistent Infection-style Membership)
failure detector for the cluster layer. The protocol provides probabilistic
membership convergence with O(log n) dissemination, removing the need for
a central membership oracle.

Includes:
- Incarnation counter for detecting stale membership state after restarts
- Member records and state machine (Alive/Suspect/Dead/Left)
- MembershipList with merge semantics for gossip convergence
- Wire message format and QUIC-framed codec for probe/ack/indirect-probe
- Configuration surface (probe interval, suspicion timeout, fanout)
- NodeId MessagePack derive to support membership wire encoding
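To make the incarnation and state-machine items above concrete, here is a small sketch of a pure merge rule. It assumes a lexicographic ordering on (incarnation, state) with Alive < Suspect < Dead < Left, which is a common simplification; the actual precedence rules in nodedb-cluster/src/swim/ are not shown in this PR text and may differ, for example in how Dead or Left interact with incarnations. The `Member` struct and the `u64` node key are illustrative.

```rust
use std::collections::HashMap;

/// Derive order doubles as precedence at equal incarnation (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum MemberState {
    Alive,
    Suspect,
    Dead,
    Left,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Member {
    /// Bumped by a node to refute stale claims about itself.
    pub incarnation: u64,
    pub state: MemberState,
}

/// Pure comparison: does `incoming` carry newer information than `current`?
/// Higher incarnation wins; at equal incarnation the "worse" state wins.
pub fn supersedes(incoming: Member, current: Member) -> bool {
    (incoming.incarnation, incoming.state) > (current.incarnation, current.state)
}

#[derive(Default)]
pub struct MembershipList {
    members: HashMap<u64, Member>,
}

impl MembershipList {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn get(&self, node: u64) -> Option<Member> {
        self.members.get(&node).copied()
    }

    /// Apply one gossiped record. Returns true if local state changed,
    /// which is the signal to keep disseminating the update.
    pub fn merge(&mut self, node: u64, incoming: Member) -> bool {
        let stale = matches!(
            self.members.get(&node),
            Some(current) if !supersedes(incoming, *current)
        );
        if stale {
            return false;
        }
        self.members.insert(node, incoming);
        true
    }
}
```

Under this rule a suspicion at incarnation 1 overrides Alive at incarnation 1, and the suspected node clears it by announcing Alive at incarnation 2. Because the comparison is a total order, applying the same set of updates in any order converges to the same record.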

@farhan-syah farhan-syah merged commit 6513eca into main Apr 15, 2026
2 checks passed