This repository was archived by the owner on Aug 8, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathasynch.py
More file actions
83 lines (69 loc) · 2.59 KB
/
asynch.py
File metadata and controls
83 lines (69 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from typing import List, Union
import oxapi
from oxapi.abstract.api import ModelAPI
class AsyncCallPipe:
    """Class for performing multiple calls to OxAPI in parallel."""

    def __init__(self, call_list: List[ModelAPI] = None):
        """Constructor.

        Args:
            call_list: the list of API calls. It is allowed to create an
                AsyncCallPipe without its call_list defined at instantiation
                time (calls can be added later with add method).
        """
        # Keep a private copy: add() appends in place, and that must never
        # modify the list object owned by the caller.
        self.__call_list = [] if call_list is None else list(call_list)

    def run(self):
        """Runs the set of API calls.

        Each queued call is sent as a JSON POST request (URL from
        ``get_url()``, payload from ``_body``) with ``oxapi.api_key`` in the
        ``Authorization`` header. All requests are dispatched together through
        ``grequests.map``.

        Returns:
            List: the queued API calls, in call-list order. For every call
            that received a response, ``parse_error_message`` is invoked with
            ``raise_exceptions=False`` and, when the status code is 200,
            ``result`` is set to the decoded JSON body. A call whose request
            failed without producing a response is returned untouched (the
            failure is logged by the exception handler).
            ``None`` is returned, after logging a warning, when the call list
            is empty.
        """
        # Imported inside the method so that importing this module does not
        # import grequests.
        # NOTE(review): presumably done to delay gevent monkey-patching until
        # it is actually needed - confirm.
        import grequests

        if not self.__call_list:
            oxapi.logger.warning("Call list is empty, nothing to run.")
            return None

        pending = [
            grequests.post(
                call.get_url(),
                json=call._body,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": oxapi.api_key,
                },
            )
            for call in self.__call_list
        ]
        responses = grequests.map(
            requests=pending, exception_handler=AsyncCallPipe.__exception_handler
        )

        results_processed = []
        for call, response in zip(self.__call_list, responses):
            # A failed request yields None here (the value returned by the
            # exception handler); dereferencing it would raise AttributeError
            # and discard the results of every other call.
            if response is not None:
                call.parse_error_message(response, raise_exceptions=False)
                if response.status_code == 200:
                    call.result = response.json()
            results_processed.append(call)
        return results_processed

    def add(self, api_call: Union[ModelAPI, List[ModelAPI]]):
        """Adds a single or a list of API calls to the call list.

        Arguments that are neither a list nor a ModelAPI are ignored and a
        warning is logged.

        Args:
            api_call: single or list of API calls to be added.
        """
        if isinstance(api_call, list):
            self.__call_list.extend(api_call)
        elif isinstance(api_call, ModelAPI):
            self.__call_list.append(api_call)
        else:
            oxapi.logger.warning(
                "Ignoring unsupported api_call of type {0}.".format(
                    type(api_call).__name__
                )
            )

    def flush(self):
        """Clears the list of API calls."""
        self.__call_list = []

    @staticmethod
    def __exception_handler(request, exception):
        """Handles the exceptions in calling the APIs.

        Logs the failure as a warning. Nothing is returned, so the failed
        request is represented by ``None`` in the list produced by
        ``grequests.map``.

        Args:
            request: original request
            exception: generated exception
        """
        oxapi.logger.warning(
            "Request failed: {0}, ERROR: {1}".format(request.url, exception)
        )