forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_kinesis.py
More file actions
108 lines (83 loc) · 4.29 KB
/
test_kinesis.py
File metadata and controls
108 lines (83 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import pytest

from aws_lambda_powertools.utilities.parser import BaseModel, ValidationError, envelopes, parse
from aws_lambda_powertools.utilities.parser.models import (
    KinesisDataStreamModel,
    KinesisDataStreamRecord,
    KinesisDataStreamRecordPayload,
)
from aws_lambda_powertools.utilities.parser.models.cloudwatch import (
    CloudWatchLogsDecode,
)
from aws_lambda_powertools.utilities.parser.models.kinesis import (
    extract_cloudwatch_logs_from_event,
    extract_cloudwatch_logs_from_record,
)
from tests.functional.utils import load_event
from tests.unit.parser.schemas import MyKinesisBusiness
def test_kinesis_trigger_bad_base64_event():
    """The stream model must reject an event whose first record has ``"bad"`` as its data."""
    # GIVEN a Kinesis stream event whose first record data is overwritten with "bad"
    event = load_event("kinesisStreamEvent.json")
    first_payload = event["Records"][0]["kinesis"]
    first_payload["data"] = "bad"

    # WHEN the event is loaded into KinesisDataStreamModel
    # THEN ValidationError should be raised
    with pytest.raises(ValidationError):
        KinesisDataStreamModel(**event)
def test_kinesis_trigger_event():
    """Parse a one-record Kinesis event through the envelope into the business model."""
    # GIVEN a Kinesis stream event loaded from the one-record fixture
    raw_event = load_event("kinesisStreamEventOneRecord.json")

    # WHEN it is parsed with the Kinesis envelope and the business model
    # (the result is a sequence of models, so it is not annotated as a single model)
    parsed_event = parse(
        event=raw_event,
        model=MyKinesisBusiness,
        envelope=envelopes.KinesisDataStreamEnvelope,
    )

    # THEN exactly one business model is returned with the expected fields
    assert len(parsed_event) == 1
    record: MyKinesisBusiness = parsed_event[0]
    assert record.message == "test message"
    assert record.username == "test"
def test_kinesis_trigger_event_no_envelope():
    """Load a Kinesis stream event straight into the model and compare it with the raw event."""
    # GIVEN a raw Kinesis stream event
    raw_event = load_event("kinesisStreamEvent.json")

    # WHEN it is loaded into the model directly, without an envelope
    parsed_event: KinesisDataStreamModel = KinesisDataStreamModel(**raw_event)

    # THEN both records of the fixture are present
    records = parsed_event.Records
    assert len(records) == 2

    # AND the first record mirrors the raw record field by field.
    # An entry of ``Records`` is a record model, not the top-level stream model.
    record: KinesisDataStreamRecord = records[0]
    raw_record = raw_event["Records"][0]
    assert record.awsRegion == raw_record["awsRegion"]
    assert record.eventID == raw_record["eventID"]
    assert record.eventName == raw_record["eventName"]
    assert record.eventSource == raw_record["eventSource"]
    assert record.eventSourceARN == raw_record["eventSourceARN"]
    assert record.eventVersion == raw_record["eventVersion"]
    assert record.invokeIdentityArn == raw_record["invokeIdentityArn"]

    # AND so does its nested Kinesis payload
    kinesis: KinesisDataStreamRecordPayload = record.kinesis
    assert kinesis.approximateArrivalTimestamp == raw_record["kinesis"]["approximateArrivalTimestamp"]
    assert kinesis.kinesisSchemaVersion == raw_record["kinesis"]["kinesisSchemaVersion"]
    assert kinesis.partitionKey == raw_record["kinesis"]["partitionKey"]
    assert kinesis.sequenceNumber == raw_record["kinesis"]["sequenceNumber"]
    # The parsed data is compared against bytes, not against the raw event's data string
    assert kinesis.data == b"Hello, this is a test."
def test_validate_event_does_not_conform_with_model_no_envelope():
    """Building the stream model directly from ``{"hello": "s"}`` must fail validation."""
    # GIVEN a payload that does not look like a Kinesis stream event
    unrelated_payload = {"hello": "s"}

    # WHEN the model is built directly from it, without an envelope
    # THEN ValidationError should be raised
    with pytest.raises(ValidationError):
        KinesisDataStreamModel(**unrelated_payload)
def test_validate_event_does_not_conform_with_model():
    """Parsing ``{"hello": "s"}`` through the Kinesis envelope must fail validation."""
    # GIVEN a payload that does not look like a Kinesis stream event
    unrelated_payload = {"hello": "s"}

    # WHEN it is parsed with the Kinesis envelope and the business model
    # THEN ValidationError should be raised
    with pytest.raises(ValidationError):
        parse(
            event=unrelated_payload,
            model=MyKinesisBusiness,
            envelope=envelopes.KinesisDataStreamEnvelope,
        )
def test_kinesis_stream_event_cloudwatch_logs_data_extraction():
    """Extract CloudWatch Logs from a Kinesis event, both event-wide and record by record."""
    # GIVEN a KinesisDataStreamModel is instantiated with CloudWatch Logs compressed data
    event_model = KinesisDataStreamModel(**load_event("kinesisStreamCloudWatchLogsEvent.json"))
    first_record = event_model.Records[0]

    # WHEN we try to extract CloudWatch Logs from KinesisDataStreamRecordPayload model
    logs_from_event = extract_cloudwatch_logs_from_event(event_model)
    logs_per_record = list(map(extract_cloudwatch_logs_from_record, event_model.Records))
    log_from_first_record = extract_cloudwatch_logs_from_record(first_record)

    # THEN we should have extracted any potential logs as CloudWatchLogsDecode models
    assert len(logs_from_event) == len(logs_per_record)
    assert isinstance(log_from_first_record, CloudWatchLogsDecode)
def test_kinesis_stream_event_cloudwatch_logs_data_extraction_fails_with_custom_model():
    """Decompressing record data must fail once the data is a custom model instead of bytes.

    Every record is checked on its own: ``pytest.raises`` wraps only the decompression
    call, so a record that unexpectedly decompresses cannot hide behind an earlier
    record that already raised.
    """
    # GIVEN a custom model replaces Kinesis Record Data bytes
    class DummyModel(BaseModel):
        ...

    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
    stream_data = KinesisDataStreamModel(**raw_event)

    # Guard against an empty fixture: with no records the loop below would assert nothing
    assert stream_data.Records, "fixture must contain at least one record"

    for record in stream_data.Records:
        record.kinesis.data = DummyModel()

        # WHEN decompress_zlib_record_data_as_json is used
        # THEN ValueError should be raised
        with pytest.raises(ValueError, match="We can only decompress bytes data"):
            record.decompress_zlib_record_data_as_json()