forked from DataDog/datadog-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtracing.py
More file actions
268 lines (216 loc) · 8.95 KB
/
tracing.py
File metadata and controls
268 lines (216 loc) · 8.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.
import logging
import os
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.lambda_launcher import LambdaContext
from datadog_lambda.constants import (
SamplingPriority,
TraceHeader,
XraySubsegment,
TraceContextSource,
)
from ddtrace import tracer, patch
from ddtrace import __version__ as ddtrace_version
from ddtrace.propagation.http import HTTPPropagator
from datadog_lambda import __version__ as datadog_lambda_version
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Trace context of the current invocation. It is rebound by
# `extract_dd_trace_context` and read by `get_dd_trace_context`.
dd_trace_context = {}
# True only when the DD_TRACE_ENABLED environment variable equals "true"
# (case-insensitive); an unset variable is treated as "false".
dd_tracing_enabled = os.environ.get("DD_TRACE_ENABLED", "false").lower() == "true"
# Shared propagator; `set_dd_trace_py_root` uses it to turn trace headers
# into a ddtrace span context.
propagator = HTTPPropagator()
def _convert_xray_trace_id(xray_trace_id):
"""
Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
"""
return str(0x7FFFFFFFFFFFFFFF & int(xray_trace_id[-16:], 16))
def _convert_xray_entity_id(xray_entity_id):
"""
Convert X-Ray (sub)segement id (hex) to a Datadog span id (int).
"""
return str(int(xray_entity_id, 16))
def _convert_xray_sampling(xray_sampled):
    """
    Map the X-Ray sampled flag onto a Datadog sampling priority.

    A truthy flag yields `SamplingPriority.USER_KEEP`, a falsy one
    `SamplingPriority.USER_REJECT`; the result is returned as a string.
    """
    if xray_sampled:
        priority = SamplingPriority.USER_KEEP
    else:
        priority = SamplingPriority.USER_REJECT
    return str(priority)
def _get_xray_trace_context():
    """
    Build a Datadog-style trace context from the current X-Ray entity.

    Returns None when `is_lambda_context()` is false. Otherwise returns a
    dict with the converted "trace-id", "parent-id" and "sampling-priority"
    of the current X-Ray (sub)segment, tagged with the XRAY source.
    """
    if not is_lambda_context():
        return None

    entity = xray_recorder.get_trace_entity()  # current X-Ray (sub)segment
    trace_id = _convert_xray_trace_id(entity.trace_id)
    parent_id = _convert_xray_entity_id(entity.id)
    sampling_priority = _convert_xray_sampling(entity.sampled)
    return {
        "trace-id": trace_id,
        "parent-id": parent_id,
        "sampling-priority": sampling_priority,
        "source": TraceContextSource.XRAY,
    }
def _get_dd_trace_py_context():
    """
    Build a trace context from the span currently active in dd-trace-py.

    Returns None when the tracer reports no current span. Otherwise returns
    a dict holding the span context's ids and sampling priority as strings,
    tagged with the DDTRACE source.
    """
    active_span = tracer.current_span()
    if not active_span:
        return None

    span_context = active_span.context
    return {
        "parent-id": str(span_context.span_id),
        "trace-id": str(span_context.trace_id),
        "sampling-priority": str(span_context.sampling_priority),
        "source": TraceContextSource.DDTRACE,
    }
def _context_obj_to_headers(obj):
    """
    Convert a trace context dict into Datadog trace headers.

    Each header value is the string form of the matching context entry;
    a missing entry therefore becomes the string "None".
    """
    header_to_key = (
        (TraceHeader.TRACE_ID, "trace-id"),
        (TraceHeader.PARENT_ID, "parent-id"),
        (TraceHeader.SAMPLING_PRIORITY, "sampling-priority"),
    )
    return {header: str(obj.get(key)) for header, key in header_to_key}
