-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils.py
More file actions
240 lines (191 loc) · 8.1 KB
/
utils.py
File metadata and controls
240 lines (191 loc) · 8.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import keyword
import re
import os
import uuid
from typing import Tuple, List
from colorama import Fore, Style
from polyapi.constants import BASIC_PYTHON_TYPES
from polyapi.typedefs import PropertySpecification, PropertyType
from polyapi.schema import wrapped_generate_schema_types, clean_title, map_primitive_types
# This string should be at the top of every generated __init__.py file.
# It contains all the imports needed for the function or variable code to run;
# init_the_init() writes it when the caller supplies no imports of its own.
CODE_IMPORTS = "from typing import List, Dict, Any, Optional, Callable\nfrom typing_extensions import TypedDict, NotRequired\nimport logging\nimport requests\nimport socketio # type: ignore\nfrom polyapi.config import get_api_key_and_url\nfrom polyapi.execute import execute, execute_post, variable_get, variable_update\n\n"
def init_the_init(full_path: str, code_imports="") -> None:
    """Create ``__init__.py`` inside ``full_path`` unless it already exists.

    A new file is seeded with ``code_imports``; when that is empty the
    module-level ``CODE_IMPORTS`` header is written instead. An existing
    file is never touched.
    """
    target = os.path.join(full_path, "__init__.py")
    if os.path.exists(target):
        return

    header = code_imports or CODE_IMPORTS
    with open(target, "w") as handle:
        handle.write(header)
def add_import_to_init(full_path: str, next: str, code_imports="") -> None:
    """Ensure ``<full_path>/__init__.py`` contains ``from . import <next>``.

    The ``__init__.py`` is created first (via ``init_the_init``) when it is
    missing. The import line is appended only if it is not already present,
    so calling this repeatedly with the same ``next`` is harmless.

    Args:
        full_path: Directory holding the ``__init__.py``.
        next: Name of the submodule to import.
        code_imports: Header to seed a newly created ``__init__.py`` with.
    """
    init_the_init(full_path, code_imports=code_imports)
    init_path = os.path.join(full_path, "__init__.py")
    import_stmt = "from . import {}\n".format(next)

    with open(init_path, "a+") as f:
        # "a+" opens positioned at the end; rewind so the existing content can be read.
        f.seek(0)
        lines = f.readlines()

        # Compare without the line terminator so a final line that lacks a
        # trailing newline is still recognised as an existing import.
        existing = {line.rstrip("\n") for line in lines}
        if import_stmt.rstrip("\n") in existing:
            return

        # If the file does not end with a newline, start a fresh line first;
        # otherwise the import would be glued onto the previous statement.
        if lines and not lines[-1].endswith("\n"):
            f.write("\n")
        f.write(import_stmt)
def get_auth_headers(api_key: str):
    """Return the ``Authorization`` header dict carrying ``api_key`` as a bearer token."""
    headers = {}
    headers["Authorization"] = "Bearer {}".format(api_key)
    return headers
def camelCase(s: str) -> str:
    """Convert a space- or dash-separated string to camelCase.

    Surrounding whitespace is stripped first. If the result contains neither
    a space nor a dash it is assumed to be camelCase already and is returned
    as is (so plain ``snake_case`` input is left unchanged). Otherwise runs
    of ``_``/``-`` become word breaks, each word is title-cased, the words
    are joined and the first character is lower-cased.

    An input made up only of separators (e.g. ``"-"``) yields ``""``.
    """
    s = s.strip()
    if " " not in s and "-" not in s:
        # s is already in camelcase as best as we can tell, just move on!
        return s

    joined = re.sub(r"(_|-)+", " ", s).title().replace(" ", "")
    # Slice rather than index: `joined` is empty when the input held nothing
    # but separators, and joined[0] would raise IndexError.
    return joined[:1].lower() + joined[1:]
def pascalCase(s) -> str:
    """Upper-case every lowercase ASCII letter that starts the string or follows an underscore.

    The underscore preceding such a letter is dropped; everything else is
    left as it was.
    """

    def _upper_letter(match):
        # Group 2 is the letter; group 1 (start-of-string or "_") is discarded.
        return match.group(2).upper()

    boundary_letter = re.compile(r"(^|_)([a-z])")
    return boundary_letter.sub(_upper_letter, s)
def print_green(s: str):
    """Print ``s`` preceded by colorama's green foreground code and followed by a style reset."""
    start = Fore.GREEN
    end = Style.RESET_ALL
    print(start + s + end)
def print_yellow(s: str):
    """Print ``s`` preceded by colorama's yellow foreground code and followed by a style reset."""
    start = Fore.YELLOW
    end = Style.RESET_ALL
    print(start + s + end)
def print_red(s: str):
    """Print ``s`` preceded by colorama's red foreground code and followed by a style reset."""
    start = Fore.RED
    end = Style.RESET_ALL
    print(start + s + end)
