Skip to content

Commit 3522c70

Browse files
committed
Fix formating
1 parent feda4db commit 3522c70

3 files changed

Lines changed: 179 additions & 184 deletions

File tree

s3.py

Lines changed: 116 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,131 +1,123 @@
11
if __package__ == "":
2-
from tion import tion
2+
from tion import tion
33
else:
4-
from . import tion
4+
from . import tion
55

66
from bluepy import btle
77
import time
88

99
class s3(tion):
10-
uuid = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
11-
uuid_write = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
12-
uuid_notify = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
13-
write = None
14-
notify = None
15-
statuses = [ 'off', 'on' ]
16-
modes = [ 'recirculation', 'mixed' ]
17-
_btle = None
18-
19-
command_prefix = 61
20-
command_suffix = 90
21-
22-
command_PAIR = 5
23-
command_REQUEST_PARAMS = 1
24-
command_SET_PARAMS = 2
25-
26-
def __init__(self):
27-
self._btle = btle.Peripheral(None)
28-
29-
def pair(self, mac: str):
30-
self._btle.connect(mac, btle.ADDR_TYPE_RANDOM)
31-
characteristic = self._btle.getServiceByUUID(self.uuid).getCharacteristics()[0]
32-
characteristic.write(bytes(self._get_pair_command()))
33-
self._btle.disconnect()
34-
35-
def create_command(self, command: int) -> bytearray:
36-
command_special = 1 if command == self.command_PAIR else 0
37-
return bytearray([self.command_prefix, command, command_special, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, self.command_suffix])
38-
39-
def _get_pair_command(self) -> bytearray:
40-
return self.create_command(self.command_PAIR)
41-
42-
def _get_status_command(self) -> bytearray:
43-
return self.create_command(self.command_REQUEST_PARAMS)
44-
45-
def _process_mode(self, mode_code: int) -> str:
46-
try:
47-
result = self.modes[mode_code]
48-
except IndexError:
49-
result = 'outside'
50-
return result
51-
52-
def _encode_mode(self, mode: str) -> int:
53-
return self.modes.index(mode) if mode in self.modes else 2
54-
55-
def _encode_status(self, status: str) -> int:
56-
return self.statuses.index(status) if status in self.statuses else 0
57-
58-
def _process_status(self, code: int) -> str:
59-
try:
60-
result = self.statuses[code]
61-
except IndexError:
62-
result = 'unknown'
63-
return result
64-
65-
def _encode_request(self, mac: str, request: dict) -> bytearray:
66-
try:
67-
if request["fan_speed"] == 0:
68-
del request["fan_speed"]
69-
reqest["status"] = "off"
70-
except KeyError:
71-
pass
72-
73-
settings = {**self.get(mac, False), **request}
74-
new_settings = self.create_command(self.command_SET_PARAMS)
75-
new_settings[2] = settings["fan_speed"]
76-
new_settings[3] = settings["heater_temp"]
77-
new_settings[4] = self._encode_mode(settings["mode"])
78-
new_settings[5] = self._encode_status(settings["heater"]) | (self._encode_status(settings["status"])<<1) | (self._encode_status(settings["sound"])<<3)
79-
return new_settings
80-
81-
def _decode_response(self, response: bytearray) -> dict:
82-
try:
83-
result = {
84-
"code": 200,
85-
"heater": self._process_status(response[4] & 1),
86-
"status": self._process_status(response[4] >> 1 & 1),
87-
"sound": self._process_status(response[4] >> 3 & 1),
88-
"mode": self._process_mode(int(list("{:02x}".format(response[2]))[0])),
89-
"fan_speed": int(list("{:02x}".format(response[2]))[1]),
90-
"heater_temp": response[3],
91-
"in_temp": self.decode_temperature(response[8]),
92-
"out_temp": self.decode_temperature(response[7]),
93-
"filter_remain": response[10]*256 + response[9],
94-
"time": "{}:{}".format(response[11],response[12]),
95-
"request_error_code": response[13],
96-
"fw_version": "{:02x}{:02x}".format(response[16],response[17])
97-
}
98-
except IndexError as e:
99-
result = {
100-
"code": 400,
101-
"error": "Got bad response from Tion '%s': %s while parsing" % (response, str(e))
102-
}
103-
finally:
104-
return result
105-
106-
def _connect(self, mac: str, new_connection = True):
107-
if new_connection:
108-
self._btle.connect(mac, btle.ADDR_TYPE_RANDOM)
109-
for tc in self._btle.getCharacteristics():
110-
if tc.uuid == self.uuid_notify:
111-
self.notify = tc
112-
if tc.uuid == self.uuid_write:
113-
self.write = tc
114-
115-
def get(self, mac: str, new_connection = True) -> dict:
116-
response = ""
117-
self._connect(mac, new_connection) #new_connection processed inside
118-
119-
self.notify.read()
120-
self.write.write(self._get_status_command())
121-
response = self._btle.getServiceByUUID(self.uuid).getCharacteristics()[0].read()
122-
123-
if new_connection:
124-
self._btle.disconnect()
125-
return self._decode_response(response)
126-
127-
def set(self, mac: str, request: dict):
128-
self._connect(mac)
129-
self.notify.read()
130-
self.write.write(self._encode_request(mac, request))
131-
self._btle.disconnect()
10+
uuid = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
11+
uuid_write = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
12+
uuid_notify = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
13+
write = None
14+
notify = None
15+
statuses = ['off', 'on']
16+
modes = ['recirculation', 'mixed']
17+
_btle = None
18+
19+
command_prefix = 61
20+
command_suffix = 90
21+
22+
command_PAIR = 5
23+
command_REQUEST_PARAMS = 1
24+
command_SET_PARAMS = 2
25+
26+
def __init__(self):
27+
self._btle = btle.Peripheral(None)
28+
29+
def pair(self, mac: str):
30+
self._btle.connect(mac, btle.ADDR_TYPE_RANDOM)
31+
characteristic = self._btle.getServiceByUUID(self.uuid).getCharacteristics()[0]
32+
characteristic.write(bytes(self._get_pair_command()))
33+
self._btle.disconnect()
34+
35+
def create_command(self, command: int) -> bytearray:
36+
command_special = 1 if command == self.command_PAIR else 0
37+
return bytearray([self.command_prefix, command, command_special, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
38+
self.command_suffix])
39+
40+
def _get_pair_command(self) -> bytearray:
41+
return self.create_command(self.command_PAIR)
42+
43+
def _get_status_command(self) -> bytearray:
44+
return self.create_command(self.command_REQUEST_PARAMS)
45+
46+
def _process_mode(self, mode_code: int) -> str:
47+
try:
48+
result = self.modes[mode_code]
49+
except IndexError:
50+
result = 'outside'
51+
return result
52+
53+
def _encode_mode(self, mode: str) -> int:
54+
return self.modes.index(mode) if mode in self.modes else 2
55+
56+
def _encode_status(self, status: str) -> int:
57+
return self.statuses.index(status) if status in self.statuses else 0
58+
59+
def _process_status(self, code: int) -> str:
60+
try:
61+
result = self.statuses[code]
62+
except IndexError:
63+
result = 'unknown'
64+
return result
65+
66+
def _encode_request(self, mac: str, request: dict) -> bytearray:
67+
try:
68+
if request["fan_speed"] == 0:
69+
del request["fan_speed"]
70+
reqest["status"] = "off"
71+
except KeyError:
72+
pass
73+
74+
settings = {**self.get(mac, False), **request}
75+
new_settings = self.create_command(self.command_SET_PARAMS)
76+
new_settings[2] = settings["fan_speed"]
77+
new_settings[3] = settings["heater_temp"]
78+
new_settings[4] = self._encode_mode(settings["mode"])
79+
new_settings[5] = self._encode_status(settings["heater"]) | (self._encode_status(settings["status"]) << 1) | (
80+
self._encode_status(settings["sound"]) << 3)
81+
return new_settings
82+
83+
def _decode_response(self, response: bytearray) -> dict:
84+
try:
85+
result = {"code": 200, "heater": self._process_status(response[4] & 1),
86+
"status": self._process_status(response[4] >> 1 & 1),
87+
"sound": self._process_status(response[4] >> 3 & 1),
88+
"mode": self._process_mode(int(list("{:02x}".format(response[2]))[0])),
89+
"fan_speed": int(list("{:02x}".format(response[2]))[1]), "heater_temp": response[3],
90+
"in_temp": self.decode_temperature(response[8]), "out_temp": self.decode_temperature(response[7]),
91+
"filter_remain": response[10] * 256 + response[9], "time": "{}:{}".format(response[11], response[12]),
92+
"request_error_code": response[13], "fw_version": "{:02x}{:02x}".format(response[16], response[17])}
93+
except IndexError as e:
94+
result = {"code": 400, "error": "Got bad response from Tion '%s': %s while parsing" % (response, str(e))}
95+
finally:
96+
return result
97+
98+
def _connect(self, mac: str, new_connection=True):
99+
if new_connection:
100+
self._btle.connect(mac, btle.ADDR_TYPE_RANDOM)
101+
for tc in self._btle.getCharacteristics():
102+
if tc.uuid == self.uuid_notify:
103+
self.notify = tc
104+
if tc.uuid == self.uuid_write:
105+
self.write = tc
106+
107+
def get(self, mac: str, new_connection=True) -> dict:
108+
response = ""
109+
self._connect(mac, new_connection) # new_connection processed inside
110+
111+
self.notify.read()
112+
self.write.write(self._get_status_command())
113+
response = self._btle.getServiceByUUID(self.uuid).getCharacteristics()[0].read()
114+
115+
if new_connection:
116+
self._btle.disconnect()
117+
return self._decode_response(response)
118+
119+
def set(self, mac: str, request: dict):
120+
self._connect(mac)
121+
self.notify.read()
122+
self.write.write(self._encode_request(mac, request))
123+
self._btle.disconnect()

