Skip to content

Commit 834317e

Browse files
develop
1 parent e64fbbd commit 834317e

File tree

5 files changed

+36
-34
lines changed

5 files changed

+36
-34
lines changed

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[metadata]
22

33
name = data_pipe
4-
version = 0.2.0
4+
version = 0.2.2
55

66
requires-python = >=3.8
77

src/main/data_pipe/basic_trunk.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@
2323
from trio._core._run import GLOBAL_RUN_CONTEXT as trio_global_context
2424

2525

26+
@enum.unique
27+
class TrunkMode(enum.Enum):
28+
"framework type"
29+
30+
TRIO = enum.auto()
31+
CURIO = enum.auto()
32+
ASYNCIO = enum.auto()
33+
34+
2635
@enum.unique
2736
class ExecCode(enum.IntEnum):
2837
"server response status code"
@@ -150,15 +159,15 @@ async def wait_curio_writable(cls, fd:int) -> None:
150159
async def wait_asyncio_readable(cls, fd:int) -> None:
151160
loop = asyncio.get_running_loop()
152161
future = asyncio.Future()
153-
future.add_done_callback(lambda : loop.remove_reader(fd))
162+
future.add_done_callback(lambda *args : loop.remove_reader(fd))
154163
loop.add_reader(fd, future.set_result, None)
155164
await future
156165

157166
@classmethod
158167
async def wait_asyncio_writable(cls, fd:int) -> None:
159168
loop = asyncio.get_running_loop()
160169
future = asyncio.Future()
161-
future.add_done_callback(lambda : loop.remove_writer(fd))
170+
future.add_done_callback(lambda *args : loop.remove_writer(fd))
162171
loop.add_writer(fd, future.set_result, None)
163172
await future
164173

@@ -213,17 +222,29 @@ async def this_task(cls) -> int:
213222
raise RuntimeError(f"no loop")
214223

215224
@classmethod
216-
async def spawn_task(cls, coro_func:CoroutineType, *args) -> "AnyTask":
225+
async def spawn_task(cls, func:CoroutineType, *args) -> "AnyTask":
217226
"create and launch new background task"
218227
if cls.has_trio_loop():
219-
return trio.hazmat.spawn_system_task(coro_func, *args)
228+
return trio.hazmat.spawn_system_task(func, *args)
220229
elif cls.has_curio_loop():
221-
return await curio.spawn(coro_func, *args)
230+
return await curio.spawn(func, *args)
222231
elif cls.has_asyncio_loop():
223-
return asyncio.create_task(coro_func(*args))
232+
return asyncio.create_task(func(*args))
224233
else:
225234
raise RuntimeError(f"no loop")
226235

236+
@classmethod
237+
def invoke_main(cls, mode:TrunkMode, func:CoroutineType, *args, **kwargs) -> object:
238+
"create and launch main framework task"
239+
if mode == TrunkMode.TRIO:
240+
return trio.run(func, *args, **kwargs)
241+
elif mode == TrunkMode.CURIO:
242+
return curio.run(func, *args, **kwargs)
243+
elif mode == TrunkMode.ASYNCIO:
244+
return asyncio.run(func(*args, **kwargs))
245+
else:
246+
raise RuntimeError(f"no mode: {mode}")
247+
227248
@classmethod
228249
def default_tracer(cls,
229250
token:TrunkToken,

src/test/data_pipe_test/basic_trunk_test.py

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,6 @@
99
from data_pipe.basic_trunk import *
1010

1111

12-
@enum.unique
13-
class Mode(enum.Enum):
14-
"framework type"
15-
16-
TRIO = enum.auto()
17-
CURIO = enum.auto()
18-
ASYNCIO = enum.auto()
19-
20-
2112
class StatusStore(ctypes.Structure):
2213
"cross-process value"
2314

@@ -58,21 +49,10 @@ async def func_zen(cls, value:int) -> int:
5849
return value
5950

6051

61-
def setup_loop(mode:Mode, task:CoroutineType) -> None:
62-
if mode == Mode.TRIO:
63-
trio.run(task)
64-
elif mode == Mode.CURIO:
65-
curio.run(task)
66-
elif mode == Mode.ASYNCIO:
67-
asyncio.run(task())
68-
else:
69-
raise RuntimeError(f"no mode: {mode}")
70-
71-
72-
def verify_trunk(client_mode:Mode, server_mode:Mode, runner_class):
52+
def verify_trunk(client_mode:TrunkMode, server_mode:TrunkMode, runner_class):
7353

7454
runner_name = runner_class.__name__
75-
print(f"runner_class={runner_name} :: client_mode={client_mode} server_mode={server_mode}")
55+
print(f"runner={runner_name} :: client={client_mode.name} server={server_mode.name}")
7656

7757
counter = StatusStore.make()
7858
assert counter.value == 0
@@ -102,10 +82,10 @@ async def client_task():
10282
await basic_trunk.invoke(RPC.func_zen, 3)
10383

10484
def server_main():
105-
setup_loop(server_mode, server_task)
85+
BasicTrunk.invoke_main(server_mode, server_task)
10686

10787
def client_main():
108-
setup_loop(client_mode, client_task)
88+
BasicTrunk.invoke_main(client_mode, client_task)
10989

11090
server_runner = runner_class(target=server_main)
11191
client_runner = runner_class(target=client_main)
@@ -131,6 +111,6 @@ def client_main():
131111
def test_trunk():
132112
print()
133113
for runner_class in [threading.Thread, multiprocessing.Process]:
134-
for client_mode in Mode:
135-
for server_mode in Mode:
114+
for client_mode in TrunkMode:
115+
for server_mode in TrunkMode:
136116
verify_trunk(client_mode, server_mode, runner_class)

tool/github_squash.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from devrepo import shell
88

9-
point = "df57d41c8797052f08d3568f7b0159d285735700"
9+
point = "e64fbbdb04f9cd43f7f3e58688c4ac1e2c2bbb45"
1010
message = "develop"
1111

1212
shell(f"git reset --soft {point}")

tool/perform_tox.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@
66

77
from devrepo import shell
88

9+
shell(f"rm -rf .tox")
910
shell(f"tox")

0 commit comments

Comments
 (0)