Flow

Pub/Sub

Subscribe to topics, publish events, and stream data in real time.

Overview

Every daemon runs an event stream broker on port 1002. Agents can subscribe to topics on any trusted peer and receive events in real time. Publishers send events to a topic, and the broker distributes them to all active subscribers.

Pub/sub is designed for fan-out scenarios where multiple consumers need the same data stream — monitoring, coordination, event-driven workflows. For one-to-one messaging, use stream connections or data exchange instead.

Architecture

Each daemon runs its own independent broker — there is no central message server. The broker lives inside the daemon process and manages subscriptions for that node only.

When you subscribe to topics on another agent, your daemon opens a connection to their event stream port (1002). The remote broker registers your subscription and pushes matching events over that connection. When you publish to another agent, your daemon sends the event to their broker, which fans it out to all active subscribers.

This means:

  - Subscriptions are registered on the broker of the node that publishes the events, not on a central server.
  - Your daemon holds one open connection per remote broker it subscribes to; if a peer goes down, only subscriptions on that peer are affected.
  - Events never propagate between brokers: an event published to a node reaches only that node's active subscribers.

Subscribing

Bounded subscription

Collect a fixed number of events and return a JSON array:

pilotctl subscribe other-agent status --count 5 --timeout 60s

Returns: events [{topic, data, bytes}], timeout (bool)
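The returned object is plain JSON and can be post-processed with standard tools. A sketch, using a sample result object in place of real pilotctl output:

```shell
# Sample bounded-subscription result (shape as documented: events + timeout flag).
result='{"events":[{"topic":"status","data":"online","bytes":6}],"timeout":false}'

# Extract each event payload, then check whether the collection timed out.
printf '%s\n' "$result" | jq -r '.events[].data'
printf '%s\n' "$result" | jq '.timeout'
```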

Unbounded subscription

Stream events indefinitely as NDJSON (one JSON object per line):

pilotctl subscribe other-agent status

Each line is a standalone JSON object: {"topic":"status","data":"online","bytes":6}

Publishing

pilotctl publish other-agent status --data "processing complete"
pilotctl publish other-agent metrics --data '{"cpu":42,"mem":1024}'

Events are delivered to all active subscribers of the topic on the target node. Returns: target, topic, bytes

Wildcards

Use * as the topic to subscribe to all topics at once:

pilotctl subscribe other-agent "*" --count 10

* is a full wildcard: a single subscription matches every topic published on the target's event stream broker. It is not a prefix glob (events.* is not valid syntax), and there is no multi-level wildcard; * is the only wildcard form.

NDJSON streaming

Without --count, subscriptions stream NDJSON indefinitely. This is ideal for integration with tools that process line-delimited JSON:

# Pipe events to jq for processing
pilotctl subscribe other-agent status | jq '.data'

# Log events to a file
pilotctl subscribe other-agent "*" >> events.jsonl

# Monitor metrics in real time
pilotctl subscribe other-agent metrics | while read -r line; do
  echo "$line" | jq -r '"CPU: \(.data | fromjson | .cpu)%"'
done

Delivery guarantees

Delivery: at-most-once. No acknowledgments, no retries.
Ordering: events arrive in publish order per connection.
Persistence: none. Events are not stored anywhere.
Replay: not supported. Missed events are gone.
Disconnection: if a subscriber disconnects, events published during the disconnection are lost. Reconnecting starts a fresh subscription.

Pub/sub is designed for real-time streaming where dropping an occasional event is acceptable. If you need guaranteed delivery, use data exchange (messages persist in the inbox) or stream connections (synchronous request-response).
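When dropped connections matter more than dropped events, a thin wrapper can resubscribe automatically. A sketch (subscribe_forever is a hypothetical helper; events published during each gap are still lost, per the guarantees above):

```shell
# Sketch: resubscribe whenever the connection drops. Each iteration starts a
# fresh subscription; anything published during the gap is gone for good.
subscribe_forever() {
  agent=$1; topic=$2
  while true; do
    pilotctl subscribe "$agent" "$topic" || true
    sleep 1   # brief pause before reconnecting
  done
}

# Usage: subscribe_forever other-agent metrics >> events.jsonl
```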

Limits

Topic conventions

Topic names are arbitrary strings. There is no enforced hierarchy, but using dot-separated namespaces such as metrics.cpu or task.completed keeps things organized.

Keep topic names short and descriptive. Since * is the only wildcard and matches everything, prefix-based filtering (like metrics.*) is not supported — subscribe to the specific topic you need, or use * and filter client-side.
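Client-side filtering is straightforward when events arrive as NDJSON. A sketch, with printf standing in for a live pilotctl subscribe other-agent "*" stream (the metrics.* topic names are hypothetical):

```shell
# Keep only events whose topic starts with "metrics.".
# The printf lines simulate NDJSON output from a wildcard subscription.
printf '%s\n' \
  '{"topic":"metrics.cpu","data":"42","bytes":2}' \
  '{"topic":"status","data":"online","bytes":6}' \
  '{"topic":"metrics.mem","data":"1024","bytes":4}' |
  jq -c 'select(.topic | startswith("metrics."))'
```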

How it works under the hood

The event stream uses a simple wire protocol:

  1. Subscribe: the subscriber opens a connection to port 1002 and sends an initial event with the topic name and an empty payload. The broker registers the subscription.
  2. Publish: the publisher sends an event to the broker with the topic and payload. The broker iterates over all active subscribers for that topic and writes the event to each.
  3. Wire format: each event is [2-byte topic length][topic][4-byte payload length][payload].

The broker is a simple in-memory fan-out — no queues, no disk I/O, no acknowledgments. This keeps latency low and throughput high.
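As a sketch of the framing only, the wire format can be reproduced with printf. Big-endian (network order) length fields are an assumption here; the daemon's actual byte order is not specified above:

```shell
# Emit one raw byte given its decimal value.
byte() { printf "\\$(printf '%03o' "$1")"; }

# Sketch of the frame layout:
# [2-byte topic length][topic][4-byte payload length][payload]
# Big-endian lengths are assumed.
encode_frame() {
  topic=$1; payload=$2
  tlen=${#topic}; plen=${#payload}
  byte $(( tlen / 256 )); byte $(( tlen % 256 ))          # 2-byte topic length
  printf '%s' "$topic"
  byte $(( plen / 16777216 )); byte $(( (plen / 65536) % 256 ))
  byte $(( (plen / 256) % 256 )); byte $(( plen % 256 ))  # 4-byte payload length
  printf '%s' "$payload"
}

# Inspect the frame for topic "status", payload "online" as hex.
encode_frame status online | od -An -tx1
```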

Use cases

Real-time monitoring

An agent publishes system metrics every few seconds. A dashboard agent subscribes and renders them:

# On the monitored agent (via SDK)
pilotctl publish self metrics --data '{"cpu":42,"mem":1024,"disk":80}'

# On the dashboard agent
pilotctl subscribe monitored-agent metrics >> dashboard-data.jsonl
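The logged NDJSON can then be summarized offline. A sketch, with sample printf lines standing in for events actually captured by the subscription above:

```shell
# Sample captured events (each data field is itself a JSON string).
printf '%s\n' \
  '{"topic":"metrics","data":"{\"cpu\":42,\"mem\":1024,\"disk\":80}","bytes":31}' \
  '{"topic":"metrics","data":"{\"cpu\":58,\"mem\":1100,\"disk\":81}","bytes":31}' \
  > dashboard-data.jsonl

# Average CPU across all captured samples.
jq -s '[.[].data | fromjson | .cpu] | add / length' dashboard-data.jsonl
```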

Coordination

A controller publishes tasks, workers subscribe and pick them up:

# Workers subscribe
pilotctl subscribe controller-agent tasks --count 1

# Controller publishes work
pilotctl publish controller-agent tasks --data '{"job":"process-batch-42"}'

Event-driven workflows

Trigger actions in response to events from other agents:

# React to completion events
pilotctl subscribe pipeline-agent task.completed | while read -r event; do
  echo "Task done, starting next stage..."
done

Webhooks integration: The daemon fires webhook events for pub/sub activity — pubsub.subscribed, pubsub.unsubscribed, and pubsub.published. See Webhooks for details.