-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_mocks.py
More file actions
112 lines (89 loc) · 3.63 KB
/
_mocks.py
File metadata and controls
112 lines (89 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# tests/_mocks.py
import sys
import time as _time
# Mock MicroPython modules for PC testing
class MockMachine:
class UART:
def __init__(self, *args, **kwargs):
self.tx_buffer = bytearray()
self.rx_buffer = bytearray()
self.rx_pos = 0
self.name = kwargs.get("name", "mock")
self.auto_ack = kwargs.get("auto_ack", False)
def write(self, data):
# print(f"[{self.name}] UART TX: {data}")
self.tx_buffer.extend(data)
if self.auto_ack:
# Dynamically import to avoid circular dependencies
from urst.codec import URSTCodec
from urst.constants import (
FRAME_ACK,
FRAME_DATA,
FRAME_FRAG,
FRAME_UPDATE_REQUEST,
)
# Create a temporary UART and codec to decode the frame
temp_uart = MockMachine.UART()
temp_uart.rx_buffer = data
temp_uart.rx_pos = 0
temp_codec = URSTCodec(temp_uart)
decoded_payload = temp_codec.recv_frame()
if decoded_payload and len(decoded_payload) >= 2:
frame_type = decoded_payload[0]
seq_num = decoded_payload[1]
if frame_type in [FRAME_DATA, FRAME_UPDATE_REQUEST, FRAME_FRAG]:
# Create and encode an ACK frame
ack_uart = MockMachine.UART(name=f"{self.name}_resp")
ack_codec = URSTCodec(ack_uart)
ack_payload = bytes([FRAME_ACK, seq_num])
ack_codec.send_frame(ack_payload)
self.rx_buffer.extend(ack_uart.tx_buffer)
return len(data)
def read(self, num_bytes=None):
if self.rx_pos >= len(self.rx_buffer):
return b""
if num_bytes:
data = self.rx_buffer[self.rx_pos : self.rx_pos + num_bytes]
self.rx_pos += len(data)
else:
data = self.rx_buffer[self.rx_pos :]
self.rx_pos = len(self.rx_buffer)
# print(f"{[self.name]} UART RX: {data}")
return bytes(data)
def any(self):
return len(self.rx_buffer) - self.rx_pos
class Pin:
def __init__(self, pin_num):
self.pin = pin_num
class MockMicropython:
@staticmethod
def const(x):
return x
class MockTime:
"""
Mock of MicroPython's time module.
Inherits all functions/attributes from CPython's time,
and adds ticks_ms / ticks_diff for MicroPython compatibility.
"""
_start = _time.monotonic()
_WRAP = 1 << 30 # mimic MicroPython's ~12.4 days rollover at 1ms resolution
@staticmethod
def ticks_ms() -> int:
"""Return milliseconds since MockTime was first imported, with rollover."""
return int((_time.monotonic() - MockTime._start) * 1000) % MockTime._WRAP
@staticmethod
def ticks_diff(t1: int, t0: int) -> int:
"""
Compute signed difference between two tick values.
Works across rollover, consistent with MicroPython.
"""
diff = (t1 - t0 + MockTime._WRAP // 2) % MockTime._WRAP - MockTime._WRAP // 2
return diff
# Copy all attributes from the real CPython time module into MockTime
for _attr in dir(_time):
if not hasattr(MockTime, _attr):
setattr(MockTime, _attr, getattr(_time, _attr))
# Install mocks
sys.modules["machine"] = MockMachine()
sys.modules["micropython"] = MockMicropython()
sys.modules["time"] = MockTime()