Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 109 additions & 41 deletions polyapi/poly_tables.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,64 @@
import os
import requests
from typing_extensions import NotRequired, TypedDict
from typing import List, Union, Type, Dict, Any, Literal, Tuple, Optional, get_args, get_origin
from typing import (
List,
Union,
Type,
Dict,
Any,
Literal,
Tuple,
Optional,
get_args,
get_origin,
)
from polyapi.utils import add_import_to_init, init_the_init
from polyapi.typedefs import TableSpecDto
from polyapi.constants import JSONSCHEMA_TO_PYTHON_TYPE_MAP
from polyapi.config import get_api_key_and_url

def scrub(data) -> Dict[str, Any]:
if (not data or not isinstance(data, (Dict, List))): return data
TABI_MODULE_IMPORTS = "\n".join(
[
"from typing_extensions import NotRequired, TypedDict",
"from typing import Union, List, Dict, Any, Literal, Optional, Required, overload",
"from polyapi.poly_tables import execute_query, first_result, transform_query, delete_one_response",
"from polyapi.typedefs import Table, PolyCountResult, PolyDeleteResult, PolyDeleteResults, SortOrder, StringFilter, NullableStringFilter, NumberFilter, NullableNumberFilter, BooleanFilter, NullableBooleanFilter, NullableObjectFilter",
]
)


def scrub(data: Any) -> Any:
if not data or not isinstance(data, (Dict, List)):
return data
if isinstance(data, List):
return [scrub(item) for item in data]
else:
temp = {}
secrets = ["x_api_key", "x-api-key", "access_token", "access-token", "authorization", "api_key", "api-key", "apikey", "accesstoken", "token", "password", "key"]
secrets = [
"x_api_key",
"x-api-key",
"access_token",
"access-token",
"authorization",
"api_key",
"api-key",
"apikey",
"accesstoken",
"token",
"password",
"key",
]
for key, value in data.items():
if isinstance(value, (Dict, List)):
temp[key] = scrub(data[key])
elif key.lower() in secrets:
temp[key] = '********'
temp[key] = "********"
else:
temp[key] = data[key]
return temp


def scrub_keys(e: Exception) -> Dict[str, Any]:
"""
Scrub the keys of an exception to remove sensitive information.
Expand All @@ -31,18 +68,26 @@ def scrub_keys(e: Exception) -> Dict[str, Any]:
"error": str(e),
"type": type(e).__name__,
"message": str(e),
"args": scrub(getattr(e, 'args', None))
"args": scrub(getattr(e, "args", None)),
}


def execute_query(table_id, method, query):
from polyapi import polyCustom
from polyapi.poly.client_id import client_id

try:
url = f"/tables/{table_id}/{method}?clientId={client_id}"
headers = {{
'x-poly-execution-id': polyCustom.get('executionId')
}}
Comment on lines -43 to -45
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂️

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lawl

api_key, base_url = get_api_key_and_url()
if not base_url:
raise ValueError(
"PolyAPI Instance URL is not configured, run `python -m polyapi setup`."
)

auth_key = polyCustom.get("executionApiKey") or api_key
url = f"{base_url.rstrip('/')}/tables/{table_id}/{method}?clientId={client_id}"
headers = {"x-poly-execution-id": polyCustom.get("executionId")}
if auth_key:
headers["Authorization"] = f"Bearer {auth_key}"
response = requests.post(url, json=query, headers=headers)
response.raise_for_status()
return response.json()
Expand All @@ -51,14 +96,16 @@ def execute_query(table_id, method, query):


def first_result(rsp):
if isinstance(rsp, dict) and isinstance(rsp.get('results'), list):
return rsp['results'][0] if rsp['results'] else None
if isinstance(rsp, dict) and isinstance(rsp.get("results"), list):
return rsp["results"][0] if rsp["results"] else None
return rsp


def delete_one_response(rsp):
if isinstance(rsp, dict) and isinstance(rsp.get('deleted'), int):
return { 'deleted': bool(rsp.get('deleted')) }
return { 'deleted': false }
if isinstance(rsp, dict) and isinstance(rsp.get("deleted"), int):
return {"deleted": bool(rsp.get("deleted"))}
return {"deleted": False}


_key_transform_map = {
"not_": "not",
Expand All @@ -72,8 +119,7 @@ def delete_one_response(rsp):
def _transform_keys(obj: Any) -> Any:
if isinstance(obj, dict):
return {
_key_transform_map.get(k, k): _transform_keys(v)
for k, v in obj.items()
_key_transform_map.get(k, k): _transform_keys(v) for k, v in obj.items()
}

elif isinstance(obj, list):
Expand All @@ -88,13 +134,13 @@ def transform_query(query: dict) -> dict:
return {
**query,
"where": _transform_keys(query["where"]) if query["where"] else None,
"orderBy": query["order_by"] if query["order_by"] else None
"orderBy": query["order_by"] if query["order_by"] else None,
}

return query


TABI_TABLE_TEMPLATE = '''
TABI_TABLE_TEMPLATE = """
{table_name}Columns = Literal[{table_columns}]


