Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 106 additions & 44 deletions polyapi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import copy
import os
import sys
import copy
from contextvars import ContextVar, Token
from dataclasses import dataclass
from typing import Any, Dict, Literal, Optional, overload

import truststore
from typing import Any, Dict, Optional, overload, Literal
from typing_extensions import TypedDict

from .cli_constants import CLI_COMMANDS

truststore.inject_into_ssl()
from .cli import CLI_COMMANDS

__all__ = ["poly"]

Expand All @@ -19,83 +24,140 @@

class PolyCustomDict(TypedDict, total=False):
"""Type definition for polyCustom dictionary."""
executionId: Optional[str] # Read-only

executionId: Optional[str] # Read-only unless explicitly unlocked
executionApiKey: Optional[str]
responseStatusCode: int
userSessionId: Optional[str]
responseStatusCode: Optional[int]
responseContentType: Optional[str]
responseHeaders: Dict[str, str]


class _PolyCustom:
def __init__(self):
self._internal_store = {
"executionId": None,
"executionApiKey": None,
"responseStatusCode": 200,
"responseContentType": None,
"responseHeaders": {},
}
self._execution_id_locked = False
responseHeaders: Dict[str, Any]


@dataclass
class _PolyCustomState:
internal_store: Dict[str, Any]
execution_id_locked: bool = False


class PolyCustom:
def __init__(self) -> None:
object.__setattr__(
self,
"_default_store",
{
"executionId": None,
"executionApiKey": None,
"userSessionId": None,
"responseStatusCode": 200,
"responseContentType": None,
"responseHeaders": {},
},
)
object.__setattr__(self, "_state_var", ContextVar("_poly_custom_state", default=None))

def _make_state(self) -> _PolyCustomState:
return _PolyCustomState(internal_store=copy.deepcopy(self._default_store))

def _get_state(self) -> _PolyCustomState:
state = self._state_var.get()
if state is None:
state = self._make_state()
self._state_var.set(state)
return state

def push_scope(self, initial_values: Optional[Dict[str, Any]] = None) -> Token:
state = self._make_state()
if initial_values:
state.internal_store.update(copy.deepcopy(initial_values))
if state.internal_store.get("executionId") is not None:
state.execution_id_locked = True
return self._state_var.set(state)

def pop_scope(self, token: Token) -> None:
self._state_var.reset(token)

def set_once(self, key: str, value: Any) -> None:
if key == "executionId" and self._execution_id_locked:
# Silently ignore attempts to overwrite locked executionId
state = self._get_state()
if key == "executionId" and state.execution_id_locked:
return
self._internal_store[key] = value
state.internal_store[key] = value
if key == "executionId":
# Lock executionId after setting it
self.lock_execution_id()
state.execution_id_locked = True

def get(self, key: str, default: Any = None) -> Any:
return self._internal_store.get(key, default)
return self._get_state().internal_store.get(key, default)

def lock_execution_id(self) -> None:
self._execution_id_locked = True
self._get_state().execution_id_locked = True

def unlock_execution_id(self) -> None:
self._execution_id_locked = False
self._get_state().execution_id_locked = False

@overload
def __getitem__(self, key: Literal["executionId"]) -> Optional[str]: ...

@overload
def __getitem__(self, key: Literal["executionApiKey"]) -> Optional[str]: ...

@overload
def __getitem__(self, key: Literal["responseStatusCode"]) -> int: ...

def __getitem__(self, key: Literal["userSessionId"]) -> Optional[str]: ...

@overload
def __getitem__(self, key: Literal["responseStatusCode"]) -> Optional[int]: ...

@overload
def __getitem__(self, key: Literal["responseContentType"]) -> Optional[str]: ...

@overload
def __getitem__(self, key: Literal["responseHeaders"]) -> Dict[str, str]: ...
def __getitem__(self, key: Literal["responseHeaders"]) -> Dict[str, Any]: ...

def __getitem__(self, key: str) -> Any:
return self.get(key)

@overload
def __setitem__(self, key: Literal["executionApiKey"], value: Optional[str]) -> None: ...


@overload
def __setitem__(self, key: Literal["userSessionId"], value: Optional[str]) -> None: ...

@overload
def __setitem__(self, key: Literal["responseStatusCode"], value: int) -> None: ...
def __setitem__(self, key: Literal["responseStatusCode"], value: Optional[int]) -> None: ...

@overload
def __setitem__(self, key: Literal["responseContentType"], value: Optional[str]) -> None: ...

@overload
def __setitem__(self, key: Literal["responseHeaders"], value: Dict[str, str]) -> None: ...
def __setitem__(self, key: Literal["responseHeaders"], value: Dict[str, Any]) -> None: ...

def __setitem__(self, key: str, value: Any) -> None:
self.set_once(key, value)

def __repr__(self) -> str:
return f"PolyCustom({self._internal_store})"
def __getattr__(self, key: str) -> Any:
if key in self._default_store:
return self.get(key)
raise AttributeError(f"{type(self).__name__!r} object has no attribute {key!r}")

def __setattr__(self, key: str, value: Any) -> None:
if key.startswith("_"):
object.__setattr__(self, key, value)
return
self.set_once(key, value)

def copy(self) -> '_PolyCustom':
new = _PolyCustom()
new._internal_store = copy.deepcopy(self._internal_store)
new._execution_id_locked = self._execution_id_locked
def __repr__(self) -> str:
return f"PolyCustom({self._get_state().internal_store})"

