Skip to content

Commit e2c8b49

Browse files
committed
refactor(event-handler): reduce cognitive complexity and address SonarCloud issues
- Refactor get_field_info_annotated_type function by extracting helper functions to reduce cognitive complexity from 29 to below 15 - Fix copy_field_info to preserve FieldInfo subclass types using shallow copy instead of from_annotation - Rename variable Action to action_type to follow Python naming conventions - Resolve failing test_validate_embed_body_param by maintaining Body parameter type recognition - Add helper functions: _has_discriminator, _handle_discriminator_with_body, _create_field_info, _set_field_default - Maintain full backward compatibility and discriminator functionality
1 parent 0c3aad6 commit e2c8b49

3 files changed

Lines changed: 77 additions & 50 deletions

File tree

aws_lambda_powertools/event_handler/openapi/compat.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,9 @@ def model_rebuild(model: type[BaseModel]) -> None:
187187

188188
def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo:
189189
# Create a shallow copy of the field_info to preserve its type and all attributes
190-
import copy
190+
from copy import copy
191191

192-
new_field = copy.copy(field_info)
192+
new_field = copy(field_info)
193193
# Update only the annotation to the new one
194194
new_field.annotation = annotation
195195
return new_field

aws_lambda_powertools/event_handler/openapi/params.py

