forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_kafka.py
More file actions
82 lines (67 loc) · 3.08 KB
/
test_kafka.py
File metadata and controls
82 lines (67 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from aws_lambda_powertools.utilities.parser import envelopes, parse
from aws_lambda_powertools.utilities.parser.models import (
KafkaMskEventModel,
KafkaRecordModel,
KafkaSelfManagedEventModel,
)
from tests.functional.utils import load_event
from tests.unit.parser.schemas import MyLambdaKafkaBusiness
def test_kafka_msk_event_with_envelope():
raw_event = load_event("kafkaEventMsk.json")
parsed_event: MyLambdaKafkaBusiness = parse(
event=raw_event,
model=MyLambdaKafkaBusiness,
envelope=envelopes.KafkaEnvelope,
)
assert parsed_event[0].key == "value"
assert len(parsed_event) == 1
def test_kafka_self_managed_event_with_envelope():
raw_event = load_event("kafkaEventSelfManaged.json")
parsed_event: MyLambdaKafkaBusiness = parse(
event=raw_event,
model=MyLambdaKafkaBusiness,
envelope=envelopes.KafkaEnvelope,
)
assert parsed_event[0].key == "value"
assert len(parsed_event) == 1
def test_self_managed_kafka_event():
raw_event = load_event("kafkaEventSelfManaged.json")
parsed_event: KafkaSelfManagedEventModel = KafkaSelfManagedEventModel(**raw_event)
assert parsed_event.eventSource == raw_event["eventSource"]
assert parsed_event.bootstrapServers == raw_event["bootstrapServers"].split(",")
records = list(parsed_event.records["mytopic-0"])
assert len(records) == 1
record: KafkaRecordModel = records[0]
raw_record = raw_event["records"]["mytopic-0"][0]
assert record.topic == raw_record["topic"]
assert record.partition == raw_record["partition"]
assert record.offset == raw_record["offset"]
assert record.timestamp is not None
convert_time = int(round(record.timestamp.timestamp() * 1000))
assert convert_time == raw_record["timestamp"]
assert record.timestampType == raw_record["timestampType"]
assert record.key == b"recordKey"
assert record.value == '{"key":"value"}'
assert len(record.headers) == 1
assert record.headers[0]["headerKey"] == b"headerValue"
def test_kafka_msk_event():
raw_event = load_event("kafkaEventMsk.json")
parsed_event: KafkaMskEventModel = KafkaMskEventModel(**raw_event)
assert parsed_event.eventSource == raw_event["eventSource"]
assert parsed_event.bootstrapServers == raw_event["bootstrapServers"].split(",")
assert parsed_event.eventSourceArn == raw_event["eventSourceArn"]
records = list(parsed_event.records["mytopic-0"])
assert len(records) == 1
record: KafkaRecordModel = records[0]
raw_record = raw_event["records"]["mytopic-0"][0]
assert record.topic == raw_record["topic"]
assert record.partition == raw_record["partition"]
assert record.offset == raw_record["offset"]
assert record.timestamp is not None
convert_time = int(round(record.timestamp.timestamp() * 1000))
assert convert_time == raw_record["timestamp"]
assert record.timestampType == raw_record["timestampType"]
assert record.key == b"recordKey"
assert record.value == '{"key":"value"}'
assert len(record.headers) == 1
assert record.headers[0]["headerKey"] == b"headerValue"