-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathdice.py
More file actions
179 lines (152 loc) · 5.38 KB
/
dice.py
File metadata and controls
179 lines (152 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""
Dice API
"""
import asyncio
import struct
import enum
from typing import Callable, Awaitable
class Color(enum.Enum):
"""
Color of a dice (dots)
"""
BLACK = 0
RED = 1
GREEN = 2
BLUE = 3
YELLOW = 4
ORANGE = 5
class StabilityDescriptor(enum.Enum):
"""
Value describing a movement state taking place at the time of position capture
"""
ROLLING = 1
STABLE = 2
TILT_STABLE = 3
FAKE_STABLE = 4
MOVE_STABLE = 5
class Dice:
"""
Represents a dice providing API to features
"""
def __init__(self, ble_client) -> None:
self._client = ble_client
self._rx_char = None
self._tx_char = None
self._color = None
self._color_upd_q = asyncio.Queue()
self._battery_lvl_upd_q = asyncio.Queue()
self._xyz_interpret_fn = None
async def _noop_cb(_num, _stab_descr):
pass
self._position_upd_cb = _noop_cb
async def connect(self):
await self._client.connect()
self._tx_char = self._client.services.get_characteristic(
"6e400002-b5a3-f393-e0a9-e50e24dcca9e"
)
self._rx_char = self._client.services.get_characteristic(
"6e400003-b5a3-f393-e0a9-e50e24dcca9e"
)
await self._client.start_notify(self._rx_char, self._handle_upd)
async def disconnect(self):
await self._client.disconnect()
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.disconnect()
async def set_led(self, led1_rgb_tuple, led2_rgb_tuple):
"""
Set LED color
:param led1_rgb_tuple: RGB tuple for LED 1
:param led2_rgb_tuple: RGB tuple for LED 2
"""
_validate_rgb_tuple(led1_rgb_tuple)
_validate_rgb_tuple(led2_rgb_tuple)
cmd_code = 8
data = bytearray([cmd_code, *led1_rgb_tuple, *led2_rgb_tuple])
await self._client.write_gatt_char(self._tx_char, data)
async def pulse_led(self, pulse_count, on_time_ms, off_time_ms, rgb_tuple):
"""
Pulses the die's leds for set time
:param pulse_count: How many pulses
:param on_time_ms: How much time to spend on (units of 10 ms)
:param off_time_ms: How much time to spend off (units of 10 ms)
:param rgb_tuple: tuple of RGB values set to pulse
"""
_validate_rgb_tuple(rgb_tuple)
cmd_code = 16
cmd_ending = [1, 0]
data = bytearray(
[cmd_code, pulse_count, on_time_ms, off_time_ms, *rgb_tuple, *cmd_ending]
)
await self._client.write_gatt_char(self._tx_char, data)
async def get_color(self):
"""
Read dice color
:returns: Color
"""
self._color = self._color or await self._fetch_color()
return self._color
async def _fetch_color(self):
cmd_code = 23
msg = bytearray([cmd_code])
await self._client.write_gatt_char(self._tx_char, msg)
color = await self._color_upd_q.get()
return color
async def get_battery_level(self):
"""
Read a battery level
:returns: int 0-100%
"""
cmd_code = 3
msg = bytearray([cmd_code])
await self._client.write_gatt_char(self._tx_char, msg)
return await self._battery_lvl_upd_q.get()
async def subscribe_number_notification(self, callback: Callable[[int, StabilityDescriptor], Awaitable[None]]):
"""
Subscribe to receiving position change notifications
:param callback: callback function receiving number update notifications
"""
self._position_upd_cb = callback
async def _handle_upd(self, _char, data: bytearray):
first_byte = data[0]
if first_byte == 82:
await self._position_upd_cb(0, StabilityDescriptor.ROLLING)
return
second_byte = data[1]
third_byte = data[2]
if first_byte == 66 and second_byte == 97 and third_byte == 116:
await self._battery_lvl_upd_q.put(data[3])
return
if first_byte == 67 and second_byte == 111 and third_byte == 108:
await self._color_upd_q.put(Color(data[3]))
return
if first_byte == 83:
xyz = _get_xyz_from_bytes(data[1:4])
rolled_value = self._xyz_interpret_fn(xyz)
await self._position_upd_cb(rolled_value, StabilityDescriptor.STABLE)
return
if second_byte == 83:
xyz = _get_xyz_from_bytes(data[2:5])
rolled_value = self._xyz_interpret_fn(xyz)
descr = None
if first_byte == 70:
descr = StabilityDescriptor.FAKE_STABLE
elif first_byte == 84:
descr = StabilityDescriptor.TILT_STABLE
elif first_byte == 77:
descr = StabilityDescriptor.MOVE_STABLE
else:
descr = None
await self._position_upd_cb(rolled_value, descr)
def _validate_rgb_tuple(rgb_tuple):
def is_any_out_of_range():
rgb_range = range(256)
return any(code not in rgb_range for code in rgb_tuple)
if len(rgb_tuple) != 3 or is_any_out_of_range():
raise ValueError(
f"rgb_tuple is expected to be a 3 numbers tuple, got {rgb_tuple}"
)
def _get_xyz_from_bytes(xyz_bytes):
return struct.unpack(">bbb", xyz_bytes)