def extract_dd_trace_context(event):
    """
    Extract Datadog trace context from the Lambda `event` object.

    Write the context to the global `dd_trace_context`, so the trace can be
    continued on outgoing requests with the context injected.

    When the event carries all three Datadog trace headers, the context is
    also saved to an X-Ray subsegment's metadata field, so the X-Ray trace
    can be converted to a Datadog trace in the Datadog backend with the
    correct context. Otherwise the context falls back to the current X-Ray
    trace entity.

    Returns the resulting context dict, or None when no headers were found
    and `_get_xray_trace_context()` returns None.
    """
    global dd_trace_context

    # The event is not guaranteed to be a mapping, and a "headers" entry may
    # be present but None; both cases are treated as "no headers" instead of
    # raising AttributeError.
    raw_headers = event.get("headers") if hasattr(event, "get") else None
    headers = raw_headers or {}
    lowercase_headers = {k.lower(): v for k, v in headers.items()}

    trace_id = lowercase_headers.get(TraceHeader.TRACE_ID)
    parent_id = lowercase_headers.get(TraceHeader.PARENT_ID)
    sampling_priority = lowercase_headers.get(TraceHeader.SAMPLING_PRIORITY)

    if trace_id and parent_id and sampling_priority:
        logger.debug("Extracted Datadog trace context from headers")
        metadata = {
            "trace-id": trace_id,
            "parent-id": parent_id,
            "sampling-priority": sampling_priority,
        }
        # Update the global before touching X-Ray so a failure below can
        # never leave the previous invocation's context in place.
        dd_trace_context = metadata.copy()
        dd_trace_context["source"] = TraceContextSource.EVENT

        xray_recorder.begin_subsegment(XraySubsegment.NAME)
        try:
            subsegment = xray_recorder.current_subsegment()
            subsegment.put_metadata(
                XraySubsegment.KEY, metadata, XraySubsegment.NAMESPACE
            )
        finally:
            # Always close the subsegment, even if recording metadata fails.
            xray_recorder.end_subsegment()
    else:
        # AWS Lambda runtime caches global variables between invocations,
        # reset to avoid using the context from the last invocation.
        dd_trace_context = _get_xray_trace_context()

    logger.debug("extracted dd trace context %s", dd_trace_context)
    return dd_trace_context
def get_dd_trace_context():
    """
    Return the Datadog trace context to be propagated on outgoing requests.

    If the Lambda function is invoked by a Datadog-traced service, a Datadog
    trace context may already exist, and it should be used (with its
    parent id replaced by the current X-Ray entity's id). Otherwise, use the
    current X-Ray trace entity. If DD_TRACE_ENABLED is true and dd-trace-py
    has an active span, that span's context takes precedence over both.

    Most widely-used HTTP clients are patched to inject the context
    automatically, but this function can be used to manually inject the
    trace context into an outgoing request.

    Returns a dict of trace headers, or an empty dict when no context is
    available.
    """
    context = None
    xray_context = None

    try:
        xray_context = _get_xray_trace_context()  # xray (sub)segment
    except Exception as e:
        # Best effort: failing to read X-Ray must not break the caller.
        logger.debug(
            "get_dd_trace_context couldn't read from segment from x-ray, with error %s",
            e,
        )

    if xray_context and not dd_trace_context:
        context = xray_context
    elif xray_context and dd_trace_context:
        # Keep the upstream trace id / priority, but parent outgoing
        # requests to the current X-Ray entity.
        context = dd_trace_context.copy()
        context["parent-id"] = xray_context["parent-id"]

    if dd_tracing_enabled:
        dd_trace_py_context = _get_dd_trace_py_context()
        if dd_trace_py_context is not None:
            logger.debug("get_dd_trace_context using dd-trace context")
            context = dd_trace_py_context

    return _context_obj_to_headers(context) if context is not None else {}
def set_correlation_ids():
    """
    Start a dummy span and overwrite its trace_id and span_id with the
    current trace context, so that ddtrace.helpers.get_correlation_ids()
    returns the correct ids for both auto and manual log correlation.

    Does nothing outside a LambdaContext, or when dd tracing is enabled.

    TODO: Remove me when Datadog tracer is natively supported in Lambda.
    """
    if not is_lambda_context():
        logger.debug("set_correlation_ids is only supported in LambdaContext")
    elif dd_tracing_enabled:
        logger.debug("using ddtrace implementation for spans")
    else:
        trace_headers = get_dd_trace_context()

        dummy_span = tracer.trace("dummy.span")
        dummy_span.trace_id = trace_headers[TraceHeader.TRACE_ID]
        dummy_span.span_id = trace_headers[TraceHeader.PARENT_ID]

        logger.debug("correlation ids set")