Expand Down Expand Up @@ -369,19 +415,25 @@ def delete_one(*args, **kwargs) -> PolyDeleteResult:
query["where"]["id"] = kwargs["id"]
query.pop("id", None)
return delete_one_response(execute_query({table_name}.table_id, "delete", transform_query(query)))
'''
"""


def _get_column_type_str(name: str, schema: Dict[str, Any], is_required: bool) -> str:
result = ""

col_type = schema.get("type", "object")
if isinstance(col_type, list):
subtypes = [_get_column_type_str(name, { **schema, "type": t }, is_required) for t in col_type]
subtypes = [
_get_column_type_str(name, {**schema, "type": t}, is_required)
for t in col_type
]
result = f"Union[{', '.join(subtypes)}]"
elif col_type == "array":
if isinstance(schema["items"], list):
subtypes = [_get_column_type_str(f"{name}{i}", s, True) for i, s in enumerate(schema["items"])]
subtypes = [
_get_column_type_str(f"{name}{i}", s, True)
for i, s in enumerate(schema["items"])
]
result = f"Tuple[{', '.join(subtypes)}]"
elif isinstance(schema["items"], dict):
result = f"List[{_get_column_type_str(name, schema['items'], True)}]"
Expand All @@ -391,7 +443,10 @@ def _get_column_type_str(name: str, schema: Dict[str, Any], is_required: bool) -
if isinstance(schema.get("patternProperties"), dict):
# TODO: Handle multiple pattern properties
result = f"Dict[str, {_get_column_type_str(f'{name}_', schema['patternProperties'], True)}]"
elif isinstance(schema.get("properties"), dict) and len(schema["properties"].values()) > 0:
elif (
isinstance(schema.get("properties"), dict)
and len(schema["properties"].values()) > 0
):
# TODO: Handle x-poly-refs
result = f'"{name}"'
else:
Expand All @@ -413,24 +468,32 @@ def _render_table_row_classes(table_name: str, schema: Dict[str, Any]) -> str:
return output[1].split("\n", 1)[1].strip()


def _render_table_subset_class(table_name: str, columns: List[Tuple[str, Dict[str, Any]]], required: List[str]) -> str:
def _render_table_subset_class(
table_name: str, columns: List[Tuple[str, Dict[str, Any]]], required: List[str]
) -> str:
# Generate class which can match any subset of a table row
lines = [f"class {table_name}Subset(TypedDict):"]

for name, schema in columns:
type_str = _get_column_type_str(f"_{table_name}Row{name}", schema, name in required)
type_str = _get_column_type_str(
f"_{table_name}Row{name}", schema, name in required
)
lines.append(f" {name}: NotRequired[{type_str}]")

return "\n".join(lines)


def _render_table_where_class(table_name: str, columns: List[Tuple[str, Dict[str, Any]]], required: List[str]) -> str:
def _render_table_where_class(
table_name: str, columns: List[Tuple[str, Dict[str, Any]]], required: List[str]
) -> str:
# Generate class for the 'where' part of the query
lines = [f"class {table_name}WhereFilter(TypedDict):"]

for name, schema in columns:
ftype_str = ""
type_str = _get_column_type_str(f"_{table_name}Row{name}", schema, True) # force required to avoid wrapping type in Optional[]
type_str = _get_column_type_str(
f"_{table_name}Row{name}", schema, True
) # force required to avoid wrapping type in Optional[]
is_required = name in required
if type_str == "bool":
ftype_str = "BooleanFilter" if is_required else "NullableBooleanFilter"
Expand All @@ -445,24 +508,34 @@ def _render_table_where_class(table_name: str, columns: List[Tuple[str, Dict[str
if ftype_str:
lines.append(f" {name}: NotRequired[Union[{type_str}, {ftype_str}]]")

lines.append(f' AND: NotRequired[Union["{table_name}WhereFilter", List["{table_name}WhereFilter"]]]')
lines.append(
f' AND: NotRequired[Union["{table_name}WhereFilter", List["{table_name}WhereFilter"]]]'
)
lines.append(f' OR: NotRequired[List["{table_name}WhereFilter"]]')
lines.append(f' NOT: NotRequired[Union["{table_name}WhereFilter", List["{table_name}WhereFilter"]]]')
lines.append(
f' NOT: NotRequired[Union["{table_name}WhereFilter", List["{table_name}WhereFilter"]]]'
)

return "\n".join(lines)


def _render_table(table: TableSpecDto) -> str:
columns = list(table["schema"]["properties"].items())
required_colunms = table["schema"].get("required", [])
required_columns = table["schema"].get("required", [])

table_columns = ",".join([ f'"{k}"' for k,_ in columns])
table_columns = ",".join([f'"{k}"' for k, _ in columns])
table_row_classes = _render_table_row_classes(table["name"], table["schema"])
table_row_subset_class = _render_table_subset_class(table["name"], columns, required_colunms)
table_where_class = _render_table_where_class(table["name"], columns, required_colunms)
table_row_subset_class = _render_table_subset_class(
table["name"], columns, required_columns
)
table_where_class = _render_table_where_class(
table["name"], columns, required_columns
)
if table.get("description", ""):
table_description = '\n """'
table_description += '\n '.join(table["description"].replace('"', "'").split("\n"))
table_description = '\n """'
table_description += "\n ".join(
table["description"].replace('"', "'").split("\n")
)
table_description += '\n """'
else:
table_description = ""
Expand Down Expand Up @@ -502,12 +575,7 @@ def _create_table(table: TableSpecDto) -> None:

init_path = os.path.join(full_path, "__init__.py")

imports = "\n".join([
"from typing_extensions import NotRequired, TypedDict",
"from typing import Union, List, Dict, Any, Literal, Optional, Required, overload",
"from polyapi.poly_tables import execute_query, first_result, transform_query",
"from polyapi.typedefs import Table, PolyCountResult, PolyDeleteResults, SortOrder, StringFilter, NullableStringFilter, NumberFilter, NullableNumberFilter, BooleanFilter, NullableBooleanFilter, NullableObjectFilter",
])
imports = TABI_MODULE_IMPORTS
table_contents = _render_table(table)

file_contents = ""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "polyapi-python"
version = "0.3.13.dev2" # bump
version = "0.3.13.dev3"
description = "The Python Client for PolyAPI, the IPaaS by Developers for Developers"
authors = [{ name = "Dan Fellin", email = "[email protected]" }]
dependencies = [
Expand Down
Loading