MQTT Connector Capacity Limitation #63

@lxsaah

Issue Summary

The AimDB MQTT connector (tokio implementation) has a hard-coded channel capacity of 10, which causes deadlocks when subscribing to more than 10 topics.

Root Cause

File: aimdb-mqtt-connector/src/tokio_client.rs
Line: 221

// Create client and event loop
let (client, event_loop) = AsyncClient::new(mqtt_opts, 10);  // ← HARD-CODED TO 10

rumqttc::AsyncClient::new() takes the channel capacity as its second argument, so this call creates an internal channel with capacity 10. When the connector subscribes to 12 topics (6 nodes × 2 sensors), the subscription confirmations from the broker fill this queue and the connector hangs during initialization.
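One possible direction, sketched here only to make the sizing arithmetic concrete, is to derive the capacity from the number of topics instead of hard-coding it. The helper `channel_capacity` and the constants `DEFAULT_CAPACITY` and `HEADROOM` are hypothetical names chosen for illustration; they are not part of aimdb-mqtt-connector or rumqttc, and the headroom value is an arbitrary assumption.

```rust
/// Hypothetical default, mirroring the value currently hard-coded in the connector.
const DEFAULT_CAPACITY: usize = 10;

/// Hypothetical safety margin on top of the topic count (arbitrary assumption).
const HEADROOM: usize = 8;

/// Returns a channel capacity that is never smaller than the default
/// and always leaves `HEADROOM` slots beyond the number of topics.
fn channel_capacity(topic_count: usize) -> usize {
    topic_count.saturating_add(HEADROOM).max(DEFAULT_CAPACITY)
}

fn main() {
    // The two configurations from this report: 5 and 6 nodes with 2 sensors each.
    for nodes in [5usize, 6] {
        let topics = nodes * 2;
        println!(
            "{} nodes -> {} topics -> capacity {}",
            nodes,
            topics,
            channel_capacity(topics)
        );
    }
}
```

The computed value would then be passed where the literal 10 is used today. Whether a larger queue alone is the right fix is a separate question from this sketch.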

Symptoms

Working Configuration (10 records)

# 5 nodes × 2 sensors = 10 records
- vienna (temp + gps)
- salzburg (temp + gps)
- graz (temp + gps)
- innsbruck (temp + gps)
- linz (temp + gps)

Logs:

Collected 10 inbound routes for scheme 'mqtt'
MQTT router has 10 topics
Connector built and spawned successfully: mqtt
Database running, background tasks active

Failing Configuration (12 records)

# 6 nodes × 2 sensors = 12 records
- vienna, salzburg, graz, innsbruck, linz, klagenfurt

Logs:

Collected 12 inbound routes for scheme 'mqtt'
MQTT router has 12 topics
[HANGS HERE - never reaches "Connector built and spawned successfully"]

Technical Details

Subscription Flow

  1. MqttConnectorImpl::build_internal() creates MQTT client with capacity 10
  2. Loop subscribes to all topics (lines 231-238):
    for topic in &topics {
        client_arc.subscribe(topic.as_ref(), rumqttc::QoS::AtLeastOnce).await?;
    }
  3. The broker answers each subscription with a SUBACK message
  4. SUBACK messages queue up in the internal channel (capacity 10)
  5. With 12 subscriptions, 2 SUBACK messages can't be queued → deadlock
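The counting in the steps above can be reproduced with a toy model. This sketch does not use rumqttc; it assumes that `std::sync::mpsc::sync_channel` is a fair stand-in for a bounded queue of capacity 10 that nothing drains during the subscribe loop. The function name `enqueue_without_draining` is made up for the example, and the model is agnostic about what kind of message occupies each slot.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue one item per topic into a bounded queue that is never
/// drained. Returns `(accepted, rejected)`.
fn enqueue_without_draining(capacity: usize, topics: usize) -> (usize, usize) {
    // `_rx` is kept alive so the queue stays open, but it is never read from.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..topics {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The queue is full: a blocking send would wait here forever.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is kept alive"),
        }
    }
    (accepted, rejected)
}

fn main() {
    for topics in [10usize, 12] {
        let (accepted, rejected) = enqueue_without_draining(10, topics);
        println!("{} topics: {} queued, {} stuck", topics, accepted, rejected);
    }
}
```

The model uses `try_send` so the program terminates. With a blocking send, the first item that does not fit would wait indefinitely, which matches the hang seen in the logs.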

Why 10 Works But 12 Doesn't

The rumqttc library's internal event loop processes:

  • Outbound subscription requests
  • Inbound SUBACK confirmations
  • Inbound MQTT messages

With 10 topics, all SUBACKs fit in the queue. With 12, the queue fills up, blocking new SUBACKs, which blocks the event loop, which blocks subscription completion.
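The blocking chain only forms when nothing drains the queue while it is full. As a rough illustration, again with `std::sync::mpsc` standing in for the internal channel rather than rumqttc itself, the following sketch pushes 12 items through a capacity-10 queue while a second thread drains it. The function name `send_with_concurrent_drain` is hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Sends one item per topic through a bounded queue while another thread
/// drains it. Returns how many items the draining side received.
fn send_with_concurrent_drain(capacity: usize, topics: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(capacity);

    // Stand-in for a consumer that keeps running during the subscribe loop.
    let drain = thread::spawn(move || rx.iter().count());

    for i in 0..topics {
        // Blocks while the queue is full, then resumes once a slot frees up.
        tx.send(i).expect("drain thread is alive");
    }

    // Close the queue so the drain thread's iterator ends.
    drop(tx);
    drain.join().expect("drain thread panicked")
}

fn main() {
    let received = send_with_concurrent_drain(10, 12);
    println!("received {} of 12 items through a capacity-10 queue", received);
}
```

In this model the capacity only limits how far the sender can run ahead of the consumer; it does not limit the total number of items once the consumer is running.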

Labels: enhancement
