-
Notifications
You must be signed in to change notification settings - Fork 1
5918 http asyncing #93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
bee902b
central http client module with async detect
harshi922 6d367c6
fix main point execute
harshi922 4eca160
none the timeout since sleepysfx
harshi922 ca59142
sync simple replace
harshi922 310184b
more replaces simple
harshi922 abc5105
last few replaces asynced reqs
harshi922 d88f3b8
testing async calls feature
harshi922 1dd93a6
one check error
harshi922 ad4c5cb
all fixed func way
harshi922 9859eaa
all fixed func way tests
harshi922 ef15b20
fixes
harshi922 156c87a
Add seperate closes and revert two check errors
harshi922 7d56f64
consistency changes
harshi922 2d6ab56
bump and dependency
harshi922 78175f8
bump and dependency
harshi922 c118a13
bump undo
harshi922 710fc09
fix tests
harshi922 d2d60b3
bump
harshi922 fc66bf7
Merge branch 'develop' into 5918-http-asyncing
harshi922 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,109 +1,224 @@ | ||
| from typing import Dict, Optional | ||
| import requests | ||
| import httpx | ||
| import os | ||
| import logging | ||
| from requests import Response | ||
| from polyapi.config import get_api_key_and_url, get_mtls_config | ||
| from polyapi.exceptions import PolyApiException | ||
| from polyapi import http_client | ||
|
|
||
| logger = logging.getLogger("poly") | ||
|
|
||
| def direct_execute(function_type, function_id, data) -> Response: | ||
| """ execute a specific function id/type | ||
| """ | ||
| api_key, api_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{api_url}/functions/{function_type}/{function_id}/direct-execute" | ||
|
|
||
| endpoint_info = requests.post(url, json=data, headers=headers) | ||
| if endpoint_info.status_code < 200 or endpoint_info.status_code >= 300: | ||
| error_content = endpoint_info.content.decode("utf-8", errors="ignore") | ||
| def _check_response_error(resp, function_type, function_id, data): | ||
| if resp.status_code < 200 or resp.status_code >= 300: | ||
| error_content = resp.content.decode("utf-8", errors="ignore") | ||
| if function_type == 'api' and os.getenv("LOGS_ENABLED"): | ||
| raise PolyApiException(f"Error executing api function with id: {function_id}. Status code: {endpoint_info.status_code}. Request data: {data}, Response: {error_content}") | ||
| logger.error(f"Error executing api function with id: {function_id}. Status code: {resp.status_code}. Request data: {data}, Response: {error_content}") | ||
| elif function_type != 'api': | ||
| raise PolyApiException(f"{endpoint_info.status_code}: {error_content}") | ||
|
|
||
| endpoint_info_data = endpoint_info.json() | ||
| raise PolyApiException(f"{resp.status_code}: {error_content}") | ||
|
|
||
|
|
||
| def _check_endpoint_error(resp, function_type, function_id, data): | ||
| if resp.status_code < 200 or resp.status_code >= 300: | ||
| error_content = resp.content.decode("utf-8", errors="ignore") | ||
| if function_type == 'api' and os.getenv("LOGS_ENABLED"): | ||
| raise PolyApiException(f"Error executing api function with id: {function_id}. Status code: {resp.status_code}. Request data: {data}, Response: {error_content}") | ||
| elif function_type != 'api': | ||
| raise PolyApiException(f"{resp.status_code}: {error_content}") | ||
|
|
||
|
|
||
| def _build_direct_execute_params(endpoint_info_data): | ||
| request_params = endpoint_info_data.copy() | ||
| request_params.pop("url", None) | ||
|
|
||
| if "maxRedirects" in request_params: | ||
| request_params["allow_redirects"] = request_params.pop("maxRedirects") > 0 | ||
|
|
||
| request_params["follow_redirects"] = request_params.pop("maxRedirects") > 0 | ||
| return request_params | ||
|
|
||
|
|
||
| def _sync_direct_execute(function_type, function_id, data) -> httpx.Response: | ||
| api_key, api_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{api_url}/functions/{function_type}/{function_id}/direct-execute" | ||
|
|
||
| endpoint_info = http_client.post(url, json=data, headers=headers) | ||
| _check_endpoint_error(endpoint_info, function_type, function_id, data) | ||
|
|
||
| endpoint_info_data = endpoint_info.json() | ||
| request_params = _build_direct_execute_params(endpoint_info_data) | ||
|
|
||
| has_mtls, cert_path, key_path, ca_path = get_mtls_config() | ||
|
|
||
|
|
||
| # Direct-execute hits URL that may need custom TLS | ||
| # settings (mTLS certs or disabled verification). httpx Client.request() | ||
| # doesn't accept per-request transport kwargs, so use one-off calls. | ||
| if has_mtls: | ||
| resp = requests.request( | ||
| resp = httpx.request( | ||
| url=endpoint_info_data["url"], | ||
| cert=(cert_path, key_path), | ||
| verify=ca_path, | ||
| timeout=None, | ||
| **request_params | ||
| ) | ||
| else: | ||
| resp = requests.request( | ||
| resp = httpx.request( | ||
| url=endpoint_info_data["url"], | ||
| verify=False, | ||
| timeout=None, | ||
| **request_params | ||
| ) | ||
|
|
||
| if (resp.status_code < 200 or resp.status_code >= 300): | ||
| error_content = resp.content.decode("utf-8", errors="ignore") | ||
| if function_type == 'api' and os.getenv("LOGS_ENABLED"): | ||
| logger.error(f"Error executing api function with id: {function_id}. Status code: {resp.status_code}. Request data: {data}, Response: {error_content}") | ||
| elif function_type != 'api': | ||
| raise PolyApiException(f"{resp.status_code}: {error_content}") | ||
|
|
||
| _check_response_error(resp, function_type, function_id, data) | ||
| return resp | ||
|
|
||
|
|
||
| async def _async_direct_execute(function_type, function_id, data) -> httpx.Response: | ||
| api_key, api_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{api_url}/functions/{function_type}/{function_id}/direct-execute" | ||
|
|
||
| endpoint_info = await http_client.async_post(url, json=data, headers=headers) | ||
| _check_endpoint_error(endpoint_info, function_type, function_id, data) | ||
|
|
||
| endpoint_info_data = endpoint_info.json() | ||
| request_params = _build_direct_execute_params(endpoint_info_data) | ||
|
|
||
| has_mtls, cert_path, key_path, ca_path = get_mtls_config() | ||
|
|
||
| # One-off async client for custom TLS settings on external URLs. | ||
| if has_mtls: | ||
| async with httpx.AsyncClient( | ||
| cert=(cert_path, key_path), verify=ca_path, timeout=None | ||
| ) as client: | ||
| resp = await client.request( | ||
| url=endpoint_info_data["url"], **request_params | ||
| ) | ||
| else: | ||
| async with httpx.AsyncClient(verify=False, timeout=None) as client: | ||
| resp = await client.request( | ||
| url=endpoint_info_data["url"], **request_params | ||
| ) | ||
|
|
||
| _check_response_error(resp, function_type, function_id, data) | ||
| return resp | ||
|
|
||
| def execute(function_type, function_id, data) -> Response: | ||
| """ execute a specific function id/type | ||
|
|
||
| def direct_execute(function_type, function_id, data) -> httpx.Response: | ||
| """ execute a specific function id/type (sync) | ||
| """ | ||
| return _sync_direct_execute(function_type, function_id, data) | ||
|
|
||
|
|
||
| async def direct_execute_async(function_type, function_id, data) -> httpx.Response: | ||
| """ execute a specific function id/type (async) | ||
| """ | ||
| return await _async_direct_execute(function_type, function_id, data) | ||
|
|
||
|
|
||
| def _sync_execute(function_type, function_id, data) -> httpx.Response: | ||
| api_key, api_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{api_url}/functions/{function_type}/{function_id}/execute" | ||
|
|
||
| resp = http_client.post(url, json=data, headers=headers) | ||
| _check_response_error(resp, function_type, function_id, data) | ||
| return resp | ||
|
|
||
|
|
||
| async def _async_execute(function_type, function_id, data) -> httpx.Response: | ||
| api_key, api_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{api_url}/functions/{function_type}/{function_id}/execute" | ||
|
|
||
| # Make the request | ||
| resp = requests.post( | ||
| url, | ||
| json=data, | ||
| headers=headers, | ||
| ) | ||
|
|
||
| if (resp.status_code < 200 or resp.status_code >= 300) and os.getenv("LOGS_ENABLED"): | ||
| error_content = resp.content.decode("utf-8", errors="ignore") | ||
| if function_type == 'api' and os.getenv("LOGS_ENABLED"): | ||
| logger.error(f"Error executing api function with id: {function_id}. Status code: {resp.status_code}. Request data: {data}, Response: {error_content}") | ||
| elif function_type != 'api': | ||
| raise PolyApiException(f"{resp.status_code}: {error_content}") | ||
|
|
||
| resp = await http_client.async_post(url, json=data, headers=headers) | ||
| _check_response_error(resp, function_type, function_id, data) | ||
| return resp | ||
|
|
||
|
|
||
| def execute_post(path, data): | ||
| def execute(function_type, function_id, data) -> httpx.Response: | ||
| """ execute a specific function id/type (sync) | ||
| """ | ||
| return _sync_execute(function_type, function_id, data) | ||
|
|
||
|
|
||
| async def execute_async(function_type, function_id, data) -> httpx.Response: | ||
| """ execute a specific function id/type (async) | ||
| """ | ||
| return await _async_execute(function_type, function_id, data) | ||
|
|
||
|
|
||
| def _sync_execute_post(path, data): | ||
| api_key, api_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| return http_client.post(api_url + path, json=data, headers=headers) | ||
|
|
||
|
|
||
| async def _async_execute_post(path, data): | ||
| api_key, api_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| resp = requests.post(api_url + path, json=data, headers=headers) | ||
| return await http_client.async_post(api_url + path, json=data, headers=headers) | ||
|
|
||
|
|
||
| def execute_post(path, data): | ||
| return _sync_execute_post(path, data) | ||
|
|
||
|
|
||
| async def execute_post_async(path, data): | ||
| return await _async_execute_post(path, data) | ||
|
|
||
|
|
||
| def _sync_variable_get(variable_id: str) -> httpx.Response: | ||
| api_key, base_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{base_url}/variables/{variable_id}/value" | ||
| resp = http_client.get(url, headers=headers) | ||
| if resp.status_code != 200 and resp.status_code != 201: | ||
| error_content = resp.content.decode("utf-8", errors="ignore") | ||
| raise PolyApiException(f"{resp.status_code}: {error_content}") | ||
| return resp | ||
|
|
||
|
|
||
| def variable_get(variable_id: str) -> Response: | ||
| async def _async_variable_get(variable_id: str) -> httpx.Response: | ||
| api_key, base_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{base_url}/variables/{variable_id}/value" | ||
| resp = requests.get(url, headers=headers) | ||
| resp = await http_client.async_get(url, headers=headers) | ||
| if resp.status_code != 200 and resp.status_code != 201: | ||
| error_content = resp.content.decode("utf-8", errors="ignore") | ||
| raise PolyApiException(f"{resp.status_code}: {error_content}") | ||
| return resp | ||
|
|
||
|
|
||
| def variable_update(variable_id: str, value) -> Response: | ||
| def variable_get(variable_id: str) -> httpx.Response: | ||
| return _sync_variable_get(variable_id) | ||
|
|
||
|
|
||
| async def variable_get_async(variable_id: str) -> httpx.Response: | ||
| return await _async_variable_get(variable_id) | ||
|
|
||
|
|
||
| def _sync_variable_update(variable_id: str, value) -> httpx.Response: | ||
| api_key, base_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{base_url}/variables/{variable_id}" | ||
| resp = requests.patch(url, data={"value": value}, headers=headers) | ||
| resp = http_client.patch(url, data={"value": value}, headers=headers) | ||
| if resp.status_code != 200 and resp.status_code != 201: | ||
| error_content = resp.content.decode("utf-8", errors="ignore") | ||
| raise PolyApiException(f"{resp.status_code}: {error_content}") | ||
| return resp | ||
| return resp | ||
|
|
||
|
|
||
| async def _async_variable_update(variable_id: str, value) -> httpx.Response: | ||
| api_key, base_url = get_api_key_and_url() | ||
| headers = {"Authorization": f"Bearer {api_key}"} | ||
| url = f"{base_url}/variables/{variable_id}" | ||
| resp = await http_client.async_patch(url, data={"value": value}, headers=headers) | ||
| if resp.status_code != 200 and resp.status_code != 201: | ||
| error_content = resp.content.decode("utf-8", errors="ignore") | ||
| raise PolyApiException(f"{resp.status_code}: {error_content}") | ||
| return resp | ||
|
|
||
|
|
||
| def variable_update(variable_id: str, value) -> httpx.Response: | ||
| return _sync_variable_update(variable_id, value) | ||
|
|
||
|
|
||
| async def variable_update_async(variable_id: str, value) -> httpx.Response: | ||
| return await _async_variable_update(variable_id, value) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.