Pub/Sub
Subscribe to topics, publish events, and stream data in real time.
Overview
Every daemon runs an event stream broker on port 1002. Agents can subscribe to topics on any trusted peer and receive events in real time. Publishers send events to a topic, and the broker distributes them to all active subscribers.
Pub/sub is designed for fan-out scenarios where multiple consumers need the same data stream — monitoring, coordination, event-driven workflows. For one-to-one messaging, use stream connections or data exchange instead.
Architecture
Each daemon runs its own independent broker — there is no central message server. The broker lives inside the daemon process and manages subscriptions for that node only.
When you subscribe to topics on another agent, your daemon opens a connection to their event stream port (1002). The remote broker registers your subscription and pushes matching events over that connection. When you publish to another agent, your daemon sends the event to their broker, which fans it out to all active subscribers.
This means:
- Each node is both a potential publisher and broker
- Subscribers connect to the publisher, not to a shared server
- No single point of failure — if one node goes down, other nodes' brokers continue operating independently
- Subscriptions are per-connection — if the connection drops, the subscription is gone
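The properties above can be modeled with a toy in-memory broker. This is a minimal sketch for intuition only, not the daemon's implementation: the `ToyBroker` class, its method names, and the connection ids are all hypothetical, and matching is by exact topic name.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory fan-out keyed by topic; a model, not the daemon's code."""

    def __init__(self):
        # topic -> set of connection ids currently subscribed to it
        self._subs = defaultdict(set)
        # connection id -> list of (topic, payload) delivered so far
        self.delivered = defaultdict(list)

    def subscribe(self, conn_id, topic):
        self._subs[topic].add(conn_id)

    def disconnect(self, conn_id):
        # Subscriptions live and die with the connection.
        for subscribers in self._subs.values():
            subscribers.discard(conn_id)

    def publish(self, topic, payload):
        # Deliver to whoever is subscribed right now; nothing is queued.
        targets = sorted(self._subs.get(topic, ()))
        for conn_id in targets:
            self.delivered[conn_id].append((topic, payload))
        return len(targets)


broker = ToyBroker()
broker.subscribe("conn-a", "status")
broker.subscribe("conn-b", "status")
broker.publish("status", "online")   # reaches both connections
broker.disconnect("conn-b")
broker.publish("status", "idle")     # reaches conn-a only
```

After the disconnect, `conn-b` never sees the second event, which mirrors the rule that a dropped connection takes its subscription with it.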
Subscribing
Bounded subscription
Collect a fixed number of events and return a JSON array:
pilotctl subscribe other-agent status --count 5 --timeout 60s
Returns: events [{topic, data, bytes}], timeout (bool)
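A caller will usually want to know whether the batch is complete. The sketch below assumes the command prints a single JSON object with an `events` array and a `timeout` boolean, which is one reading of the return description above. The exact envelope is an assumption, and `parse_bounded_result` is a hypothetical helper.

```python
import json


def parse_bounded_result(raw):
    """Return (events, timed_out) from the assumed result envelope."""
    result = json.loads(raw)
    events = result.get("events", [])
    timed_out = bool(result.get("timeout", False))
    return events, timed_out


# Sample output shaped like the documented fields (assumed envelope).
sample = '{"events":[{"topic":"status","data":"online","bytes":6}],"timeout":true}'
events, timed_out = parse_bounded_result(sample)
if timed_out:
    # Presumably the deadline passed before --count events arrived.
    print(f"timed out after {len(events)} event(s)")
```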
Unbounded subscription
Stream events indefinitely as NDJSON (one JSON object per line):
pilotctl subscribe other-agent status
Each line is a standalone JSON object: {"topic":"status","data":"online","bytes":6}
Publishing
pilotctl publish other-agent status --data "processing complete"
pilotctl publish other-agent metrics --data '{"cpu":42,"mem":1024}'
Events are delivered to all active subscribers of the topic on the target node. Returns: target, topic, bytes
Wildcards
Use * as the topic to subscribe to all topics at once:
pilotctl subscribe other-agent "*" --count 10
* is a full wildcard — it matches every topic published on the target's event stream broker in a single subscription. It is not a prefix glob, so events.* is not valid syntax. There is no multi-level wildcard; * is the only wildcard form, and it always matches all topics.
NDJSON streaming
Without --count, subscriptions stream NDJSON indefinitely. This is ideal for integration with tools that process line-delimited JSON:
# Pipe events to jq for processing
pilotctl subscribe other-agent status | jq '.data'
# Log events to a file
pilotctl subscribe other-agent "*" >> events.jsonl
# Monitor metrics in real time
pilotctl subscribe other-agent metrics | while read -r line; do
    echo "$line" | jq -r '"CPU: \(.data | fromjson | .cpu)%"'
done
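The same stream can be consumed from a program instead of a shell loop. The following is a minimal sketch, assuming each line has the `{"topic", "data", "bytes"}` shape shown earlier and that `data` carries JSON text for the metrics topic. The helper names are hypothetical, and the sample lines stand in for real subscription output.

```python
import io
import json


def iter_events(lines):
    """Yield one parsed value per non-empty NDJSON line, skipping malformed lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue


def cpu_readings(lines):
    """Yield the cpu value from events whose data field is itself JSON text."""
    for event in iter_events(lines):
        try:
            payload = json.loads(event["data"])
        except (KeyError, TypeError, json.JSONDecodeError):
            continue
        if isinstance(payload, dict) and "cpu" in payload:
            yield payload["cpu"]


# Stand-in for the subscription stream; the middle line is deliberately malformed.
sample = io.StringIO(
    '{"topic":"metrics","data":"{\\"cpu\\":42,\\"mem\\":1024}","bytes":21}\n'
    'not json\n'
    '{"topic":"metrics","data":"{\\"cpu\\":57,\\"mem\\":900}","bytes":20}\n'
)
readings = list(cpu_readings(sample))
```

In real use, `sys.stdin` would replace the sample stream, with the subscription piped into the script.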
Delivery guarantees
| Property | Guarantee |
|---|---|
| Delivery | At-most-once. No acknowledgments, no retries. |
| Ordering | Events arrive in publish order per connection. |
| Persistence | None. Events are not stored anywhere. |
| Replay | Not supported. Missed events are gone. |
| Disconnection | If a subscriber disconnects, events published during the disconnection are lost. Reconnecting starts a fresh subscription. |
Pub/sub is designed for real-time streaming where dropping an occasional event is acceptable. If you need guaranteed delivery, use data exchange (messages persist in the inbox) or stream connections (synchronous request-response).
Limits
- Topic name: max 1024 bytes
- Payload: max 16 MB per event
- Subscribers: no hard limit — bounded by available connections
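A publisher can check these limits before sending. This is a minimal sketch with two assumptions: "16 MB" is read as 16 MiB, and sizes are measured on the UTF-8 encoding of the strings. The `check_event` helper is hypothetical and not part of pilotctl.

```python
MAX_TOPIC_BYTES = 1024
MAX_PAYLOAD_BYTES = 16 * 1024 * 1024  # assumption: "16 MB" read as 16 MiB


def check_event(topic, data):
    """Return a list of limit violations; an empty list means the event fits."""
    topic_len = len(topic.encode("utf-8"))
    payload_len = len(data.encode("utf-8"))
    problems = []
    if topic_len > MAX_TOPIC_BYTES:
        problems.append(f"topic is {topic_len} bytes, limit is {MAX_TOPIC_BYTES}")
    if payload_len > MAX_PAYLOAD_BYTES:
        problems.append(f"payload is {payload_len} bytes, limit is {MAX_PAYLOAD_BYTES}")
    return problems


ok = check_event("status", "processing complete")
too_long = check_event("t" * 1025, "x")
```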
Topic conventions
Topic names are arbitrary strings. There is no enforced hierarchy, but using dot-separated namespaces keeps things organized:
- status: agent status updates (online, processing, idle)
- metrics.cpu: CPU utilization metrics
- metrics.memory: memory usage metrics
- task.completed: task completion events
- log.error: error log stream
Keep topic names short and descriptive. Since * is the only wildcard and matches everything, prefix-based filtering (like metrics.*) is not supported — subscribe to the specific topic you need, or use * and filter client-side.
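Client-side filtering on a `*` subscription can be as simple as a prefix check on each event's topic. This is a minimal sketch, assuming NDJSON lines in the shape shown earlier. The `filter_by_prefix` helper is hypothetical, and the sample lines stand in for real subscription output.

```python
import json


def filter_by_prefix(lines, prefix):
    """Yield parsed events whose topic starts with the given prefix."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("topic", "").startswith(prefix):
            yield event


# Stand-in for the output of a "*" subscription.
sample = [
    '{"topic":"metrics.cpu","data":"42","bytes":2}',
    '{"topic":"status","data":"online","bytes":6}',
    '{"topic":"metrics.memory","data":"1024","bytes":4}',
]
matched = [event["topic"] for event in filter_by_prefix(sample, "metrics.")]
```

The cost of this approach is that every event still crosses the connection, so it suits low-volume nodes better than busy ones.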
How it works under the hood
The event stream uses a simple wire protocol:
- Subscribe: the subscriber opens a connection to port 1002 and sends an initial event with the topic name and an empty payload. The broker registers the subscription.
- Publish: the publisher sends an event to the broker with the topic and payload. The broker iterates over all active subscribers for that topic and writes the event to each.
- Wire format: each event is [2-byte topic length][topic][4-byte payload length][payload].
The broker is a simple in-memory fan-out — no queues, no disk I/O, no acknowledgments. This keeps latency low and throughput high.
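The frame layout above can be sketched as an encoder and decoder. Two details are assumptions because the text does not state them: the length fields are treated as unsigned big-endian (network byte order) integers, and the topic is encoded as UTF-8. The function names are hypothetical.

```python
import struct


def encode_event(topic, payload):
    """Build [2-byte topic length][topic][4-byte payload length][payload]."""
    topic_bytes = topic.encode("utf-8")  # assumption: topics are UTF-8
    return (
        struct.pack(">H", len(topic_bytes))  # assumption: big-endian lengths
        + topic_bytes
        + struct.pack(">I", len(payload))
        + payload
    )


def decode_event(buf):
    """Return (topic, payload, bytes_consumed); raise ValueError if buf is short."""
    if len(buf) < 2:
        raise ValueError("need 2 bytes for the topic length")
    (topic_len,) = struct.unpack_from(">H", buf, 0)
    offset = 2
    if len(buf) < offset + topic_len + 4:
        raise ValueError("buffer ends before the payload length")
    topic = buf[offset:offset + topic_len].decode("utf-8")
    offset += topic_len
    (payload_len,) = struct.unpack_from(">I", buf, offset)
    offset += 4
    if len(buf) < offset + payload_len:
        raise ValueError("buffer ends before the payload is complete")
    payload = buf[offset:offset + payload_len]
    return topic, payload, offset + payload_len


frame = encode_event("status", b"online")
# A subscribe request is described as a topic with an empty payload.
subscribe_frame = encode_event("status", b"")
```

Returning the number of bytes consumed lets a reader decode several frames from one buffer by slicing past each decoded event.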
Use cases
Real-time monitoring
An agent publishes system metrics every few seconds. A dashboard agent subscribes and renders them:
# On the monitored agent (via SDK)
pilotctl publish self metrics --data '{"cpu":42,"mem":1024,"disk":80}'
# On the dashboard agent
pilotctl subscribe monitored-agent metrics >> dashboard-data.jsonl
Coordination
A controller publishes tasks, workers subscribe and pick them up:
# Workers subscribe
pilotctl subscribe controller-agent tasks --count 1
# Controller publishes work
pilotctl publish controller-agent tasks --data '{"job":"process-batch-42"}'
Event-driven workflows
Trigger actions in response to events from other agents:
# React to completion events
pilotctl subscribe pipeline-agent task.completed | while read -r event; do
    echo "Task done, starting next stage..."
done
Related webhook events: pubsub.subscribed, pubsub.unsubscribed, and pubsub.published. See Webhooks for details.