forked from python-trio/unasync
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
178 lines (137 loc) · 5.48 KB
/
__init__.py
File metadata and controls
178 lines (137 loc) · 5.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""Top-level package for unasync."""
import collections
import errno
import os
import sys
import tokenize as std_tokenize
import tokenize_rt
from setuptools.command import build_py as orig
from ._version import __version__ # NOQA
# Names exported by ``from unasync import *``.
__all__ = [
    "Rule",
    "unasync_files",
    "cmdclass_build_py",
]

# Default token replacements: each async-flavoured name (key) is rewritten to
# its synchronous counterpart (value). Every Rule starts from a copy of this
# table and may extend or override it with its own replacements.
_ASYNC_TO_SYNC = {
    "__aenter__": "__enter__",
    "__aexit__": "__exit__",
    "__aiter__": "__iter__",
    "__anext__": "__next__",
    "asynccontextmanager": "contextmanager",
    "AsyncIterable": "Iterable",
    "AsyncIterator": "Iterator",
    "AsyncGenerator": "Generator",
    # TODO: StopIteration is still accepted in Python 2, but the right change
    # would be to rewrite 'raise StopAsyncIteration' as 'return', since we
    # want the unasynced code to be usable on Python 3.7+.
    "StopAsyncIteration": "StopIteration",
}
class Rule:
    """A single set of rules for 'unasync'ing file(s).

    A rule maps files found under ``fromdir`` to an output location under
    ``todir`` and carries the table of token replacements applied while the
    file is rewritten.
    """

    # Token kinds under which a tokenizer may report the ``async`` / ``await``
    # keywords ("ASYNC" / "AWAIT" are kept for tokenizers that use dedicated
    # kinds for them).
    _KEYWORD_TOKEN_NAMES = ("NAME", "ASYNC", "AWAIT")

    # Kind tokenize_rt gives to insignificant whitespace between tokens.
    _WHITESPACE_TOKEN_NAME = "UNIMPORTANT_WS"

    def __init__(self, fromdir, todir, additional_replacements=None):
        """Create a rule.

        Args:
            fromdir: Directory fragment identifying async sources, written
                with ``/`` separators (converted to ``os.sep`` here).
            todir: Directory fragment substituted for ``fromdir`` in the
                output path, written with ``/`` separators.
            additional_replacements: Optional mapping of extra
                ``{async_name: sync_name}`` replacements; entries override
                the built-in defaults.
        """
        self.fromdir = fromdir.replace("/", os.sep)
        self.todir = todir.replace("/", os.sep)

        # Start from the defaults, then layer user-defined replacements on top.
        self.token_replacements = _ASYNC_TO_SYNC.copy()
        self.token_replacements.update(additional_replacements or {})

    def _match(self, filepath):
        """Determine whether this rule applies to ``filepath``.

        The path segments of ``fromdir`` must appear as a contiguous run of
        segments in ``filepath``.

        Returns:
            ``False`` when the rule does not match; otherwise the tuple
            ``(number_of_fromdir_segments, index_of_first_match)``. Tuples
            compare higher for more specific (longer) ``fromdir`` values, so
            callers can pick the best rule with ``>``.
        """
        file_segments = [x for x in filepath.split(os.sep) if x]
        from_segments = [x for x in self.fromdir.split(os.sep) if x]
        len_from_segments = len(from_segments)

        if len_from_segments > len(file_segments):
            return False

        # First window of file segments equal to the fromdir segments wins.
        return next(
            (
                (len_from_segments, i)
                for i in range(len(file_segments) - len_from_segments + 1)
                if file_segments[i : i + len_from_segments] == from_segments
            ),
            False,
        )

    def _unasync_file(self, filepath):
        """Rewrite ``filepath`` into its synchronous form and write it out.

        The output path is ``filepath`` with ``fromdir`` replaced by
        ``todir``; missing output directories are created. The file is
        written using the same encoding that was detected on the input.
        """
        with open(filepath, "rb") as f:
            encoding, _ = std_tokenize.detect_encoding(f.readline)

        with open(filepath, "rt", encoding=encoding) as f:
            source = f.read()

        tokens = self._unasync_tokens(tokenize_rt.src_to_tokens(source))
        result = tokenize_rt.tokens_to_src(tokens)

        # NOTE(review): str.replace substitutes every occurrence of fromdir,
        # and leaves the path unchanged (output == input) when fromdir does
        # not literally occur in it.
        outfilepath = filepath.replace(self.fromdir, self.todir)
        out_dir = os.path.dirname(outfilepath)
        if out_dir:
            # os.makedirs("") raises, so only create a directory when the
            # output path actually has a directory component.
            os.makedirs(out_dir, exist_ok=True)

        # Written in binary so the detected encoding is applied explicitly.
        with open(outfilepath, "wb") as f:
            f.write(result.encode(encoding))

    def _unasync_tokens(self, tokens):
        """Yield ``tokens`` with async constructs converted to sync ones.

        ``async`` / ``await`` keywords are dropped together with the
        whitespace that directly follows them, names are mapped through
        ``_unasync_name``, and string literals whose whole content is a
        replaceable name are rewritten too.

        Args:
            tokens: Iterable of token objects exposing ``name``, ``src`` and
                ``_replace(src=...)`` (as produced by ``tokenize_rt``).
        """
        drop_whitespace = False
        for token in tokens:
            if drop_whitespace:
                drop_whitespace = False
                # Skip only real whitespace, so `print(await stuff)` becomes
                # `print(stuff)` while `await(stuff)` keeps its parenthesis.
                if token.name == self._WHITESPACE_TOKEN_NAME:
                    continue

            # Only genuine keyword tokens are removed; text that merely
            # spells "await" inside another token kind is left untouched.
            if (
                token.src in ("async", "await")
                and token.name in self._KEYWORD_TOKEN_NAMES
            ):
                drop_whitespace = True
                continue

            if token.name == "NAME":
                token = token._replace(src=self._unasync_name(token.src))
            elif token.name == "STRING":
                # Treat the first and last characters as the quotes; strings
                # with prefixes or triple quotes simply won't match a name.
                left_quote, name, right_quote = (
                    token.src[0],
                    token.src[1:-1],
                    token.src[-1],
                )
                token = token._replace(
                    src=left_quote + self._unasync_name(name) + right_quote
                )

            yield token

    def _unasync_name(self, name):
        """Return the synchronous spelling of ``name``.

        Explicit replacements take priority; otherwise a CamelCase
        ``AsyncXxx`` name becomes ``SyncXxx``. Any other name is returned
        unchanged.
        """
        if name in self.token_replacements:
            return self.token_replacements[name]
        # Require an uppercase letter after "Async" so that words such as
        # "Asynchronous" are not mangled.
        elif len(name) > 5 and name.startswith("Async") and name[5].isupper():
            return f"Sync{name[5:]}"
        return name
def unasync_files(fpath_list, rules):
    """Apply the best-matching rule to every file in ``fpath_list``.

    Each rule is asked for a match weight for the file; the rule with the
    greatest truthy weight wins (the earliest rule wins ties) and rewrites
    the file. Files that no rule matches are left alone.
    """
    for path in fpath_list:
        best_rule, best_weight = None, None

        for candidate in rules:
            score = candidate._match(path)
            if not score:
                # A falsy weight means the rule does not apply to this file.
                continue
            if best_weight is None or score > best_weight:
                best_rule, best_weight = candidate, score

        if best_rule:
            best_rule._unasync_file(path)
# Lightweight record with the five fields type/string/start/end/line.
# NOTE(review): not referenced anywhere else in this module as shown — it may
# be a leftover; confirm nothing imports it before removing.
Token = collections.namedtuple("Token", ["type", "string", "start", "end", "line"])

# Rule used when the caller supplies none: files under an "_async" directory
# are written to a sibling "_sync" directory, with the default replacements.
_DEFAULT_RULE = Rule(fromdir="/_async/", todir="/_sync/")
class _build_py(orig.build_py):
    """
    Subclass build_py from setuptools to modify its behavior.

    Convert files in _async dir from being asynchronous to synchronous
    and saves them in _sync dir.
    """

    # Rules passed to unasync_files() by run(); subclasses override this to
    # customise the conversion.
    UNASYNC_RULES = (_DEFAULT_RULE,)

    def run(self):
        """Build the modules and packages, then unasync the copied files.

        The statement order matters: files must be built (and recorded in
        ``_updated_files``) before they are converted, and converted before
        byte-compilation.
        """
        rules = self.UNASYNC_RULES

        # Collects the output paths recorded by build_module() below.
        self._updated_files = []

        # Base class code
        if self.py_modules:
            self.build_modules()
        if self.packages:
            self.build_packages()
            self.build_package_data()

        # Our modification!
        # Convert every file recorded during the build steps above.
        unasync_files(self._updated_files, rules)

        # Remaining base class code
        self.byte_compile(self.get_outputs(include_bytecode=0))

    def build_module(self, module, module_file, package):
        """Build one module and remember its output file if it was copied.

        Delegates to the base class and returns its ``(outfile, copied)``
        result unchanged. Presumably invoked by the base class's
        build_modules()/build_packages() — not visible here; confirm against
        setuptools.
        """
        outfile, copied = super().build_module(module, module_file, package)
        if copied:
            # Only files actually copied this run are queued for conversion.
            self._updated_files.append(outfile)
        return outfile, copied
def cmdclass_build_py(rules=(_DEFAULT_RULE,)):
    """Creates a 'build_py' class for use within 'cmdclass={"build_py": ...}'

    Args:
        rules: Iterable of Rule objects to apply during the build; defaults
            to a single rule mapping "_async" directories to "_sync".

    Returns:
        A new ``_build_py`` subclass whose ``UNASYNC_RULES`` is ``rules``.
    """

    # A fresh subclass per call, so each caller's rules stay independent of
    # the shared _build_py.UNASYNC_RULES default.
    class _custom_build_py(_build_py):
        UNASYNC_RULES = rules

    return _custom_build_py