This document explains the internal architecture of gz-transport-py.
┌─────────────────────────────────────────────────────────────┐
│                      Application Layer                      │
│    (Your code using Node.advertise() / Node.subscribe())    │
└──────────────────┬──────────────────────────────────────────┘
                   │
┌──────────────────▼──────────────────────────────────────────┐
│                       Node (node.py)                        │
│  - Manages publishers and subscribers                       │
│  - Applies namespaces and topic remapping                   │
│  - Coordinates between discovery and transport              │
└──────┬─────────────────────────────────────────────┬────────┘
       │                                             │
       │ Discovery                                   │ Data Transport
       │                                             │
┌──────▼────────────────────┐               ┌────────▼─────────┐
│ Discovery (discovery.py)  │               │    Publisher     │
│ - UDP multicast           │               │    Subscriber    │
│ - Topic advertisement     │               │     (ZeroMQ)     │
│ - Heartbeat/timeout       │               │                  │
└───────────────────────────┘               └──────────────────┘
The discovery protocol uses five message types:

- ADVERTISE - Announce a new publisher
- UNADVERTISE - Remove a publisher
- SUBSCRIBE - Request discovery of a topic
- HEARTBEAT - Keep-alive message
- BYE - Node is shutting down
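A minimal sketch of how a receive loop might dispatch these message types; the handler table and function name are illustrative, not the actual discovery.py API:

```python
import json

def handle_datagram(raw, handlers):
    """Decode one UDP datagram and route it by its "type" field."""
    msg = json.loads(raw.decode("utf-8"))
    handler = handlers.get(msg.get("type"))
    if handler is None:
        return False  # Unknown or malformed types are ignored
    handler(msg)
    return True

# Hypothetical wiring: collect every ADVERTISE we see.
seen = []
handlers = {"advertise": seen.append}
handle_datagram(b'{"type": "advertise", "data": {"topic": "/example"}}', handlers)
```

The return value mirrors the fail-soft policy described later: unrecognized datagrams are dropped rather than raising.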
Publisher Node               Network                Subscriber Node
   │                            │                          │
   │ ADVERTISE(/topic)          │                          │
   ├───────────────────────────>│                          │
   │                            │ ADVERTISE(/topic)        │
   │                            ├─────────────────────────>│
   │                            │                          │ Store publisher info
   │                            │                          │ Connect ZeroMQ socket
   │                            │                          │
   │                            │ SUBSCRIBE(/topic)        │
   │                            │<─────────────────────────┤
   │ ADVERTISE(/topic) [reply]  │                          │
   │<───────────────────────────┤                          │
   │                            │ ADVERTISE(/topic)        │
   │                            ├─────────────────────────>│
   │                            │                          │
   │ HEARTBEAT (every 1s)       │                          │
   ├───────────────────────────>│                          │
   │                            │                          │
Messages are JSON-encoded:
{
    "type": "advertise",
    "process_uuid": "550e8400-e29b-41d4-a716-446655440000",
    "data": {
        "topic": "/example",
        "msg_type": "gz.msgs.StringMsg",
        "address": "tcp://192.168.1.100:5555",
        "process_uuid": "550e8400-e29b-41d4-a716-446655440000",
        "node_uuid": "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
        "scope": "all"
    }
}

Heartbeat timing (a publisher is dropped after 3 s without a heartbeat):

Time:    0s        1s        2s        3s        4s        5s
         │         │         │         │         │         │
Pub:  ───HB────────HB────────HB────────┼─────────┼─────────┼────
         │         │         │         │         │         │
Sub:  ───┼─────────┼─────────┼─────────┼─────────┼──TIMEOUT┼──── Remove publisher
         │         │         │         │         │         │
                             │<─────── 3s silence ────────>│
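The timeout sweep can be sketched as follows, assuming `last_heartbeat` maps process UUIDs to last-seen times and a 3-second `HEARTBEAT_TIMEOUT` (both names are assumptions mirroring the discovery state described below):

```python
import time

HEARTBEAT_TIMEOUT = 3.0  # assumed constant: 3 s of silence drops a publisher

def prune_stale(last_heartbeat, publishers, now=None):
    """Remove every publisher whose process has gone silent too long."""
    now = time.monotonic() if now is None else now
    stale = [uid for uid, ts in last_heartbeat.items()
             if now - ts > HEARTBEAT_TIMEOUT]
    for uid in stale:
        del last_heartbeat[uid]
        # Drop that process's entries from every topic's publisher list.
        for topic in publishers:
            publishers[topic] = [p for p in publishers[topic]
                                 if p["process_uuid"] != uid]
    return stale
```

Passing `now` explicitly keeps the check testable; the real loop would call it with the current monotonic time on every iteration.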
The data path uses two ZeroMQ socket types:

- PUB (Publisher) - Binds to a random port and broadcasts messages
- SUB (Subscriber) - Connects to publisher addresses and receives messages
Messages are sent as multipart ZeroMQ messages:
┌────────────────┐
│ Part 0: Topic │ UTF-8 encoded string
├────────────────┤
│ Part 1: Data │ Serialized protobuf bytes
└────────────────┘
Example:
socket.send_multipart([
    b"/example",               # Topic name
    msg.SerializeToString()    # Protobuf bytes
])

How a connection is established:

1. Publisher advertises → Discovery broadcasts
2. Subscriber receives discovery message
3. Subscriber extracts ZeroMQ address
4. Subscriber connects to address
5. Data flows over ZeroMQ connection
Publisher                     Subscriber
    │                             │
    │ bind(tcp://*:5555)          │
    │                             │
    │ ──── discovery msg ────────>│
    │ (contains: tcp://...:5555)  │
    │                             │
    │                             │ connect(tcp://...:5555)
    │                             │
    │<══════ ZeroMQ PUB/SUB ═════>│
    │                             │
    │ ────── message data ───────>│
    │                             │
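Steps 2-4 can be sketched as subscriber-side bookkeeping; `connect` stands in for the real `zmq.Socket.connect`, and the function name is illustrative:

```python
def on_advertise(data, connections, connect):
    """Connect once per (topic, address) pair learned from discovery."""
    topic, addr = data["topic"], data["address"]
    if addr in connections.setdefault(topic, set()):
        return False              # Already connected; ignore duplicate ADVERTISEs
    connect(addr)                 # zmq SUB socket connect in the real code
    connections[topic].add(addr)
    return True
```

Tracking connected addresses per topic matters because a publisher re-ADVERTISEs with every heartbeat cycle; without the set, the subscriber would call connect repeatedly.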
Discovery runs two background threads, and each subscriber gets its own receive thread:

# Started by Discovery.start()
def _recv_loop():
    while running:
        data, addr = socket.recvfrom(65535)  # UDP receive
        handle_message(data)                 # Process message
        check_timeouts()                     # Remove stale publishers

# Started by Discovery.start()
def _heartbeat_loop():
    while running:
        time.sleep(HEARTBEAT_INTERVAL)
        send_heartbeat()
        re_advertise_topics()

# One thread per subscriber
def _recv_loop():
    while running:
        parts = socket.recv_multipart()      # ZeroMQ receive
        msg = deserialize(parts[1])
        callback(msg)                        # User callback

Each Node tracks its publishers, subscribers, and ZeroMQ sockets:

{
    'publishers': {
        '/topic1': Publisher(...),
        '/topic2': Publisher(...)
    },
    'subscribers': {
        '/topic1': Subscriber(...),
        '/topic3': Subscriber(...)
    },
    'publisher_sockets': {
        '/topic1': zmq.Socket(...),
        '/topic2': zmq.Socket(...)
    }
}

The Discovery layer tracks remote publishers and their liveness:

{
    'publishers': {
        '/topic1': [
            PublisherInfo(address='tcp://...', uuid='...'),
            PublisherInfo(address='tcp://...', uuid='...')
        ]
    },
    'last_heartbeat': {
        'process-uuid-1': timestamp,
        'process-uuid-2': timestamp
    },
    'local_publishers': [
        PublisherInfo(...)
    ]
}

All shared state is protected by locks:
class Discovery:
    def __init__(self):
        self.lock = threading.RLock()

    def advertise(self, pub_info):
        with self.lock:
            self.local_publishers.append(pub_info)
            # ... send message

Additional threading notes:

- Discovery vs Data: Discovery happens first, so connections are established before data flows
- Multiple Subscribers: Each gets its own socket and thread
- Cleanup: Proper shutdown order (stop threads → close sockets → cleanup state)
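The loop threads above are typically wired to a shared stop signal so the shutdown order holds; a sketch with illustrative names (the real wiring lives in discovery.py and node.py):

```python
import threading

class LoopRunner:
    """Start loop functions as daemon threads and stop them in order."""

    def __init__(self, *loops):
        self.stop_event = threading.Event()
        self.threads = [
            threading.Thread(target=loop, args=(self.stop_event,), daemon=True)
            for loop in loops
        ]

    def start(self):
        for t in self.threads:
            t.start()

    def stop(self):
        self.stop_event.set()          # 1. Signal every loop to exit
        for t in self.threads:
            t.join(timeout=1.0)        # 2. Join threads before closing sockets

def sample_loop(stop_event):
    # Stand-in for a recv/heartbeat loop body.
    while not stop_event.is_set():
        stop_event.wait(0.01)
```

Joining before closing sockets is what prevents the receive loops from touching a half-closed ZeroMQ or UDP socket during shutdown.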
Topic remapping example:

User provides: "/status"
Namespace:     "robot1"
Partition:     "sim"
Result:        "@sim@/robot1/status"

Publisher scope is enforced when discovery messages arrive:

if publisher.scope == Scope.PROCESS:
    # Only the same process can see it
    if publisher.process_uuid != self.process_uuid:
        return  # Ignore
elif publisher.scope == Scope.HOST:
    # Only the same machine can see it
    if not is_local_address(publisher.address):
        return  # Ignore

Performance-oriented design choices:

- UDP Multicast - Efficient broadcast to all nodes
- ZeroMQ - High-performance message queue
- Protobuf - Compact binary serialization
- Thread per Subscriber - Parallel message processing
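The topic-name composition shown earlier can be sketched as a small helper (the function name and edge-case handling are assumptions, not the node.py API):

```python
def fully_qualified(topic, namespace="", partition=""):
    """Compose "@partition@/namespace/topic" from its parts."""
    if namespace:
        topic = "/" + namespace.strip("/") + topic
    return "@" + partition + "@" + topic

# "/status" + namespace "robot1" + partition "sim"
assert fully_qualified("/status", "robot1", "sim") == "@sim@/robot1/status"
```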
Known limitations:

- Discovery Latency - ~1 second to discover new publishers
- Python GIL - Limits CPU-bound processing in callbacks
- JSON Discovery - Not as efficient as binary (could use protobuf)
Error handling is deliberately fail-soft:

- Invalid messages → Silently ignored
- Socket errors → Logged if verbose=True
- Timeouts → Automatic cleanup of stale publishers
- Publish failure → Returns False
- Subscribe error → Logged, thread continues
- Connection failure → Retried automatically by ZeroMQ
The Python implementation stays conceptually compatible with C++ gz-transport:

- Same discovery protocol concept
- Same ZeroMQ transport
- Same topic naming conventions
- Compatible protobuf messages

Key differences from the C++ implementation:
| Aspect | Python | C++ |
|---|---|---|
| Discovery encoding | JSON | Protobuf |
| Wire protocol version | Simplified | Full (v10) |
| Performance | ~10-100k msg/s | ~100k-1M msg/s |
| Memory usage | Higher | Lower |
| Services | Not implemented | Full support |
| Statistics | Not implemented | Full support |
Possible future improvements:

- Binary Discovery - Use protobuf instead of JSON
- Service Support - Implement REQ/REP pattern
- Message Pooling - Reuse message objects
- C Extension - Critical paths in C for performance
- Async API - asyncio support
- Statistics - Topic bandwidth monitoring