def add_type_import_path(function_name: str, arg: str) -> str:
    """Qualify a non-basic type name with the namespace of ``function_name``.

    Basic Python types, anything starting with ``Callable`` and ``List[...]``
    of a basic type are returned unchanged. Any other type name is
    camelCased and prefixed with ``to_func_namespace(function_name)``; for
    ``List[...]`` the prefix is applied to the element type, keeping the
    element quoted if it was quoted in ``arg``.
    """
    # For now Callables are simply treated like basic types.
    if arg.startswith("Callable") or arg in BASIC_PYTHON_TYPES:
        return arg

    if not arg.startswith("List["):
        return f"{to_func_namespace(function_name)}.{camelCase(arg)}"

    # Strip the leading "List[" and the closing "]".
    element = arg[5:-1]
    if element in BASIC_PYTHON_TYPES:
        return arg

    namespace = to_func_namespace(function_name)
    if '"' in element:
        unquoted = element.replace('"', "")
        return f'List["{namespace}.{camelCase(unquoted)}"]'
    return f"List[{namespace}.{camelCase(element)}]"
def get_type_and_def(type_spec: PropertyType) -> Tuple[str, str]:
    """Translate a type specification into a Python type name and its definition.

    Dispatches on ``type_spec["kind"]`` (``plain``, ``primitive``, ``array``,
    ``void``, ``object``, ``function``, ``any``); any other kind falls back
    to ``"Any"``.

    Returns:
        A ``(type_name, type_def)`` pair. ``type_def`` is ``""`` on every
        branch that returns a literal type name; branches that delegate to
        ``wrapped_generate_schema_types`` return whatever that helper
        produces (presumably the generated type definitions; confirm
        against ``polyapi.schema``).
    """
    if type_spec["kind"] == "plain":
        value = type_spec["value"]
        if value.endswith("[]"):
            # "foo[]" denotes a list of the primitive "foo".
            primitive = map_primitive_types(value[:-2])
            return f"List[{primitive}]", ""
        else:
            return map_primitive_types(value), ""
    elif type_spec["kind"] == "primitive":
        return map_primitive_types(type_spec["type"]), ""
    elif type_spec["kind"] == "array":
        if type_spec.get("items"):
            items = type_spec["items"]
            if items.get("$ref"):
                # Items reference another schema: hand the whole array spec to the schema generator.
                return wrapped_generate_schema_types(type_spec, "ResponseType", "Dict")  # type: ignore
            else:
                # Resolve the element type recursively; its definition is discarded here.
                item_type, _ = get_type_and_def(items)
                title = f"List[{item_type}]"
                title = clean_title(title)
                return title, ""
        else:
            return "List", ""
    elif type_spec["kind"] == "void":
        return "None", ""
    elif type_spec["kind"] == "object":
        if type_spec.get("schema"):
            schema = type_spec["schema"]
            # Prefer "title", then "name"; an empty string means neither is usable.
            title = schema.get("title", schema.get("name", ""))
            if title:
                assert isinstance(title, str)
                return wrapped_generate_schema_types(schema, title, "Dict")  # type: ignore
            elif schema.get("allOf") and len(schema['allOf']):
                # we are in a case of a single allOf, lets strip off the allOf and move on!
                # our library doesn't handle allOf well yet
                allOf = schema['allOf'][0]
                title = allOf.get("title", allOf.get("name", ""))
                return wrapped_generate_schema_types(allOf, title, "Dict")
            elif schema.get("items"):
                # fallback to schema $ref name if no explicit title
                items = schema.get("items")  # type: ignore
                title = items.get("title")  # type: ignore
                if not title:
                    # title is actually a reference to another schema
                    title = items.get("$ref", "")  # type: ignore
                # Keep only the last path segment.
                # NOTE(review): the flattened source does not show whether this
                # statement was nested inside the `if` above; confirm against
                # version control. The two placements differ only when an
                # explicit title contains "/".
                title = title.rsplit("/", 1)[-1]
                if not title:
                    return "List", ""
                title = f"List[{title}]"
                return wrapped_generate_schema_types(schema, title, "List")
            else:
                return "Any", ""
        else:
            return "Dict", ""
    elif type_spec["kind"] == "function":
        arg_types = []
        arg_defs = []
        if "spec" in type_spec:
            return_type, _ = get_type_and_def(type_spec["spec"]["returnType"])
            if return_type not in BASIC_PYTHON_TYPES:
                # for now Python only supports basic types as return types
                return_type = "Any"
            for argument in type_spec["spec"]["arguments"]:
                arg_type, arg_def = get_type_and_def(argument["type"])
                arg_types.append(arg_type)
                if arg_def:
                    arg_defs.append(arg_def)
            final_arg_type = "Callable[[{}], {}]".format(", ".join(arg_types), return_type)
            # Definitions collected from the argument types are returned alongside the Callable type.
            return final_arg_type, "\n".join(arg_defs)
        else:
            return "Callable", ""
    elif type_spec["kind"] == "any":
        return "Any", ""
    else:
        # Unknown kinds are treated like "any".
        return "Any", ""
