|
1 | 1 | if __package__ == "": |
2 | | - from tion import tion |
| 2 | + from tion import tion |
3 | 3 | else: |
4 | | - from . import tion |
| 4 | + from . import tion |
5 | 5 |
|
6 | 6 | from bluepy import btle |
7 | 7 | import time |
8 | 8 |
|
9 | 9 | class s3(tion): |
10 | | - uuid = "6e400001-b5a3-f393-e0a9-e50e24dcca9e" |
11 | | - uuid_write = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" |
12 | | - uuid_notify = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" |
13 | | - write = None |
14 | | - notify = None |
15 | | - statuses = [ 'off', 'on' ] |
16 | | - modes = [ 'recirculation', 'mixed' ] |
17 | | - _btle = None |
18 | | - |
19 | | - command_prefix = 61 |
20 | | - command_suffix = 90 |
21 | | - |
22 | | - command_PAIR = 5 |
23 | | - command_REQUEST_PARAMS = 1 |
24 | | - command_SET_PARAMS = 2 |
25 | | - |
26 | | - def __init__(self): |
27 | | - self._btle = btle.Peripheral(None) |
28 | | - |
29 | | - def pair(self, mac: str): |
30 | | - self._btle.connect(mac, btle.ADDR_TYPE_RANDOM) |
31 | | - characteristic = self._btle.getServiceByUUID(self.uuid).getCharacteristics()[0] |
32 | | - characteristic.write(bytes(self._get_pair_command())) |
33 | | - self._btle.disconnect() |
34 | | - |
35 | | - def create_command(self, command: int) -> bytearray: |
36 | | - command_special = 1 if command == self.command_PAIR else 0 |
37 | | - return bytearray([self.command_prefix, command, command_special, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, self.command_suffix]) |
38 | | - |
39 | | - def _get_pair_command(self) -> bytearray: |
40 | | - return self.create_command(self.command_PAIR) |
41 | | - |
42 | | - def _get_status_command(self) -> bytearray: |
43 | | - return self.create_command(self.command_REQUEST_PARAMS) |
44 | | - |
45 | | - def _process_mode(self, mode_code: int) -> str: |
46 | | - try: |
47 | | - result = self.modes[mode_code] |
48 | | - except IndexError: |
49 | | - result = 'outside' |
50 | | - return result |
51 | | - |
52 | | - def _encode_mode(self, mode: str) -> int: |
53 | | - return self.modes.index(mode) if mode in self.modes else 2 |
54 | | - |
55 | | - def _encode_status(self, status: str) -> int: |
56 | | - return self.statuses.index(status) if status in self.statuses else 0 |
57 | | - |
58 | | - def _process_status(self, code: int) -> str: |
59 | | - try: |
60 | | - result = self.statuses[code] |
61 | | - except IndexError: |
62 | | - result = 'unknown' |
63 | | - return result |
64 | | - |
65 | | - def _encode_request(self, mac: str, request: dict) -> bytearray: |
66 | | - try: |
67 | | - if request["fan_speed"] == 0: |
68 | | - del request["fan_speed"] |
69 | | - reqest["status"] = "off" |
70 | | - except KeyError: |
71 | | - pass |
72 | | - |
73 | | - settings = {**self.get(mac, False), **request} |
74 | | - new_settings = self.create_command(self.command_SET_PARAMS) |
75 | | - new_settings[2] = settings["fan_speed"] |
76 | | - new_settings[3] = settings["heater_temp"] |
77 | | - new_settings[4] = self._encode_mode(settings["mode"]) |
78 | | - new_settings[5] = self._encode_status(settings["heater"]) | (self._encode_status(settings["status"])<<1) | (self._encode_status(settings["sound"])<<3) |
79 | | - return new_settings |
80 | | - |
81 | | - def _decode_response(self, response: bytearray) -> dict: |
82 | | - try: |
83 | | - result = { |
84 | | - "code": 200, |
85 | | - "heater": self._process_status(response[4] & 1), |
86 | | - "status": self._process_status(response[4] >> 1 & 1), |
87 | | - "sound": self._process_status(response[4] >> 3 & 1), |
88 | | - "mode": self._process_mode(int(list("{:02x}".format(response[2]))[0])), |
89 | | - "fan_speed": int(list("{:02x}".format(response[2]))[1]), |
90 | | - "heater_temp": response[3], |
91 | | - "in_temp": self.decode_temperature(response[8]), |
92 | | - "out_temp": self.decode_temperature(response[7]), |
93 | | - "filter_remain": response[10]*256 + response[9], |
94 | | - "time": "{}:{}".format(response[11],response[12]), |
95 | | - "request_error_code": response[13], |
96 | | - "fw_version": "{:02x}{:02x}".format(response[16],response[17]) |
97 | | - } |
98 | | - except IndexError as e: |
99 | | - result = { |
100 | | - "code": 400, |
101 | | - "error": "Got bad response from Tion '%s': %s while parsing" % (response, str(e)) |
102 | | - } |
103 | | - finally: |
104 | | - return result |
105 | | - |
106 | | - def _connect(self, mac: str, new_connection = True): |
107 | | - if new_connection: |
108 | | - self._btle.connect(mac, btle.ADDR_TYPE_RANDOM) |
109 | | - for tc in self._btle.getCharacteristics(): |
110 | | - if tc.uuid == self.uuid_notify: |
111 | | - self.notify = tc |
112 | | - if tc.uuid == self.uuid_write: |
113 | | - self.write = tc |
114 | | - |
115 | | - def get(self, mac: str, new_connection = True) -> dict: |
116 | | - response = "" |
117 | | - self._connect(mac, new_connection) #new_connection processed inside |
118 | | - |
119 | | - self.notify.read() |
120 | | - self.write.write(self._get_status_command()) |
121 | | - response = self._btle.getServiceByUUID(self.uuid).getCharacteristics()[0].read() |
122 | | - |
123 | | - if new_connection: |
124 | | - self._btle.disconnect() |
125 | | - return self._decode_response(response) |
126 | | - |
127 | | - def set(self, mac: str, request: dict): |
128 | | - self._connect(mac) |
129 | | - self.notify.read() |
130 | | - self.write.write(self._encode_request(mac, request)) |
131 | | - self._btle.disconnect() |
| 10 | + uuid = "6e400001-b5a3-f393-e0a9-e50e24dcca9e" |
| 11 | + uuid_write = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" |
| 12 | + uuid_notify = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" |
| 13 | + write = None |
| 14 | + notify = None |
| 15 | + statuses = ['off', 'on'] |
| 16 | + modes = ['recirculation', 'mixed'] |
| 17 | + _btle = None |
| 18 | + |
| 19 | + command_prefix = 61 |
| 20 | + command_suffix = 90 |
| 21 | + |
| 22 | + command_PAIR = 5 |
| 23 | + command_REQUEST_PARAMS = 1 |
| 24 | + command_SET_PARAMS = 2 |
| 25 | + |
| 26 | + def __init__(self): |
| 27 | + self._btle = btle.Peripheral(None) |
| 28 | + |
| 29 | + def pair(self, mac: str): |
| 30 | + self._btle.connect(mac, btle.ADDR_TYPE_RANDOM) |
| 31 | + characteristic = self._btle.getServiceByUUID(self.uuid).getCharacteristics()[0] |
| 32 | + characteristic.write(bytes(self._get_pair_command())) |
| 33 | + self._btle.disconnect() |
| 34 | + |
| 35 | + def create_command(self, command: int) -> bytearray: |
| 36 | + command_special = 1 if command == self.command_PAIR else 0 |
| 37 | + return bytearray([self.command_prefix, command, command_special, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, |
| 38 | + self.command_suffix]) |
| 39 | + |
| 40 | + def _get_pair_command(self) -> bytearray: |
| 41 | + return self.create_command(self.command_PAIR) |
| 42 | + |
| 43 | + def _get_status_command(self) -> bytearray: |
| 44 | + return self.create_command(self.command_REQUEST_PARAMS) |
| 45 | + |
| 46 | + def _process_mode(self, mode_code: int) -> str: |
| 47 | + try: |
| 48 | + result = self.modes[mode_code] |
| 49 | + except IndexError: |
| 50 | + result = 'outside' |
| 51 | + return result |
| 52 | + |
| 53 | + def _encode_mode(self, mode: str) -> int: |
| 54 | + return self.modes.index(mode) if mode in self.modes else 2 |
| 55 | + |
| 56 | + def _encode_status(self, status: str) -> int: |
| 57 | + return self.statuses.index(status) if status in self.statuses else 0 |
| 58 | + |
| 59 | + def _process_status(self, code: int) -> str: |
| 60 | + try: |
| 61 | + result = self.statuses[code] |
| 62 | + except IndexError: |
| 63 | + result = 'unknown' |
| 64 | + return result |
| 65 | + |
| 66 | + def _encode_request(self, mac: str, request: dict) -> bytearray: |
| 67 | + try: |
| 68 | + if request["fan_speed"] == 0: |
| 69 | + del request["fan_speed"] |
| 70 | + reqest["status"] = "off" |
| 71 | + except KeyError: |
| 72 | + pass |
| 73 | + |
| 74 | + settings = {**self.get(mac, False), **request} |
| 75 | + new_settings = self.create_command(self.command_SET_PARAMS) |
| 76 | + new_settings[2] = settings["fan_speed"] |
| 77 | + new_settings[3] = settings["heater_temp"] |
| 78 | + new_settings[4] = self._encode_mode(settings["mode"]) |
| 79 | + new_settings[5] = self._encode_status(settings["heater"]) | (self._encode_status(settings["status"]) << 1) | ( |
| 80 | + self._encode_status(settings["sound"]) << 3) |
| 81 | + return new_settings |
| 82 | + |
| 83 | + def _decode_response(self, response: bytearray) -> dict: |
| 84 | + try: |
| 85 | + result = {"code": 200, "heater": self._process_status(response[4] & 1), |
| 86 | + "status": self._process_status(response[4] >> 1 & 1), |
| 87 | + "sound": self._process_status(response[4] >> 3 & 1), |
| 88 | + "mode": self._process_mode(int(list("{:02x}".format(response[2]))[0])), |
| 89 | + "fan_speed": int(list("{:02x}".format(response[2]))[1]), "heater_temp": response[3], |
| 90 | + "in_temp": self.decode_temperature(response[8]), "out_temp": self.decode_temperature(response[7]), |
| 91 | + "filter_remain": response[10] * 256 + response[9], "time": "{}:{}".format(response[11], response[12]), |
| 92 | + "request_error_code": response[13], "fw_version": "{:02x}{:02x}".format(response[16], response[17])} |
| 93 | + except IndexError as e: |
| 94 | + result = {"code": 400, "error": "Got bad response from Tion '%s': %s while parsing" % (response, str(e))} |
| 95 | + finally: |
| 96 | + return result |
| 97 | + |
| 98 | + def _connect(self, mac: str, new_connection=True): |
| 99 | + if new_connection: |
| 100 | + self._btle.connect(mac, btle.ADDR_TYPE_RANDOM) |
| 101 | + for tc in self._btle.getCharacteristics(): |
| 102 | + if tc.uuid == self.uuid_notify: |
| 103 | + self.notify = tc |
| 104 | + if tc.uuid == self.uuid_write: |
| 105 | + self.write = tc |
| 106 | + |
| 107 | + def get(self, mac: str, new_connection=True) -> dict: |
| 108 | + response = "" |
| 109 | + self._connect(mac, new_connection) # new_connection processed inside |
| 110 | + |
| 111 | + self.notify.read() |
| 112 | + self.write.write(self._get_status_command()) |
| 113 | + response = self._btle.getServiceByUUID(self.uuid).getCharacteristics()[0].read() |
| 114 | + |
| 115 | + if new_connection: |
| 116 | + self._btle.disconnect() |
| 117 | + return self._decode_response(response) |
| 118 | + |
| 119 | + def set(self, mac: str, request: dict): |
| 120 | + self._connect(mac) |
| 121 | + self.notify.read() |
| 122 | + self.write.write(self._encode_request(mac, request)) |
| 123 | + self._btle.disconnect() |
0 commit comments