diff --git a/polyapi/__init__.py b/polyapi/__init__.py index 583d1f3..f39a2f6 100644 --- a/polyapi/__init__.py +++ b/polyapi/__init__.py @@ -1,11 +1,16 @@ +import copy import os import sys -import copy +from contextvars import ContextVar, Token +from dataclasses import dataclass +from typing import Any, Dict, Literal, Optional, overload + import truststore -from typing import Any, Dict, Optional, overload, Literal from typing_extensions import TypedDict + +from .cli_constants import CLI_COMMANDS + truststore.inject_into_ssl() -from .cli import CLI_COMMANDS __all__ = ["poly"] @@ -19,83 +24,140 @@ class PolyCustomDict(TypedDict, total=False): """Type definition for polyCustom dictionary.""" - executionId: Optional[str] # Read-only + + executionId: Optional[str] # Read-only unless explicitly unlocked executionApiKey: Optional[str] - responseStatusCode: int + userSessionId: Optional[str] + responseStatusCode: Optional[int] responseContentType: Optional[str] - responseHeaders: Dict[str, str] - - -class _PolyCustom: - def __init__(self): - self._internal_store = { - "executionId": None, - "executionApiKey": None, - "responseStatusCode": 200, - "responseContentType": None, - "responseHeaders": {}, - } - self._execution_id_locked = False + responseHeaders: Dict[str, Any] + + +@dataclass +class _PolyCustomState: + internal_store: Dict[str, Any] + execution_id_locked: bool = False + + +class PolyCustom: + def __init__(self) -> None: + object.__setattr__( + self, + "_default_store", + { + "executionId": None, + "executionApiKey": None, + "userSessionId": None, + "responseStatusCode": 200, + "responseContentType": None, + "responseHeaders": {}, + }, + ) + object.__setattr__(self, "_state_var", ContextVar("_poly_custom_state", default=None)) + + def _make_state(self) -> _PolyCustomState: + return _PolyCustomState(internal_store=copy.deepcopy(self._default_store)) + + def _get_state(self) -> _PolyCustomState: + state = self._state_var.get() + if state is None: + state = self._make_state() + self._state_var.set(state) + return state + + def push_scope(self, initial_values: Optional[Dict[str, Any]] = None) -> Token: + state = self._make_state() + if initial_values: + state.internal_store.update(copy.deepcopy(initial_values)) + if state.internal_store.get("executionId") is not None: + state.execution_id_locked = True + return self._state_var.set(state) + + def pop_scope(self, token: Token) -> None: + self._state_var.reset(token) def set_once(self, key: str, value: Any) -> None: - if key == "executionId" and self._execution_id_locked: - # Silently ignore attempts to overwrite locked executionId + state = self._get_state() + if key == "executionId" and state.execution_id_locked: return - self._internal_store[key] = value + state.internal_store[key] = value if key == "executionId": - # Lock executionId after setting it - self.lock_execution_id() + state.execution_id_locked = True def get(self, key: str, default: Any = None) -> Any: - return self._internal_store.get(key, default) + return self._get_state().internal_store.get(key, default) def lock_execution_id(self) -> None: - self._execution_id_locked = True + self._get_state().execution_id_locked = True def unlock_execution_id(self) -> None: - self._execution_id_locked = False + self._get_state().execution_id_locked = False @overload def __getitem__(self, key: Literal["executionId"]) -> Optional[str]: ... - + @overload def __getitem__(self, key: Literal["executionApiKey"]) -> Optional[str]: ... - + @overload - def __getitem__(self, key: Literal["responseStatusCode"]) -> int: ... - + def __getitem__(self, key: Literal["userSessionId"]) -> Optional[str]: ... + + @overload + def __getitem__(self, key: Literal["responseStatusCode"]) -> Optional[int]: ... + @overload def __getitem__(self, key: Literal["responseContentType"]) -> Optional[str]: ... @overload - def __getitem__(self, key: Literal["responseHeaders"]) -> Dict[str, str]: ... - + def __getitem__(self, key: Literal["responseHeaders"]) -> Dict[str, Any]: ... + def __getitem__(self, key: str) -> Any: return self.get(key) @overload def __setitem__(self, key: Literal["executionApiKey"], value: Optional[str]) -> None: ... - + + @overload + def __setitem__(self, key: Literal["userSessionId"], value: Optional[str]) -> None: ... + @overload - def __setitem__(self, key: Literal["responseStatusCode"], value: int) -> None: ... - + def __setitem__(self, key: Literal["responseStatusCode"], value: Optional[int]) -> None: ... + @overload def __setitem__(self, key: Literal["responseContentType"], value: Optional[str]) -> None: ... @overload - def __setitem__(self, key: Literal["responseHeaders"], value: Dict[str, str]) -> None: ... - + def __setitem__(self, key: Literal["responseHeaders"], value: Dict[str, Any]) -> None: ... + def __setitem__(self, key: str, value: Any) -> None: self.set_once(key, value) - def __repr__(self) -> str: - return f"PolyCustom({self._internal_store})" + def __getattr__(self, key: str) -> Any: + if key in self._default_store: + return self.get(key) + raise AttributeError(f"{type(self).__name__!r} object has no attribute {key!r}") + + def __setattr__(self, key: str, value: Any) -> None: + if key.startswith("_"): + object.__setattr__(self, key, value) + return + self.set_once(key, value) - def copy(self) -> '_PolyCustom': - new = _PolyCustom() - new._internal_store = copy.deepcopy(self._internal_store) - new._execution_id_locked = self._execution_id_locked + def __repr__(self) -> str: + return f"PolyCustom({self._get_state().internal_store})" + + def copy(self) -> "PolyCustom": + new = PolyCustom() + state = self._get_state() + new._state_var.set( + _PolyCustomState( + internal_store=copy.deepcopy(state.internal_store), + execution_id_locked=state.execution_id_locked, + ) + ) return new -polyCustom: PolyCustomDict = _PolyCustom() \ No newline at end of file +_PolyCustom = PolyCustom + +polyCustom: PolyCustom = PolyCustom() diff --git a/polyapi/cli.py b/polyapi/cli.py index 43d9b6e..2187276 100644 --- a/polyapi/cli.py +++ b/polyapi/cli.py @@ -3,6 +3,7 @@ from polyapi.utils import print_green, print_red +from .cli_constants import CLI_COMMANDS from .config import initialize_config, set_api_key_and_url from .generate import generate, clear from .function_cli import function_add_or_update, function_execute @@ -11,9 +12,6 @@ from .sync import sync_deployables -CLI_COMMANDS = ["setup", "generate", "function", "clear", "help", "update_rendered_spec"] - - def _get_version_string(): """Get the version string for the package.""" try: diff --git a/polyapi/cli_constants.py b/polyapi/cli_constants.py new file mode 100644 index 0000000..c94112a --- /dev/null +++ b/polyapi/cli_constants.py @@ -0,0 +1,10 @@ +CLI_COMMANDS = ( + "setup", + "generate", + "function", + "clear", + "help", + "update_rendered_spec", + "prepare", + "sync", +) diff --git a/pyproject.toml b/pyproject.toml index 9805734..fccc197 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "polyapi-python" -version = "0.3.14.dev2" +version = "0.3.14.dev3" description = "The Python Client for PolyAPI, the IPaaS by Developers for Developers" authors = [{ name = "Dan Fellin", email = "dan@polyapi.io" }] dependencies = [ diff --git a/tests/test_poly_custom.py b/tests/test_poly_custom.py new file mode 100644 index 0000000..c1f967b --- /dev/null +++ b/tests/test_poly_custom.py @@ -0,0 +1,130 @@ +import asyncio +import importlib +import sys +from concurrent.futures import ThreadPoolExecutor + + +def _reload_polyapi(): + sys.modules.pop("polyapi", None) + sys.modules.pop("polyapi.cli", None) + return importlib.import_module("polyapi") + + +def test_import_polyapi_does_not_import_cli(): + polyapi = _reload_polyapi() + + assert polyapi is not None + assert "polyapi.cli" not in sys.modules + + +def test_cli_constants_shared_between_runtime_and_cli(): + cli_constants = importlib.import_module("polyapi.cli_constants") + cli_module = importlib.import_module("polyapi.cli") + + assert tuple(cli_module.CLI_COMMANDS) == cli_constants.CLI_COMMANDS + + +def test_poly_custom_nested_scopes_restore_previous_state(): + polyapi = _reload_polyapi() + poly_custom = polyapi.polyCustom + + outer_token = poly_custom.push_scope( + { + "executionId": "outer", + "responseHeaders": {"x-scope": "outer"}, + "responseStatusCode": None, + } + ) + try: + assert poly_custom["executionId"] == "outer" + + inner_token = poly_custom.push_scope( + { + "executionId": "inner", + "responseHeaders": {"x-scope": "inner"}, + "responseStatusCode": 202, + } + ) + try: + assert poly_custom["executionId"] == "inner" + assert poly_custom["responseHeaders"] == {"x-scope": "inner"} + assert poly_custom.responseStatusCode == 202 + + poly_custom["executionId"] = "should-not-overwrite" + assert poly_custom["executionId"] == "inner" + + poly_custom.unlock_execution_id() + poly_custom["executionId"] = "inner-updated" + assert poly_custom["executionId"] == "inner-updated" + finally: + poly_custom.pop_scope(inner_token) + + assert poly_custom["executionId"] == "outer" + assert poly_custom["responseHeaders"] == {"x-scope": "outer"} + assert poly_custom["responseStatusCode"] is None + finally: + poly_custom.pop_scope(outer_token) + + assert poly_custom["executionId"] is None + assert poly_custom["responseHeaders"] == {} + assert poly_custom["responseStatusCode"] == 200 + + +def test_poly_custom_isolated_across_async_tasks(): + polyapi = _reload_polyapi() + poly_custom = polyapi.polyCustom + + async def worker(execution_id: str) -> tuple[str, str]: + token = poly_custom.push_scope( + { + "executionId": execution_id, + "responseHeaders": {"worker": execution_id}, + "responseStatusCode": None, + } + ) + try: + await asyncio.sleep(0) + poly_custom["responseHeaders"]["seen"] = execution_id + await asyncio.sleep(0) + return poly_custom["executionId"], poly_custom["responseHeaders"]["seen"] + finally: + poly_custom.pop_scope(token) + + async def run_workers() -> tuple[tuple[str, str], tuple[str, str]]: + first_result, second_result = await asyncio.gather(worker("async-a"), worker("async-b")) + return first_result, second_result + + first, second = asyncio.run(run_workers()) + + assert first == ("async-a", "async-a") + assert second == ("async-b", "async-b") + assert poly_custom["executionId"] is None + assert poly_custom["responseHeaders"] == {} + + +def test_poly_custom_isolated_across_threads(): + polyapi = _reload_polyapi() + poly_custom = polyapi.polyCustom + + def worker(execution_id: str) -> tuple[str, str]: + token = poly_custom.push_scope( + { + "executionId": execution_id, + "responseHeaders": {"worker": execution_id}, + "responseStatusCode": None, + } + ) + try: + poly_custom["responseHeaders"]["seen"] = execution_id + return poly_custom["executionId"], poly_custom["responseHeaders"]["seen"] + finally: + poly_custom.pop_scope(token) + + with ThreadPoolExecutor(max_workers=2) as executor: + first = executor.submit(worker, "thread-a").result() + second = executor.submit(worker, "thread-b").result() + + assert first == ("thread-a", "thread-a") + assert second == ("thread-b", "thread-b") + assert poly_custom["executionId"] is None + assert poly_custom["responseHeaders"] == {}