Skip to content

Commit 5a986d5

Browse files
committed
added kaa client
1 parent e823110 commit 5a986d5

File tree

1 file changed

+103
-0
lines changed

1 file changed

+103
-0
lines changed

datagen/bms/kaa_client.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import paho.mqtt.client as mqtt
2+
import json
3+
import logging
4+
from typing import List
5+
from paho.mqtt.client import MQTTMessage
6+
import random
7+
8+
class CommandResponseDto(object):
9+
def __init__(self, command_id: int, status_code=None, payload=None):
10+
self.command_id = command_id
11+
self.status_code = status_code
12+
self.payload = payload
13+
14+
def to_dict(self) -> dict:
15+
return {
16+
"id": self.command_id,
17+
"statusCode": self.status_code,
18+
"payload": self.payload
19+
}
20+
21+
def to_json(self) -> str:
22+
return json.dumps(self.to_dict())
23+
24+
class ConfigurationStatusResponseDto(object):
25+
def __init__(self, payload):
26+
self.payload = json.loads(payload)
27+
28+
def to_dict(self):
29+
return {
30+
"configId": self.payload["configId"],
31+
"statusCode": self.payload["statusCode"],
32+
"reasonPhrase": self.payload["reasonPhrase"],
33+
"config": self.payload["config"]
34+
}
35+
36+
def to_json(self):
37+
return json.dumps(self.to_dict())
38+
39+
40+
class CommandsDto(object):
41+
def __init__(self, payload=None):
42+
self.payload = json.loads(payload)
43+
44+
def get_command_responses(self) -> List[CommandResponseDto]:
45+
return [CommandResponseDto(command["id"], payload=command["payload"]) for command in self.payload]
46+
47+
class KaaClient(object):
48+
def __init__(self, host, port, application_version, token, client_id="fleet_sim"):
49+
self.host = host
50+
self.port = port
51+
self.token = token
52+
self.application_version = application_version
53+
self.client = mqtt.Client(client_id=client_id)
54+
self.client.on_message = self.on_message
55+
56+
def add_command_handler(self, name, handler):
57+
self.client.message_callback_add(self._topic_command(name), handler)
58+
59+
@staticmethod
60+
def get_command_response_topic(message: MQTTMessage, command_name):
61+
return f"{message.topic}".replace(f"command/{command_name}/status", f"result/{command_name}")
62+
63+
def _topic_command(self, name):
64+
return f"kp1/{self.application_version}/cex/{self.token}/command/{name}/status"
65+
66+
def _topic_configuration_status(self):
67+
return f"kp1/{self.application_version}/cmx/{self.token}/config/json/status"
68+
69+
def publish_metadata(self, payload: dict) -> None:
70+
result = self.client.publish(topic=self.topic_metadata(), payload=json.dumps(payload))
71+
self._check_send_result(result)
72+
73+
def publish_data_collection(self, payload: dict) -> None:
74+
result = self.client.publish(topic=self._topic_data_collection(), payload=json.dumps(payload))
75+
self._check_send_result(result)
76+
77+
def topic_metadata(self):
78+
return f"kp1/{self.application_version}/epmx/{self.token}/update/keys/{random.randint(1,99)}"
79+
80+
def _topic_data_collection(self):
81+
return f"kp1/{self.application_version}/dcx/{self.token}/json/{random.randint(1,99)}"
82+
83+
def add_configuration_status_handler(self, handler) -> None:
84+
self.client.message_callback_add(self._topic_configuration_status(), handler)
85+
86+
def connect(self):
87+
self.client.connect(self.host, self.port, 60)
88+
self.client.on_message = self.on_message
89+
self.client.loop_start()
90+
91+
def _check_send_result(self, result):
92+
if result.rc != 0:
93+
logging.error("Unable to send data to platform try to reconnect")
94+
self.connect()
95+
96+
@staticmethod
97+
def compose_commands_result(commands: List[CommandResponseDto]):
98+
command_results = [command.to_dict() for command in commands]
99+
return json.dumps(command_results)
100+
101+
@staticmethod
102+
def on_message(client, userdata, message: MQTTMessage):
103+
logging.info(f"Message received: topic [{message.topic}] body [{str(message.payload.decode('utf-8'))}]")

0 commit comments

Comments
 (0)