def copy(self) -> "PolyCustom":
new = PolyCustom()
state = self._get_state()
new._state_var.set(
_PolyCustomState(
internal_store=copy.deepcopy(state.internal_store),
execution_id_locked=state.execution_id_locked,
)
)
return new


polyCustom: PolyCustomDict = _PolyCustom()
_PolyCustom = PolyCustom

polyCustom: PolyCustom = PolyCustom()
4 changes: 1 addition & 3 deletions polyapi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from polyapi.utils import print_green, print_red

from .cli_constants import CLI_COMMANDS
from .config import initialize_config, set_api_key_and_url
from .generate import generate, clear
from .function_cli import function_add_or_update, function_execute
Expand All @@ -11,9 +12,6 @@
from .sync import sync_deployables


CLI_COMMANDS = ["setup", "generate", "function", "clear", "help", "update_rendered_spec"]


def _get_version_string():
"""Get the version string for the package."""
try:
Expand Down
10 changes: 10 additions & 0 deletions polyapi/cli_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CLI_COMMANDS = (
"setup",
"generate",
"function",
"clear",
"help",
"update_rendered_spec",
"prepare",
"sync",
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "polyapi-python"
version = "0.3.14.dev2"
version = "0.3.14.dev3"
description = "The Python Client for PolyAPI, the IPaaS by Developers for Developers"
authors = [{ name = "Dan Fellin", email = "[email protected]" }]
dependencies = [
Expand Down
130 changes: 130 additions & 0 deletions tests/test_poly_custom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import asyncio
import importlib
import sys
from concurrent.futures import ThreadPoolExecutor


def _reload_polyapi():
sys.modules.pop("polyapi", None)
sys.modules.pop("polyapi.cli", None)
return importlib.import_module("polyapi")


def test_import_polyapi_does_not_import_cli():
polyapi = _reload_polyapi()

assert polyapi is not None
assert "polyapi.cli" not in sys.modules


def test_cli_constants_shared_between_runtime_and_cli():
cli_constants = importlib.import_module("polyapi.cli_constants")
cli_module = importlib.import_module("polyapi.cli")

assert tuple(cli_module.CLI_COMMANDS) == cli_constants.CLI_COMMANDS


def test_poly_custom_nested_scopes_restore_previous_state():
polyapi = _reload_polyapi()
poly_custom = polyapi.polyCustom

outer_token = poly_custom.push_scope(
{
"executionId": "outer",
"responseHeaders": {"x-scope": "outer"},
"responseStatusCode": None,
}
)
try:
assert poly_custom["executionId"] == "outer"

inner_token = poly_custom.push_scope(
{
"executionId": "inner",
"responseHeaders": {"x-scope": "inner"},
"responseStatusCode": 202,
}
)
try:
assert poly_custom["executionId"] == "inner"
assert poly_custom["responseHeaders"] == {"x-scope": "inner"}
assert poly_custom.responseStatusCode == 202

poly_custom["executionId"] = "should-not-overwrite"
assert poly_custom["executionId"] == "inner"

poly_custom.unlock_execution_id()
poly_custom["executionId"] = "inner-updated"
assert poly_custom["executionId"] == "inner-updated"
finally:
poly_custom.pop_scope(inner_token)

assert poly_custom["executionId"] == "outer"
assert poly_custom["responseHeaders"] == {"x-scope": "outer"}
assert poly_custom["responseStatusCode"] is None
finally:
poly_custom.pop_scope(outer_token)

assert poly_custom["executionId"] is None
assert poly_custom["responseHeaders"] == {}
assert poly_custom["responseStatusCode"] == 200


def test_poly_custom_isolated_across_async_tasks():
polyapi = _reload_polyapi()
poly_custom = polyapi.polyCustom

async def worker(execution_id: str) -> tuple[str, str]:
token = poly_custom.push_scope(
{
"executionId": execution_id,
"responseHeaders": {"worker": execution_id},
"responseStatusCode": None,
}
)
try:
await asyncio.sleep(0)
poly_custom["responseHeaders"]["seen"] = execution_id
await asyncio.sleep(0)
return poly_custom["executionId"], poly_custom["responseHeaders"]["seen"]
finally:
poly_custom.pop_scope(token)

async def run_workers() -> tuple[tuple[str, str], tuple[str, str]]:
first_result, second_result = await asyncio.gather(worker("async-a"), worker("async-b"))
return first_result, second_result

first, second = asyncio.run(run_workers())

assert first == ("async-a", "async-a")
assert second == ("async-b", "async-b")
assert poly_custom["executionId"] is None
assert poly_custom["responseHeaders"] == {}


def test_poly_custom_isolated_across_threads():
polyapi = _reload_polyapi()
poly_custom = polyapi.polyCustom

def worker(execution_id: str) -> tuple[str, str]:
token = poly_custom.push_scope(
{
"executionId": execution_id,
"responseHeaders": {"worker": execution_id},
"responseStatusCode": None,
}
)
try:
poly_custom["responseHeaders"]["seen"] = execution_id
return poly_custom["executionId"], poly_custom["responseHeaders"]["seen"]
finally:
poly_custom.pop_scope(token)

with ThreadPoolExecutor(max_workers=2) as executor:
first = executor.submit(worker, "thread-a").result()
second = executor.submit(worker, "thread-b").result()

assert first == ("thread-a", "thread-a")
assert second == ("thread-b", "thread-b")
assert poly_custom["executionId"] is None
assert poly_custom["responseHeaders"] == {}
Loading