tests/decode_temperature.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99

1010
import tion
1111

12+
1213
class TestDecodeTemperature(unittest.TestCase, tion.tion):
13-
def test_positive(self):
14-
self.assertEqual(self.decode_temperature(0x09), 9, "Should be 9")
15-
def test_negative(self):
16-
self.assertEqual(self.decode_temperature(0xFF), -1, "Should be -1")
14+
def test_positive(self):
15+
self.assertEqual(self.decode_temperature(0x09), 9, "Should be 9")
16+
17+
def test_negative(self):
18+
self.assertEqual(self.decode_temperature(0xFF), -1, "Should be -1")
19+
1720

1821
if __name__ == '__main__':
19-
unittest.main()
22+
unittest.main()

tion.py

Lines changed: 55 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,58 @@
11
import abc
22

33
class tion:
4-
@abc.abstractmethod
5-
def _send_request(self, request: bytearray) -> bytearray:
6-
""" Send request to device
7-
8-
Args:
9-
request : array of bytes to send to device
10-
Returns:
11-
array of bytes with device response
12-
"""
13-
pass
14-
15-
@abc.abstractmethod
16-
def _decode_response(self, response: bytearray) -> dict:
17-
""" Decode response from device
18-
19-
Args:
20-
response: array of bytes with data from device, taken from _send_request
21-
Returns:
22-
dictionary with device response
23-
"""
24-
pass
25-
26-
@abc.abstractmethod
27-
def _encode_request(self, request: dict) -> bytearray:
28-
""" Encode dictionary of request to byte array
29-
30-
Args:
31-
request: dictionry with request
32-
Returns:
33-
Byte array for sending to device
34-
"""
35-
pass
36-
37-
@abc.abstractmethod
38-
def get() -> dict:
39-
""" Get device information
40-
Returns:
41-
dictionay with device paramters
42-
"""
43-
pass
44-
45-
def decode_temperature(self, raw: bytes) -> int:
46-
""" Converts temperature from bytes with addition code to int
47-
Args:
48-
raw: raw temperature value from Tion
49-
Returns:
50-
Integer value for temperature
51-
"""
52-
barrier = 0b10000000
53-
if (raw < barrier):
54-
result = raw
55-
else:
56-
result = -(~(result - barrier) + barrier + 1)
57-
58-
return result
4+
@abc.abstractmethod
5+
def _send_request(self, request: bytearray) -> bytearray:
6+
""" Send request to device
7+
8+
Args:
9+
request : array of bytes to send to device
10+
Returns:
11+
array of bytes with device response
12+
"""
13+
pass
14+
15+
@abc.abstractmethod
16+
def _decode_response(self, response: bytearray) -> dict:
17+
""" Decode response from device
18+
19+
Args:
20+
response: array of bytes with data from device, taken from _send_request
21+
Returns:
22+
dictionary with device response
23+
"""
24+
pass
25+
26+
@abc.abstractmethod
27+
def _encode_request(self, request: dict) -> bytearray:
28+
""" Encode dictionary of request to byte array
29+
30+
Args:
31+
request: dictionry with request
32+
Returns:
33+
Byte array for sending to device
34+
"""
35+
pass
36+
37+
@abc.abstractmethod
38+
def get() -> dict:
39+
""" Get device information
40+
Returns:
41+
dictionay with device paramters
42+
"""
43+
pass
44+
45+
def decode_temperature(self, raw: bytes) -> int:
46+
""" Converts temperature from bytes with addition code to int
47+
Args:
48+
raw: raw temperature value from Tion
49+
Returns:
50+
Integer value for temperature
51+
"""
52+
barrier = 0b10000000
53+
if (raw < barrier):
54+
result = raw
55+
else:
56+
result = -(~(result - barrier) + barrier + 1)
57+
58+
return result

0 commit comments

Comments
 (0)