Skip to content

Commit f5529e0

Browse files
authored
feat(bindings): add mqtt bindings (LEGO#154)
1 parent 8d128db commit f5529e0

6 files changed

Lines changed: 414 additions & 0 deletions

File tree

src/LEGO.AsyncAPI.Bindings/BindingsCollection.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace LEGO.AsyncAPI.Bindings
66
using LEGO.AsyncAPI.Bindings.AMQP;
77
using LEGO.AsyncAPI.Bindings.Http;
88
using LEGO.AsyncAPI.Bindings.Kafka;
9+
using LEGO.AsyncAPI.Bindings.MQTT;
910
using LEGO.AsyncAPI.Bindings.Pulsar;
1011
using LEGO.AsyncAPI.Bindings.Sns;
1112
using LEGO.AsyncAPI.Bindings.Sqs;
@@ -53,6 +54,7 @@ public static TCollection Add<TCollection, TItem>(
5354
Sqs,
5455
Sns,
5556
AMQP,
57+
MQTT,
5658
};
5759

5860
public static IEnumerable<IBindingParser<IBinding>> Http => new List<IBindingParser<IBinding>>
@@ -98,5 +100,12 @@ public static TCollection Add<TCollection, TItem>(
98100
new AMQPOperationBinding(),
99101
new AMQPMessageBinding(),
100102
};
103+
104+
public static IEnumerable<IBindingParser<IBinding>> MQTT => new List<IBindingParser<IBinding>>
105+
{
106+
new MQTTServerBinding(),
107+
new MQTTOperationBinding(),
108+
new MQTTMessageBinding(),
109+
};
101110
}
102111
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright (c) The LEGO Group. All rights reserved.
2+
3+
namespace LEGO.AsyncAPI.Bindings.MQTT
4+
{
5+
using LEGO.AsyncAPI.Models.Interfaces;
6+
using LEGO.AsyncAPI.Writers;
7+
using System;
8+
9+
public class LastWill : IAsyncApiElement
10+
{
11+
/// <summary>
12+
/// The topic where the Last Will and Testament message will be sent.
13+
/// </summary>
14+
public string Topic { get; set; }
15+
16+
/// <summary>
17+
/// Defines how hard the broker/client will try to ensure that
18+
/// the Last Will and Testament message is received.
19+
/// Its value MUST be either 0, 1 or 2.
20+
/// </summary>
21+
public uint? QoS { get; set; }
22+
23+
/// <summary>
24+
/// Last Will message.
25+
/// </summary>
26+
public string Message { get; set; }
27+
28+
/// <summary>
29+
/// Whether the broker should retain the Last Will and Testament message or not.
30+
/// </summary>
31+
public bool Retain { get; set; }
32+
33+
public void Serialize(IAsyncApiWriter writer)
34+
{
35+
if (writer is null)
36+
{
37+
throw new ArgumentNullException(nameof(writer));
38+
}
39+
40+
writer.WriteStartObject();
41+
writer.WriteRequiredProperty("topic", this.Topic);
42+
writer.WriteOptionalProperty("qos", (int?)this.QoS);
43+
writer.WriteOptionalProperty("message", this.Message);
44+
writer.WriteRequiredProperty("retain", this.Retain);
45+
writer.WriteEndObject();
46+
}
47+
}
48+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) The LEGO Group. All rights reserved.
2+
3+
namespace LEGO.AsyncAPI.Bindings.MQTT
4+
{
5+
using System;
6+
using LEGO.AsyncAPI.Models;
7+
using LEGO.AsyncAPI.Readers;
8+
using LEGO.AsyncAPI.Readers.ParseNodes;
9+
using LEGO.AsyncAPI.Writers;
10+
11+
/// <summary>
12+
/// Binding class for MQTT messages.
13+
/// </summary>
14+
public class MQTTMessageBinding : MessageBinding<MQTTMessageBinding>
15+
{
16+
/// <summary>
17+
/// Indicates the format of the payload.
18+
/// Either: 0 (zero) for unspecified bytes, or 1 for UTF-8 encoded character data.
19+
/// </summary>
20+
public int? PayloadFormatIndicator { get; set; }
21+
22+
/// <summary>
23+
/// Correlation Data is used to identify the request the response message is for.
24+
/// </summary>
25+
public AsyncApiSchema CorrelationData { get; set; }
26+
27+
/// <summary>
28+
/// String describing the content type of the message payload.
29+
/// This should not conflict with the contentType field of the associated AsyncAPI Message object.
30+
/// </summary>
31+
public string ContentType { get; set; }
32+
33+
/// <summary>
34+
/// The topic (channel URI) for a response message.
35+
/// </summary>
36+
public string ResponseTopic { get; set; }
37+
38+
public override void SerializeProperties(IAsyncApiWriter writer)
39+
{
40+
if (writer is null)
41+
{
42+
throw new ArgumentNullException(nameof(writer));
43+
}
44+
45+
writer.WriteStartObject();
46+
writer.WriteOptionalProperty("payloadFormatIndicator", this.PayloadFormatIndicator);
47+
writer.WriteOptionalObject("correlationData", this.CorrelationData, (w, h) => h.SerializeV2(w));
48+
writer.WriteOptionalProperty("contentType", this.ContentType);
49+
writer.WriteOptionalProperty("responseTopic", this.ResponseTopic);
50+
writer.WriteOptionalProperty("bindingVersion", this.BindingVersion);
51+
writer.WriteExtensions(this.Extensions);
52+
writer.WriteEndObject();
53+
}
54+
55+
public override string BindingKey => "mqtt";
56+
57+
protected override FixedFieldMap<MQTTMessageBinding> FixedFieldMap => new ()
58+
{
59+
{ "payloadFormatIndicator", (a, n) => { a.PayloadFormatIndicator = n.GetIntegerValueOrDefault(); } },
60+
{ "correlationData", (a, n) => { a.CorrelationData = JsonSchemaDeserializer.LoadSchema(n); } },
61+
{ "contentType", (a, n) => { a.ContentType = n.GetScalarValue(); } },
62+
{ "responseTopic", (a, n) => { a.ResponseTopic = n.GetScalarValue(); } },
63+
};
64+
}
65+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright (c) The LEGO Group. All rights reserved.
2+
3+
namespace LEGO.AsyncAPI.Bindings.MQTT
4+
{
5+
using System;
6+
using System.Collections.Generic;
7+
using LEGO.AsyncAPI.Models;
8+
using LEGO.AsyncAPI.Readers;
9+
using LEGO.AsyncAPI.Readers.ParseNodes;
10+
using LEGO.AsyncAPI.Writers;
11+
12+
/// <summary>
13+
/// Binding class for MQTT operations.
14+
/// </summary>
15+
public class MQTTOperationBinding : OperationBinding<MQTTOperationBinding>
16+
{
17+
/// <summary>
18+
/// Defines the Quality of Service (QoS) levels for the message flow between client and server.
19+
/// Its value MUST be either 0 (At most once delivery), 1 (At least once delivery), or 2 (Exactly once delivery).
20+
/// </summary>
21+
public int QoS { get; set; }
22+
23+
/// <summary>
24+
/// Whether the broker should retain the message or not.
25+
/// </summary>
26+
public bool Retain { get; set; }
27+
28+
/// <summary>
29+
/// Interval in seconds or a Schema Object containing the definition of the lifetime of the message.
30+
/// </summary>
31+
public int? MessageExpiryInterval { get; set; }
32+
33+
public override string BindingKey => "mqtt";
34+
35+
protected override FixedFieldMap<MQTTOperationBinding> FixedFieldMap => new()
36+
{
37+
{ "qos", (a, n) => { a.QoS = n.GetIntegerValue(); } },
38+
{ "retain", (a, n) => { a.Retain = n.GetBooleanValue(); } },
39+
{ "messageExpiryInterval", (a, n) => { a.MessageExpiryInterval = n.GetIntegerValueOrDefault(); } },
40+
};
41+
42+
/// <summary>
43+
/// Serialize to AsyncAPI V2 document without using reference.
44+
/// </summary>
45+
public override void SerializeProperties(IAsyncApiWriter writer)
46+
{
47+
if (writer is null)
48+
{
49+
throw new ArgumentNullException(nameof(writer));
50+
}
51+
52+
writer.WriteStartObject();
53+
writer.WriteRequiredProperty("qos", this.QoS);
54+
writer.WriteRequiredProperty("retain", this.Retain);
55+
writer.WriteOptionalProperty("messageExpiryInterval", this.MessageExpiryInterval);
56+
writer.WriteOptionalProperty("bindingVersion", this.BindingVersion);
57+
writer.WriteExtensions(this.Extensions);
58+
writer.WriteEndObject();
59+
}
60+
}
61+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright (c) The LEGO Group. All rights reserved.
2+
3+
namespace LEGO.AsyncAPI.Bindings.MQTT
4+
{
5+
using System;
6+
using LEGO.AsyncAPI.Models;
7+
using LEGO.AsyncAPI.Readers.ParseNodes;
8+
using LEGO.AsyncAPI.Writers;
9+
10+
/// <summary>
11+
/// Binding class for MQTT channel settings.
12+
/// </summary>
13+
public class MQTTServerBinding : ServerBinding<MQTTServerBinding>
14+
{
15+
/// <summary>
16+
/// The client identifier.
17+
/// </summary>
18+
public string ClientId { get; set; }
19+
20+
/// <summary>
21+
/// Whether to create a persistent connection or not.
22+
/// When false, the connection will be persistent.
23+
/// This is called clean start in MQTTv5.
24+
/// </summary>
25+
public bool? CleanSession { get; set; }
26+
27+
/// <summary>
28+
/// Last Will and Testament configuration.
29+
/// </summary>
30+
public LastWill LastWill { get; set; }
31+
32+
/// <summary>
33+
/// Interval in seconds of the longest period of time
34+
/// the broker and the client can endure without sending a message.
35+
/// </summary>
36+
public int? KeepAlive { get; set; }
37+
38+
/// <summary>
39+
/// Interval in seconds the broker maintains a session
40+
/// for a disconnected client until this interval expires.
41+
/// </summary>
42+
public int? SessionExpiryInterval { get; set; }
43+
44+
/// <summary>
45+
/// Number of bytes representing the maximum packet size
46+
/// the client is willing to accept.
47+
/// </summary>
48+
public int? MaximumPacketSize { get; set; }
49+
50+
public override string BindingKey => "mqtt";
51+
52+
protected override FixedFieldMap<MQTTServerBinding> FixedFieldMap => new ()
53+
{
54+
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
55+
{ "clientId", (a, n) => { a.ClientId = n.GetScalarValue(); } },
56+
{ "cleanSession", (a, n) => { a.CleanSession = n.GetBooleanValueOrDefault(); } },
57+
{ "lastWill", (a, n) => { a.LastWill = n.ParseMap(LastWillFixedFields); } },
58+
{ "keepAlive", (a, n) => { a.KeepAlive = n.GetIntegerValueOrDefault(); } },
59+
{ "sessionExpiryInterval", (a, n) => { a.SessionExpiryInterval = n.GetIntegerValueOrDefault(); } },
60+
{ "maximumPacketSize", (a, n) => { a.MaximumPacketSize = n.GetIntegerValueOrDefault(); } },
61+
};
62+
63+
private static FixedFieldMap<LastWill> LastWillFixedFields = new ()
64+
{
65+
{ "topic", (a, n) => { a.Topic = n.GetScalarValue(); } },
66+
{ "qos", (a, n) => { a.QoS = (uint?)n.GetIntegerValueOrDefault(); } },
67+
{ "message", (a, n) => { a.Message = n.GetScalarValue(); } },
68+
{ "retain", (a, n) => { a.Retain = n.GetBooleanValue(); } },
69+
};
70+
71+
/// <summary>
72+
/// Serialize to AsyncAPI V2 document without using reference.
73+
/// </summary>
74+
public override void SerializeProperties(IAsyncApiWriter writer)
75+
{
76+
if (writer is null)
77+
{
78+
throw new ArgumentNullException(nameof(writer));
79+
}
80+
81+
writer.WriteStartObject();
82+
writer.WriteRequiredProperty("clientId", this.ClientId);
83+
writer.WriteOptionalProperty("cleanSession", this.CleanSession);
84+
writer.WriteOptionalObject("lastWill", this.LastWill, (w, l) => l.Serialize(w));
85+
writer.WriteOptionalProperty("keepAlive", this.KeepAlive);
86+
writer.WriteOptionalProperty("sessionExpiryInterval", this.SessionExpiryInterval);
87+
writer.WriteOptionalProperty("maximumPacketSize", this.MaximumPacketSize);
88+
writer.WriteOptionalProperty(AsyncApiConstants.BindingVersion, this.BindingVersion);
89+
writer.WriteExtensions(this.Extensions);
90+
writer.WriteEndObject();
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)