An MQTT connector based on emqtt for Broadway.
MQTT is a lightweight publish/subscribe protocol widely used in IoT, industrial automation, and telemetry. This library connects a Broadway pipeline to an MQTT broker, using the MQTT protocol itself for backpressure and message reliability rather than an in-process buffer.
def deps do
[
{:off_broadway_emqtt, "~> 0.3.0"}
]
endBy default, :emqtt compiles the Quic transport library. To build without it:
BUILD_WITHOUT_QUIC=1 mix deps.compiledefmodule MyBroadway do
use Broadway
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {OffBroadway.EMQTT.Producer,
topics: [
{"sensors/status", :at_most_once}, # QoS 0
{"sensors/temperature", :at_least_once}, # QoS 1
{"sensors/humidity", :exactly_once} # QoS 2
],
max_inflight: 100,
config: [
host: "mqtt.example.com",
port: 1883,
username: "my-user",
password: "my-password",
clientid: "my-pipeline"
]
},
concurrency: 1
],
processors: [default: [concurrency: 10]],
batchers: [default: [batch_size: 100, batch_timeout: 500, concurrency: 5]]
)
end
@impl true
def handle_message(_, message, _context) do
IO.inspect(message.data, label: "payload")
IO.inspect(message.metadata.topic, label: "topic")
message
end
@impl true
def handle_batch(_, messages, _, _) do
messages
end
endEach MQTT message is delivered as a Broadway message where data is the raw payload binary
and metadata contains the remaining fields from the MQTT packet (topic, qos, packet_id, etc.).
Use QoS 1 (:at_least_once) or QoS 2 (:exactly_once) for reliable delivery. Messages are only
acknowledged to the broker after Broadway successfully processes them. If Broadway fails to process
a message, the broker redelivers it (with QoS 1/2).
QoS 0 (:at_most_once) provides no delivery guarantee. The broker fires and forgets.
The max_inflight option limits how many unACKed QoS 1/2 messages the broker will send before
waiting for acknowledgements. This is the primary backpressure mechanism: the broker stops
delivering new messages once the window is full.
For MQTT v5 (config: [proto_ver: :v5]), Receive-Maximum is set automatically in the CONNECT
properties so the broker enforces the limit server-side.
clean_start defaults to false. When a producer restarts (after a crash or deployment), the
broker recognises the clientid and redelivers any QoS 1/2 messages that were not acknowledged
before the restart. No messages are lost between restarts.
If you want a fresh session on every connect (discarding unACKed messages), set
config: [clean_start: true] explicitly.
Note
Each producer instance connects with the configured clientid plus an
index suffix: my-pipeline_0, my-pipeline_1, and so on. A concurrency: 1 pipeline configured
with clientid: "my-pipeline" appears on the broker as my-pipeline_0. If you are upgrading
from v0.2.x and rely on an exact clientid for a persistent session or ACL, either change your
config to clientid: "my-pipeline_0" or accept that the session will be treated as new on the
first v0.3.0 connect.
By default, if the MQTT connection is lost the producer process stops and Broadway's supervisor
restarts it. The fresh producer creates a new emqtt connection and re-subscribes to all topics.
With clean_start: false (the default), the broker redelivers any unACKed QoS 1/2 messages.
Alternatively, you can enable emqtt's built-in reconnect by passing reconnect in config:
config: [
host: "mqtt.example.com",
clientid: "my-pipeline",
reconnect: :infinity,
reconnect_timeout: 5,
clean_start: false
]Important
emqtt's reconnect reopens the TCP connection but does not re-subscribe. You must
set clean_start: false so the broker restores the session and delivers messages again. If you
enable reconnect with clean_start: true, messages silently stop arriving after the first reconnect.
| Option | Value | Behaviour |
|---|---|---|
on_success |
:ack (default) |
ACKs the message to the broker after successful processing |
on_success |
:noop |
Does not ACK; broker will redeliver |
on_failure |
:noop (default) |
Does not ACK; broker will redeliver (QoS 1/2) |
on_failure |
:ack |
ACKs even on failure; message is not redelivered |
To distribute messages across multiple producer instances, set concurrency > 1 and provide a
shared_group name. Without shared_group, every producer instance receives every message,
causing duplicates.
producer: [
module: {OffBroadway.EMQTT.Producer,
topics: [{"work/queue", :at_least_once}],
shared_group: "my-pipeline",
config: [host: "mqtt.example.com", clientid: "worker"]
},
concurrency: 3
]This creates three MQTT connections (worker_0, worker_1, worker_2) all in the shared group
my-pipeline. The broker distributes messages across them using MQTT shared subscriptions
($share/my-pipeline/work/queue).
max_inflight applies per connection. With concurrency: 3 and max_inflight: 100, up to 300
unACKed messages can be in-flight across the pipeline at once.
Implement OffBroadway.EMQTT.MessageHandler to customise how MQTT messages are converted to
Broadway messages. The default handler places the raw payload binary in data and the rest of
the MQTT packet fields in metadata.
defmodule MyApp.JsonHandler do
@behaviour OffBroadway.EMQTT.MessageHandler
@impl true
def handle_message(message, ack_ref, _opts) do
{payload, metadata} = Map.pop(message, :payload)
%Broadway.Message{
data: Jason.decode!(payload),
metadata: metadata,
acknowledger: {OffBroadway.EMQTT.Acknowledger, ack_ref, %{}}
}
end
endproducer: [
module: {OffBroadway.EMQTT.Producer,
topics: [{"events/#", :at_least_once}],
message_handler: MyApp.JsonHandler,
config: [host: "mqtt.example.com"]
},
concurrency: 1
]| Event | Measurements | Metadata |
|---|---|---|
[:off_broadway_emqtt, :producer, :init] |
%{time: integer} |
%{broadway_name: term, producer_index: integer} |
[:off_broadway_emqtt, :producer, :terminate] |
%{time: integer} |
%{broadway_name: term, producer_index: integer, client_id: string | nil, reason: term} |
[:off_broadway_emqtt, :connection, :up] |
%{time: integer} |
%{client_id: string, producer_index: integer} |
[:off_broadway_emqtt, :connection, :down] |
%{time: integer} |
%{client_id: string, producer_index: integer, reason: term} |
[:off_broadway_emqtt, :subscription, :success] |
%{time: integer} |
%{client_id: string, producer_index: integer, topic: string, granted_qos: 0..2} |
[:off_broadway_emqtt, :subscription, :error] |
%{time: integer} |
%{client_id: string, producer_index: integer, topic: string, reason: term} |
[:off_broadway_emqtt, :receive_message, :start] |
%{time: integer, count: 1} |
%{client_id: string, producer_index: integer, topic: string, qos: integer} |
[:off_broadway_emqtt, :receive_message, :ack] |
%{time: integer, count: 1} |
%{topic: string, qos: integer, status: :on_success | :on_failure} |
See the Producer moduledoc for common connection.down reason shapes (auth failures, TLS errors, transport errors).
See CHANGELOG.md for upgrade instructions and release history.