44 BaseHeadersSerializer ,
55 HttpApiHeadersSerializer ,
66)
7- from aws_lambda_powertools .utilities .data_classes .common import BaseProxyEvent
7+ from aws_lambda_powertools .utilities .data_classes .common import BaseProxyEvent , DictWrapper
88from aws_lambda_powertools .utilities .data_classes .shared_functions import (
99 base64_decode ,
1010 get_header_value ,
1111 get_query_string_value ,
1212)
1313
1414
15- class VPCLatticeEvent (BaseProxyEvent ):
15+ class VPCLatticeEventBase (BaseProxyEvent ):
1616 @property
1717 def body (self ) -> str :
1818 """The VPC Lattice body."""
@@ -30,11 +30,6 @@ def headers(self) -> Dict[str, str]:
3030 """The VPC Lattice event headers."""
3131 return self ["headers" ]
3232
33- @property
34- def is_base64_encoded (self ) -> bool :
35- """A boolean flag to indicate if the applicable request payload is Base64-encode"""
36- return self ["is_base64_encoded" ]
37-
3833 @property
3934 def decoded_body (self ) -> str :
4035 """Dynamically base64 decode body as a str"""
@@ -48,24 +43,6 @@ def method(self) -> str:
4843 """The VPC Lattice method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
4944 return self ["method" ]
5045
51- @property
52- def query_string_parameters (self ) -> Dict [str , str ]:
53- """The request query string parameters."""
54- return self ["query_string_parameters" ]
55-
56- @property
57- def raw_path (self ) -> str :
58- """The raw VPC Lattice request path."""
59- return self ["raw_path" ]
60-
61- # VPCLattice event has no path field
62- # Added here for consistency with the BaseProxyEvent class
63- @property
64- def path (self ) -> str :
65- return self ["raw_path" ]
66-
67- # VPCLattice event has no http_method field
68- # Added here for consistency with the BaseProxyEvent class
6946 @property
7047 def http_method (self ) -> str :
7148 """The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
@@ -140,3 +117,137 @@ def get_header_value(
140117 def header_serializer (self ) -> BaseHeadersSerializer :
141118 # When using the VPC Lattice integration, we have multiple HTTP Headers.
142119 return HttpApiHeadersSerializer ()
120+
121+
122+ class VPCLatticeEvent (VPCLatticeEventBase ):
123+ @property
124+ def raw_path (self ) -> str :
125+ """The raw VPC Lattice request path."""
126+ return self ["raw_path" ]
127+
128+ @property
129+ def is_base64_encoded (self ) -> bool :
130+ """A boolean flag to indicate if the applicable request payload is Base64-encode"""
131+ return self ["is_base64_encoded" ]
132+
133+ # VPCLattice event has no path field
134+ # Added here for consistency with the BaseProxyEvent class
135+ @property
136+ def path (self ) -> str :
137+ return self ["raw_path" ]
138+
139+ @property
140+ def query_string_parameters (self ) -> Dict [str , str ]:
141+ """The request query string parameters."""
142+ return self ["query_string_parameters" ]
143+
144+
145+ class vpcLatticeEventV2Identity (DictWrapper ):
146+ @property
147+ def source_vpc_arn (self ) -> Optional [str ]:
148+ """The VPC Lattice v2 Event requestContext Identity sourceVpcArn"""
149+ return self .get ("sourceVpcArn" )
150+
151+ @property
152+ def get_type (self ) -> Optional [str ]:
153+ """The VPC Lattice v2 Event requestContext Identity type"""
154+ return self .get ("type" )
155+
156+ @property
157+ def principal (self ) -> Optional [str ]:
158+ """The VPC Lattice v2 Event requestContext principal"""
159+ return self .get ("principal" )
160+
161+ @property
162+ def principal_org_id (self ) -> Optional [str ]:
163+ """The VPC Lattice v2 Event requestContext principalOrgID"""
164+ return self .get ("principalOrgID" )
165+
166+ @property
167+ def session_name (self ) -> Optional [str ]:
168+ """The VPC Lattice v2 Event requestContext sessionName"""
169+ return self .get ("sessionName" )
170+
171+ @property
172+ def x509_subject_cn (self ) -> Optional [str ]:
173+ """The VPC Lattice v2 Event requestContext X509SubjectCn"""
174+ return self .get ("X509SubjectCn" )
175+
176+ @property
177+ def x509_issuer_ou (self ) -> Optional [str ]:
178+ """The VPC Lattice v2 Event requestContext X509IssuerOu"""
179+ return self .get ("X509IssuerOu" )
180+
181+ @property
182+ def x509_san_dns (self ) -> Optional [str ]:
183+ """The VPC Lattice v2 Event requestContext X509SanDns"""
184+ return self .get ("x509SanDns" )
185+
186+ @property
187+ def x509_san_uri (self ) -> Optional [str ]:
188+ """The VPC Lattice v2 Event requestContext X509SanUri"""
189+ return self .get ("X509SanUri" )
190+
191+ @property
192+ def x509_san_name_cn (self ) -> Optional [str ]:
193+ """The VPC Lattice v2 Event requestContext X509SanNameCn"""
194+ return self .get ("X509SanNameCn" )
195+
196+
197+ class vpcLatticeEventV2RequestContext (DictWrapper ):
198+ @property
199+ def service_network_arn (self ) -> str :
200+ """The VPC Lattice v2 Event requestContext serviceNetworkArn"""
201+ return self ["serviceNetworkArn" ]
202+
203+ @property
204+ def service_arn (self ) -> str :
205+ """The VPC Lattice v2 Event requestContext serviceArn"""
206+ return self ["serviceArn" ]
207+
208+ @property
209+ def target_group_arn (self ) -> str :
210+ """The VPC Lattice v2 Event requestContext targetGroupArn"""
211+ return self ["targetGroupArn" ]
212+
213+ @property
214+ def identity (self ) -> vpcLatticeEventV2Identity :
215+ """The VPC Lattice v2 Event requestContext identity"""
216+ return vpcLatticeEventV2Identity (self ["identity" ])
217+
218+ @property
219+ def region (self ) -> str :
220+ """The VPC Lattice v2 Event requestContext serviceNetworkArn"""
221+ return self ["region" ]
222+
223+ @property
224+ def time_epoch (self ) -> float :
225+ """The VPC Lattice v2 Event requestContext timeEpoch"""
226+ return self ["timeEpoch" ]
227+
228+
229+ class VPCLatticeEventV2 (VPCLatticeEventBase ):
230+ @property
231+ def version (self ) -> str :
232+ """The VPC Lattice v2 Event version"""
233+ return self ["version" ]
234+
235+ @property
236+ def is_base64_encoded (self ) -> Optional [bool ]:
237+ """A boolean flag to indicate if the applicable request payload is Base64-encode"""
238+ return self .get ("isBase64Encoded" )
239+
240+ @property
241+ def path (self ) -> str :
242+ """The VPC Lattice v2 Event path"""
243+ return self ["path" ]
244+
245+ @property
246+ def request_context (self ) -> vpcLatticeEventV2RequestContext :
247+ """he VPC Lattice v2 Event request context."""
248+ return vpcLatticeEventV2RequestContext (self ["requestContext" ])
249+
250+ @property
251+ def query_string_parameters (self ) -> Optional [Dict [str , str ]]:
252+ """The request query string parameters."""
253+ return self .get ("queryStringParameters" )
0 commit comments