1+ import functools
2+ import time
3+ from concurrent .futures import Future , ThreadPoolExecutor
14from datetime import datetime
2- from typing import Optional , Tuple
5+ from typing import List , Optional , Tuple
36
47import boto3
58import requests
69from mypy_boto3_lambda import LambdaClient
710from mypy_boto3_lambda .type_defs import InvocationResponseTypeDef
11+ from pydantic import BaseModel
812from requests import Request , Response
913from requests .exceptions import RequestException
1014from retry import retry
1115
# Return type of get_lambda_response: the raw Lambda `invoke` response paired
# with the approximate invocation start time (naive UTC datetime).
GetLambdaResponse = Tuple[InvocationResponseTypeDef, datetime]
17+
18+
class GetLambdaResponseOptions(BaseModel):
    """Options bundle mirroring the parameters of ``get_lambda_response``.

    A list of these is fanned out by ``get_lambda_response_in_parallel``,
    which unpacks each instance via ``.dict()`` into a
    ``get_lambda_response`` call.
    """

    # ARN of the Lambda function to invoke
    lambda_arn: str
    # JSON payload for the invocation; None means empty payload
    payload: Optional[str] = None
    # Pre-built boto3 Lambda client; None means a default client is created
    client: Optional[LambdaClient] = None
    # Whether an invocation error should raise instead of being returned
    raise_on_error: bool = True

    # Maintenance: Pydantic v2 deprecated it; we should update in v3
    class Config:
        # LambdaClient is not a pydantic-validatable type, so allow it as-is
        arbitrary_types_allowed = True
1229
def get_lambda_response(
    lambda_arn: str,
    payload: Optional[str] = None,
    client: Optional[LambdaClient] = None,
    raise_on_error: bool = True,
) -> GetLambdaResponse:
    """Invoke function synchronously

    Parameters
    ----------
    lambda_arn : str
        Lambda function ARN to invoke
    payload : Optional[str], optional
        JSON payload for Lambda invocation, by default None
    client : Optional[LambdaClient], optional
        Boto3 Lambda SDK client, by default None
    raise_on_error : bool, optional
        Whether to raise exception upon invocation error, by default True

    Returns
    -------
    Tuple[InvocationResponseTypeDef, datetime]
        Function response and approximate execution time

    Raises
    ------
    RuntimeError
        Function invocation error details
    """
    client = client or boto3.client("lambda")
    payload = payload or ""
    # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated as of
    # Python 3.12 — move to datetime.now(timezone.utc) once callers accept
    # timezone-aware datetimes.
    execution_time = datetime.utcnow()
    response: InvocationResponseTypeDef = client.invoke(
        FunctionName=lambda_arn,
        InvocationType="RequestResponse",
        Payload=payload,
    )

    # Fix: the Invoke API sets FunctionError to either "Handled" or
    # "Unhandled"; the previous `== "Unhandled"` check silently ignored
    # "Handled" errors (exceptions raised by the function code itself).
    # Any present FunctionError means the invocation failed.
    has_error = bool(response.get("FunctionError"))
    if has_error and raise_on_error:
        error_payload = response["Payload"].read().decode()
        raise RuntimeError(f"Function failed invocation: {error_payload}")

    return response, execution_time
2274
2375
2476@retry (RequestException , delay = 2 , jitter = 1.5 , tries = 5 )
@@ -27,3 +79,39 @@ def get_http_response(request: Request) -> Response:
2779 result = session .send (request .prepare ())
2880 result .raise_for_status ()
2981 return result
82+
83+
def get_lambda_response_in_parallel(
    get_lambda_response_options: List[GetLambdaResponseOptions],
) -> List[GetLambdaResponse]:
    """Invoke functions in parallel

    Parameters
    ----------
    get_lambda_response_options : List[GetLambdaResponseOptions]
        List of options to call get_lambda_response with

    Returns
    -------
    List[GetLambdaResponse]
        Function responses and approximate execution time
    """
    with ThreadPoolExecutor() as executor:
        tasks: List[Future] = []
        for slot, options in enumerate(get_lambda_response_options):
            # Sleep 0.5, 1, 1.5, ... seconds between each invocation. This way
            # we can guarantee that lambdas are executed in parallel, but they are
            # called in the same "order" as they are passed in, thus guaranteeing that
            # we can assert on the correct output.
            time.sleep(0.5 * slot)

            invoke = functools.partial(get_lambda_response, **options.dict())
            tasks.append(executor.submit(invoke))

    # Leaving the `with` block waits for every submitted task to finish,
    # so all futures below are already resolved.
    return [task.result() for task in tasks]