-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathnetwork.py
More file actions
126 lines (105 loc) · 3.42 KB
/
network.py
File metadata and controls
126 lines (105 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import asyncio
import traceback
from typing import Optional
from .log import log
# Module-wide logger used by Connection below. It is obtained from the
# package-level `log` object via fork("network"); presumably this yields a
# child logger tagged "network" -- confirm against the .log module.
_logger = log.fork("network")
class Connection:
    """Reconnecting asyncio TCP client that reads length-prefixed frames.

    ``loop()`` connects, calls ``on_connected()``, then reads frames until the
    connection is closed or an error occurs, and reconnects until ``stop()``
    is called.  Each frame starts with a 4-byte big-endian integer; that value
    minus 4 is handed to ``on_message()`` as the message length.

    Subclasses customise behaviour through the ``on_connected``, ``on_close``,
    ``on_message`` and ``on_error`` hooks.
    """

    def __init__(
        self,
        host: str,
        port: int,
        ssl: bool = False,
        timeout: Optional[float] = 10,
    ) -> None:
        """Store connection parameters; no I/O happens here.

        Args:
            host: Remote host passed to ``asyncio.open_connection``.
            port: Remote port passed to ``asyncio.open_connection``.
            ssl: Forwarded as the ``ssl`` argument of ``open_connection``.
            timeout: Seconds allowed for a connect attempt (``None`` waits
                indefinitely, as per ``asyncio.wait_for``).
        """
        self._host = host
        self._port = port
        self._ssl = ssl
        # Set once by stop(); after that the object cannot be reconnected.
        self._stop_flag = False
        # Set at the end of stop() so wait_closed() callers wake up.
        self._stop_ev = asyncio.Event()
        self.timeout = timeout
        self._reader: Optional[asyncio.StreamReader] = None
        self._writer: Optional[asyncio.StreamWriter] = None

    @property
    def host(self) -> str:
        """Remote host given at construction."""
        return self._host

    @property
    def port(self) -> int:
        """Remote port given at construction."""
        return self._port

    @property
    def ssl(self) -> bool:
        """Value forwarded as ``ssl`` to ``asyncio.open_connection``."""
        return self._ssl

    @property
    def writer(self) -> asyncio.StreamWriter:
        """Current stream writer.

        Raises:
            RuntimeError: If there is no open connection.
        """
        if not self._writer:
            raise RuntimeError("Connection closed!")
        return self._writer

    @property
    def reader(self) -> asyncio.StreamReader:
        """Current stream reader.

        Raises:
            RuntimeError: If there is no open connection.
        """
        if not self._reader:
            raise RuntimeError("Connection closed!")
        return self._reader

    @property
    def closed(self) -> bool:
        """True when no stream is held or ``stop()`` has been called."""
        return not (self._reader or self._writer) or self._stop_flag

    async def wait_closed(self) -> None:
        """Block until ``stop()`` has finished."""
        await self._stop_ev.wait()

    async def connect(self) -> None:
        """Open the TCP connection, bounded by ``self.timeout``.

        Raises:
            RuntimeError: If ``stop()`` was already called.
            asyncio.TimeoutError: If the attempt exceeds ``self.timeout``.
            OSError: If the connection cannot be established.
        """
        if self._stop_flag:
            raise RuntimeError("Connection already stopped")
        self._reader, self._writer = await asyncio.wait_for(
            asyncio.open_connection(self.host, self.port, ssl=self.ssl), self.timeout
        )

    async def close(self, force: bool = False) -> None:
        """Close the current connection, if any, and forget its streams.

        Calls ``on_close()`` first (while ``self.writer`` is still usable),
        then closes the writer.  Does nothing when no connection is open.

        Args:
            force: Proceed even when the stop flag is already set; used by
                ``stop()``.
        """
        if self._stop_flag and not force:
            return
        writer = self._writer
        if writer is None:
            # Never connected, or already closed: nothing to tear down.
            self._reader = None
            return
        _logger.debug("Closing connection")
        try:
            await self.on_close()
        finally:
            # Always release the socket and drop the stream references, even
            # if the hook or the transport shutdown fails, so `closed` is
            # accurate and a later connect() does not leak the old transport.
            try:
                writer.close()
                await writer.wait_closed()
            except OSError as exc:
                # The peer may already have reset the connection; the socket
                # is gone either way, so this is not worth propagating.
                _logger.debug(f"Error while closing connection: {exc!r}")
            finally:
                self._reader = None
                self._writer = None

    async def stop(self) -> None:
        """Permanently stop: set the stop flag, close, and wake waiters.

        Idempotent; a second call returns immediately.
        """
        if self._stop_flag:
            return
        self._stop_flag = True
        try:
            await self.close(force=True)
        finally:
            # Must be set even if closing fails, otherwise wait_closed()
            # callers would hang forever.
            self._stop_ev.set()

    async def _read_loop(self) -> None:
        """Read frames from the current connection until it is closed.

        A frame whose length (header value minus 4) is zero closes the
        connection.  On any other exception ``on_error()`` is consulted; the
        connection is then closed, and the exception is re-raised unless the
        hook returned a truthy value.
        """
        try:
            while not self.closed:
                header = await self.reader.readexactly(4)
                length = int.from_bytes(header, byteorder="big") - 4
                if length:
                    await self.on_message(length)
                else:
                    await self.close()
        except asyncio.CancelledError:
            # NOTE(review): cancellation is converted into a full stop and is
            # not re-raised; loop() then exits because the stop flag is set.
            await self.stop()
        except Exception:
            # on_error() must run inside this handler so that
            # sys.exc_info() is populated for it.
            handled = await self.on_error()
            # The read loop is abandoned either way, so release the socket
            # instead of letting the next connect() overwrite and leak it.
            await self.close()
            if not handled:
                raise

    async def loop(self) -> None:
        """Connect and read until ``stop()`` is called, retrying on failure.

        The first failed attempt is logged at error level and retried after
        1 second; consecutive failures are logged at debug level and retried
        every 5 seconds.
        """
        failed_before = False
        while not self._stop_flag:
            try:
                await self.connect()
            # asyncio.TimeoutError is not an OSError before Python 3.11, so
            # it has to be listed explicitly for timeouts to be retried.
            except (OSError, asyncio.TimeoutError) as exc:
                if failed_before:
                    _logger.debug(f"connect retry fail: {repr(exc)}")
                else:
                    _logger.error("Connect fail, retrying...")
                # Pick the delay *before* updating the flag so the quick
                # first retry is actually reachable.
                delay = 5 if failed_before else 1
                failed_before = True
                await asyncio.sleep(delay)
                continue
            failed_before = False
            await self.on_connected()
            await self._read_loop()

    async def on_connected(self):
        """Hook: called by ``loop()`` after each successful ``connect()``."""

    async def on_close(self):
        """Hook: called by ``close()`` before the writer is closed."""

    async def on_message(self, message_length: int):
        """Hook: called for each frame with a non-zero length.

        Args:
            message_length: Frame header value minus the 4 header bytes.
                Presumably the number of payload bytes the override should
                read from ``self.reader`` -- the default reads nothing.
        """

    async def on_error(self) -> bool:
        """Hook: called from within the exception handler of the read loop.

        Use ``sys.exc_info()`` to inspect the active exception.  The default
        prints the traceback.

        Returns:
            True if the error was handled; a falsy value makes the read loop
            re-raise the exception.
        """
        traceback.print_exc()
        return True