def parse_arguments(function_name: str, arguments: List[PropertySpecification]) -> Tuple[str, str]:
    """Render the parameter list and supporting type definitions for a function.

    Each argument becomes one line of the form ``name: type,`` with its
    description (if any) appended as a trailing ``#`` comment. The last
    argument gets no comma when it carries a description.

    Note: every argument's ``"name"`` entry is rewritten in place with
    ``rewrite_arg_name``.

    Args:
        function_name: Function the arguments belong to; used to qualify
            non-basic type names.
        arguments: Argument specifications with ``"name"``, ``"type"`` and an
            optional ``"description"``.

    Returns:
        ``(arg_string, args_def)``: the rendered parameter lines without a
        trailing newline, and the collected type definitions joined by blank
        lines.
    """
    args_def = []
    rendered = []
    last_idx = len(arguments) - 1

    for idx, a in enumerate(arguments):
        arg_type, arg_def = get_type_and_def(a["type"])
        if arg_def:
            args_def.append(arg_def)

        a["name"] = rewrite_arg_name(a["name"])
        line = f" {a['name']}: {add_type_import_path(function_name, arg_type)}"

        # A description may be missing or explicitly None; treat both as empty.
        description = a.get("description") or ""
        # The description ends up in a one-line "#" comment, so every kind of
        # line break must be flattened; Python treats a lone "\r" as a line
        # terminator too, which would spill the rest of the text into code.
        description = (
            description.replace("\r\n", " ").replace("\r", " ").replace("\n", " ")
        )

        if not description:
            line += ",\n"
        elif idx == last_idx:
            line += f" # {description}\n"
        else:
            line += f", # {description}\n"
        rendered.append(line)

    return "".join(rendered).rstrip("\n"), "\n\n".join(args_def)
def poly_full_path(context, name) -> str:
    """Return the dotted path under which a function is exposed in the poly library.

    The result is ``poly.<context>.<name>``, or ``poly.<name>`` when
    ``context`` is empty.
    """
    qualified = context + "." + name if context else name
    return "poly.{}".format(qualified)
# Names that rewrite_reserved() must not emit verbatim: every Python keyword
# plus the typing names listed below.
RESERVED_WORDS = set(keyword.kwlist).union(
    {"List", "Dict", "Any", "Optional", "Callable"}
)
def to_func_namespace(s: str) -> str:
    """Convert a function name into the namespace name used for its types.

    The first character is upper-cased and the rest is kept as is; the
    result is then passed through ``rewrite_reserved`` so it cannot collide
    with a reserved word.
    """
    head, tail = s[0], s[1:]
    return rewrite_reserved(head.upper() + tail)
def rewrite_reserved(s: str) -> str:
    """Return ``s`` prefixed with ``_`` if it is in ``RESERVED_WORDS``, else ``s`` unchanged."""
    return "_" + s if s in RESERVED_WORDS else s
def rewrite_arg_name(s: str):
    """Turn a raw argument name into a camelCase name that avoids reserved words."""
    camel = camelCase(s)
    return rewrite_reserved(camel)
# Regex fragments for the subdomains accepted by is_valid_polyapi_url().
valid_subdomains = ["na[1-2]", "eu[1-2]", "dev"]


def is_valid_polyapi_url(_url: str):
    """Return True if ``_url`` is exactly ``https://<subdomain>.polyapi.io``.

    ``<subdomain>`` must match one of the patterns in ``valid_subdomains``.
    No path, port, trailing slash or trailing newline is accepted.
    """
    # Join the subdomains into a single alternation.
    subdomain_pattern = "|".join(valid_subdomains)
    pattern = rf"https://({subdomain_pattern})\.polyapi\.io"
    # fullmatch rather than match with "$": "$" also matches just before a
    # trailing newline, which would let "https://na1.polyapi.io\n" through.
    return re.fullmatch(pattern, _url) is not None
def is_valid_uuid(uuid_string, version=4):
    """Return True if ``uuid_string`` is the canonical text form of a UUID of ``version``.

    The value is parsed with ``uuid.UUID`` and re-serialised; it is valid
    only if the round trip reproduces the input exactly. Upper-case,
    un-hyphenated or wrong-version inputs are therefore rejected, as is
    anything that is not a string.
    """
    # uuid.UUID raises TypeError/AttributeError (not ValueError) for None or
    # other non-string input; a validator should simply answer False.
    if not isinstance(uuid_string, str):
        return False

    try:
        uuid_obj = uuid.UUID(uuid_string, version=version)
    except ValueError:
        return False

    # UUID(..., version=N) forces the version/variant bits, so an input of a
    # different version no longer round-trips to the same text.
    return str(uuid_obj) == uuid_string