Lines changed: 73 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,70 +1037,97 @@ def get_field_info_response_type(annotation, value) -> tuple[FieldInfo | None, A
10371037
return get_field_info_and_type_annotation(inner_type, value, False, True)
10381038

10391039

1040+
def _has_discriminator(field_info: FieldInfo) -> bool:
1041+
"""Check if a FieldInfo has a discriminator."""
1042+
return hasattr(field_info, "discriminator") and field_info.discriminator is not None
1043+
1044+
1045+
def _handle_discriminator_with_body(
1046+
annotations: list[FieldInfo], annotation: Any,
1047+
) -> tuple[FieldInfo | None, Any, bool]:
1048+
"""
1049+
Handle the special case of Field(discriminator) + Body() combination.
1050+
1051+
Returns:
1052+
tuple of (powertools_annotation, type_annotation, has_discriminator_with_body)
1053+
"""
1054+
field_obj = None
1055+
body_obj = None
1056+
1057+
for ann in annotations:
1058+
if isinstance(ann, Body):
1059+
body_obj = ann
1060+
elif _has_discriminator(ann):
1061+
field_obj = ann
1062+
1063+
if field_obj and body_obj:
1064+
# Use Body as the primary annotation, preserve full annotation for validation
1065+
return body_obj, annotation, True
1066+
1067+
raise AssertionError("Only one FieldInfo can be used per parameter")
1068+
1069+
1070+
def _create_field_info(
1071+
powertools_annotation: FieldInfo,
1072+
type_annotation: Any,
1073+
has_discriminator_with_body: bool,
1074+
) -> FieldInfo:
1075+
"""Create or copy FieldInfo based on the annotation type."""
1076+
if has_discriminator_with_body:
1077+
# For discriminator + Body case, create a new Body instance directly
1078+
field_info = Body()
1079+
field_info.annotation = type_annotation
1080+
else:
1081+
# Copy field_info because we mutate field_info.default later
1082+
field_info = copy_field_info(
1083+
field_info=powertools_annotation,
1084+
annotation=type_annotation,
1085+
)
1086+
return field_info
1087+
1088+
1089+
def _set_field_default(field_info: FieldInfo, value: Any, is_path_param: bool) -> None:
1090+
"""Set the default value for a field."""
1091+
if field_info.default not in [Undefined, Required]:
1092+
raise AssertionError("FieldInfo needs to have a default value of Undefined or Required")
1093+
1094+
if value is not inspect.Signature.empty:
1095+
if is_path_param:
1096+
raise AssertionError("Cannot use a FieldInfo as a path parameter and pass a value")
1097+
field_info.default = value
1098+
else:
1099+
field_info.default = Required
1100+
1101+
10401102
def get_field_info_annotated_type(annotation, value, is_path_param: bool) -> tuple[FieldInfo | None, Any]:
10411103
"""
10421104
Get the FieldInfo and type annotation from an Annotated type.
10431105
"""
1044-
field_info: FieldInfo | None = None
10451106
annotated_args = get_args(annotation)
10461107
type_annotation = annotated_args[0]
10471108
powertools_annotations = [arg for arg in annotated_args[1:] if isinstance(arg, FieldInfo)]
10481109

1049-
# Special case: handle Field(discriminator) + Body() combination
1050-
# This happens when using Annotated[Union[A, B], Field(discriminator='...')] with Body()
1051-
has_discriminator_with_body = False
1110+
# Determine which annotation to use
10521111
powertools_annotation: FieldInfo | None = None
1112+
has_discriminator_with_body = False
10531113

10541114
if len(powertools_annotations) == 2:
1055-
field_obj = None
1056-
body_obj = None
1057-
for ann in powertools_annotations:
1058-
if isinstance(ann, Body):
1059-
body_obj = ann
1060-
elif isinstance(ann, FieldInfo) and hasattr(ann, "discriminator") and ann.discriminator is not None:
1061-
field_obj = ann
1062-
1063-
if field_obj and body_obj:
1064-
# Use Body as the primary annotation
1065-
powertools_annotation = body_obj
1066-
# Preserve the full annotation including discriminator for proper validation
1067-
# This ensures the discriminator is available when creating the TypeAdapter
1068-
type_annotation = annotation
1069-
has_discriminator_with_body = True
1070-
else:
1071-
raise AssertionError("Only one FieldInfo can be used per parameter")
1115+
powertools_annotation, type_annotation, has_discriminator_with_body = _handle_discriminator_with_body(
1116+
powertools_annotations, annotation,
1117+
)
10721118
elif len(powertools_annotations) > 1:
10731119
raise AssertionError("Only one FieldInfo can be used per parameter")
10741120
else:
10751121
powertools_annotation = next(iter(powertools_annotations), None)
10761122

1123+
# Process the annotation if it exists
1124+
field_info: FieldInfo | None = None
10771125
if isinstance(powertools_annotation, FieldInfo):
1078-
if has_discriminator_with_body:
1079-
# For discriminator + Body case, create a new Body instance directly
1080-
# This avoids issues with copy_field_info trying to process the Field
1081-
field_info = Body()
1082-
field_info.annotation = type_annotation
1083-
else:
1084-
# Copy `field_info` because we mutate `field_info.default` later
1085-
# Use the possibly modified type_annotation for copy_field_info
1086-
field_info = copy_field_info(
1087-
field_info=powertools_annotation,
1088-
annotation=type_annotation,
1089-
)
1090-
if field_info.default not in [Undefined, Required]:
1091-
raise AssertionError("FieldInfo needs to have a default value of Undefined or Required")
1092-
1093-
if value is not inspect.Signature.empty:
1094-
if is_path_param:
1095-
raise AssertionError("Cannot use a FieldInfo as a path parameter and pass a value")
1096-
field_info.default = value
1097-
else:
1098-
field_info.default = Required
1126+
field_info = _create_field_info(powertools_annotation, type_annotation, has_discriminator_with_body)
1127+
_set_field_default(field_info, value, is_path_param)
10991128

1100-
# Preserve the full annotated type if it contains discriminator metadata
1101-
# This is crucial for tagged unions to work properly
1102-
if hasattr(powertools_annotation, "discriminator") and powertools_annotation.discriminator is not None:
1103-
# Store the full annotated type for discriminated unions
1129+
# Preserve full annotated type for discriminated unions
1130+
if _has_discriminator(powertools_annotation):
11041131
type_annotation = annotation
11051132

11061133
return field_info, type_annotation

tests/functional/event_handler/_pydantic/test_openapi_validation_middleware.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1997,10 +1997,10 @@ class BarAction(BaseModel):
19971997
action: Literal["bar"]
19981998
bar_data: int
19991999

2000-
Action = Annotated[FooAction | BarAction, Field(discriminator="action")]
2000+
action_type = Annotated[FooAction | BarAction, Field(discriminator="action")]
20012001

20022002
@app.post("/actions")
2003-
def create_action(action: Annotated[Action, Body()]):
2003+
def create_action(action: Annotated[action_type, Body()]):
20042004
return {"received_action": action.action, "data": action.model_dump()}
20052005

20062006
gw_event["path"] = "/actions"

0 commit comments

Comments
 (0)