def inject_correlation_ids():
    """
    Replace the formatter of every root-logger LambdaLoggerHandler with one
    that includes the Datadog trace id and span id, for log correlation.

    For manual injection into custom log handlers, use
    `ddtrace.helpers.get_correlation_ids` to retrieve the correlation ids
    (trace_id, span_id).
    """
    log_format = (
        "[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t"
        "[dd.trace_id=%(dd.trace_id)s dd.span_id=%(dd.span_id)s]\t%(message)s\n"
    )
    date_format = "%Y-%m-%dT%H:%M:%S"

    # Only the AWS-provided handler gets the new format; it is matched by
    # class name.
    lambda_handlers = [
        candidate
        for candidate in logging.getLogger().handlers
        if candidate.__class__.__name__ == "LambdaLoggerHandler"
    ]
    for lambda_handler in lambda_handlers:
        lambda_handler.setFormatter(logging.Formatter(log_format, date_format))

    # Patch `logging.Logger.makeRecord` to actually inject correlation ids
    patch(logging=True)

    logger.debug("logs injection configured")
def is_lambda_context():
    """
    Tell whether the X-Ray recorder's context is exactly `LambdaContext`.

    It is the regular `Context` instead when, for example, lambda functions
    are tested locally.
    """
    context_type = type(xray_recorder.context)
    return context_type is LambdaContext
def set_dd_trace_py_root(trace_context, merge_xray_traces):
    """
    Activate the current trace context as the root context of dd-trace-py.

    The context is activated when it was extracted from the event, or
    whenever `merge_xray_traces` is truthy.

    trace_context: the context dict produced by `extract_dd_trace_context`;
        may be None or empty, in which case it has no source.
    merge_xray_traces: whether to activate the context regardless of its
        source.
    """
    # `extract_dd_trace_context` returns None outside a LambdaContext when
    # the event has no trace headers; treat that as "no source" rather than
    # failing on the subscript.
    source = trace_context["source"] if trace_context else None
    if source == TraceContextSource.EVENT or merge_xray_traces:
        headers = get_dd_trace_context()
        span_context = propagator.extract(headers)
        tracer.context_provider.activate(span_context)
def create_function_execution_span(
    context, function_name, is_cold_start, trace_context, merge_xray_traces
):
    """
    Start the "aws.lambda" span that represents the function execution.

    context: the Lambda context object; when falsy, no function tags are
        attached.
    function_name: used as the span's resource.
    is_cold_start: recorded as the lowercase string tag "cold_start".
    trace_context: the context dict produced by `extract_dd_trace_context`;
        may be None or empty, in which case it has no source.
    merge_xray_traces: when truthy and the context came from X-Ray, the
        source is recorded in the "_dd.parent_source" tag.

    Returns the span returned by `tracer.trace`.
    """
    tags = {}
    if context:
        function_arn = (context.invoked_function_arn or "").lower()
        arn_parts = function_arn.split(":")
        # When the ARN has more than seven colon-separated fields, the
        # eighth is taken as the version and stripped from the ARN tag.
        if len(arn_parts) > 7:
            function_version = arn_parts[7]
            function_arn = ":".join(arn_parts[:7])
        else:
            function_version = "$LATEST"
        tags = {
            "cold_start": str(is_cold_start).lower(),
            "function_arn": function_arn,
            "function_version": function_version,
            "request_id": context.aws_request_id,
            "resource_names": context.function_name,
            "datadog_lambda": datadog_lambda_version,
            "dd_trace": ddtrace_version,
        }

    # `extract_dd_trace_context` returns None outside a LambdaContext when
    # the event has no trace headers; treat that as "no source" rather than
    # failing on the subscript.
    source = trace_context["source"] if trace_context else None
    if source == TraceContextSource.XRAY and merge_xray_traces:
        tags["_dd.parent_source"] = source

    args = {
        "service": "aws.lambda",
        "resource": function_name,
        "span_type": "serverless",
    }
    tracer.set_tags({"_dd.origin": "lambda"})
    span = tracer.trace("aws.lambda", **args)
    if span:
        span.set_tags(tags)
    return span