From a8337aeb1065682bf15b658fd50fb856035f288f Mon Sep 17 00:00:00 2001 From: huawei Date: Sun, 2 Aug 2020 19:35:50 +0800 Subject: [PATCH 1/7] Support snapshot context --- skywalking/agent/protocol/grpc.py | 2 +- skywalking/trace/context.py | 40 ++++++++++- skywalking/trace/segment.py | 18 ++++- skywalking/trace/snapshot/__init__.py | 40 +++++++++++ tests/plugin/sw_flask/expected.data.yml | 79 ++++++++++++++++++++-- tests/plugin/sw_flask/services/consumer.py | 14 ++++ tests/plugin/sw_flask/test_flask.py | 2 +- 7 files changed, 185 insertions(+), 10 deletions(-) create mode 100644 skywalking/trace/snapshot/__init__.py diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 1e47ba45..ca24895b 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -88,7 +88,7 @@ def generator(): value=str(tag.val), ) for tag in span.tags], refs=[SegmentReference( - refType=0, + refType=0 if ref.ref_type == "CrossProcess" else 1, traceId=ref.trace_id, parentTraceSegmentId=ref.segment_id, parentSpanId=ref.span_id, diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py index 99fe8af9..b78d61f4 100644 --- a/skywalking/trace/context.py +++ b/skywalking/trace/context.py @@ -20,8 +20,10 @@ from typing import List from skywalking import agent, config +from skywalking.trace import ID from skywalking.trace.carrier import Carrier -from skywalking.trace.segment import Segment +from skywalking.trace.segment import Segment, SegmentRef +from skywalking.trace.snapshot import Snapshot from skywalking.trace.span import Span, Kind, NoopSpan, EntrySpan, ExitSpan from skywalking.utils.counter import Counter @@ -117,7 +119,7 @@ def active_span(self): if self.spans: return self.spans[len(self.spans) - 1] - return None + raise Exception("No active span.") def get_correlation(self, key): if key in self._correlation: @@ -137,6 +139,28 @@ def put_correlation(self, key, value): self._correlation[key] = value + def capture(self): + if len(self.spans) == 0: + raise Exception("Spans can't be empty. ") + + return Snapshot( + segment_id=str(self.segment.segment_id), + span_id=self.active_span().sid, + trace_id=self.segment.related_traces[0], + endpoint=self.spans[0].op, + correlation=self._correlation, + ) + + def continued(self, snapshot: 'Snapshot'): + if snapshot is None: + raise Exception("Snapshot can't be none.") + if not snapshot.is_from_current(self) and snapshot.is_valid(): + ref = SegmentRef.build_ref(snapshot) + span = self.active_span() + span.refs.append(ref) + self.segment.relate(ID(ref.trace_id)) + self._correlation.update(snapshot.correlation) + class NoopContext(SpanContext): def __init__(self): @@ -169,6 +193,18 @@ def stop(self, span: Span) -> bool: def active_span(self): return self._noop_span + def capture(self): + return Snapshot( + segment_id=None, + span_id=-1, + trace_id=None, + endpoint=None, + correlation=self._correlation, + ) + + def continued(self, snapshot: 'Snapshot'): + self._correlation.update(snapshot.correlation) + _thread_local = threading.local() _thread_local.context = None diff --git a/skywalking/trace/segment.py b/skywalking/trace/segment.py index 4a8d905e..fc3e43f9 100644 --- a/skywalking/trace/segment.py +++ b/skywalking/trace/segment.py @@ -18,6 +18,8 @@ import time from typing import List, TYPE_CHECKING +from skywalking import config + from skywalking.trace import ID from skywalking.utils.lang import tostring @@ -27,8 +29,8 @@ class SegmentRef(object): - def __init__(self, carrier: 'Carrier'): - self.ref_type = 'CrossProcess' # type: str + def __init__(self, carrier: 'Carrier', ref_type: str = 'CrossProcess'): + self.ref_type = ref_type # type: str self.trace_id = carrier.trace_id # type: str self.segment_id = carrier.segment_id # type: str self.span_id = int(carrier.span_id) # type: int @@ -49,6 +51,18 @@ def __eq__(self, other): self.endpoint == other.endpoint and \ self.client_address == other.client_address + @classmethod + def build_ref(cls, snapshot: 'Snapshot'): + from skywalking.trace.carrier import Carrier + carrier = Carrier() + carrier.trace_id = str(snapshot.trace_id) + carrier.segment_id = str(snapshot.segment_id) + carrier.endpoint = snapshot.endpoint + carrier.span_id = snapshot.span_id + carrier.service = config.service_name + carrier.service_instance = config.service_instance + return SegmentRef(carrier, ref_type="CrossThread") + class _NewID(ID): pass diff --git a/skywalking/trace/snapshot/__init__.py b/skywalking/trace/snapshot/__init__.py new file mode 100644 index 00000000..57d34179 --- /dev/null +++ b/skywalking/trace/snapshot/__init__.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from skywalking.trace import ID + + +class Snapshot: + def __init__( + self, + segment_id: str = None, + span_id: int = None, + trace_id: ID = None, + endpoint: str = None, + correlation: dict = None + ): + self.trace_id = trace_id # type: ID + self.segment_id = segment_id # type: str + self.span_id = span_id # type: int + self.endpoint = endpoint # type: str + self.correlation = correlation.copy() # type: dict + + def is_from_current(self, context: 'SpanContext'): + return self.segment_id is not None and self.segment_id == context.capture().segment_id + + def is_valid(self): + return self.segment_id is not None and self.span_id > -1 and self.trace_id is not None diff --git a/tests/plugin/sw_flask/expected.data.yml b/tests/plugin/sw_flask/expected.data.yml index 453f5392..add2226a 100644 --- a/tests/plugin/sw_flask/expected.data.yml +++ b/tests/plugin/sw_flask/expected.data.yml @@ -17,7 +17,7 @@ segmentItems: - serviceName: provider - segmentSize: 1 + segmentSize: 2 segments: - segmentId: not null spans: @@ -35,7 +35,36 @@ segmentItems: value: '200' refs: - parentEndpoint: /users - networkAddress: provider:9091 + networkAddress: 'provider:9091' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: POST + - key: url + value: http://provider:9091/users + - key: status.code + value: '200' + refs: + - parentEndpoint: /users + networkAddress: 'provider:9091' refType: CrossProcess parentSpanId: 1 parentTraceSegmentId: not null @@ -49,8 +78,50 @@ segmentItems: peer: not null skipAnalysis: false - serviceName: consumer - segmentSize: 1 + segmentSize: 2 segments: + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 7002 + isError: false + spanType: Exit + peer: provider:9091 + skipAnalysis: false + tags: + - key: http.method + value: POST + - key: url + value: 'http://provider:9091/users' + - key: status.code + value: '200' + - operationName: /test + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Unknown + startTime: gt 0 + endTime: gt 0 + componentId: 0 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + refs: + - parentEndpoint: /users + networkAddress: '' + refType: CrossThread + parentSpanId: 0 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null - segmentId: not null spans: - operationName: /users @@ -90,4 +161,4 @@ segmentItems: componentId: 7001 spanType: Entry peer: not null - skipAnalysis: false \ No newline at end of file + skipAnalysis: false diff --git a/tests/plugin/sw_flask/services/consumer.py b/tests/plugin/sw_flask/services/consumer.py index 129222a7..27ee05fc 100644 --- a/tests/plugin/sw_flask/services/consumer.py +++ b/tests/plugin/sw_flask/services/consumer.py @@ -29,12 +29,26 @@ app = Flask(__name__) + @app.route("/users", methods=["POST", "GET"]) def application(): from skywalking.trace.context import get_context get_context().put_correlation("correlation", "correlation") + + def post(snap): + with get_context().new_local_span("/test") as span: + get_context().continued(snap) + requests.post("http://provider:9091/users") + + snapshot = get_context().capture() + + from threading import Thread + t = Thread(target=post, args=(snapshot,)) + t.start() + res = requests.post("http://provider:9091/users") return jsonify(res.json()) + PORT = 9090 app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/sw_flask/test_flask.py b/tests/plugin/sw_flask/test_flask.py index 1ff07d59..e6fd6c6d 100644 --- a/tests/plugin/sw_flask/test_flask.py +++ b/tests/plugin/sw_flask/test_flask.py @@ -33,7 +33,7 @@ def setUpClass(cls): cls.compose.wait_for(cls.url(('consumer', '9090'), 'users?test=test1&test=test2&test2=test2')) def test_plugin(self): - time.sleep(3) + time.sleep(5) self.validate() response = requests.get(TestPlugin.url(('consumer', '9090'), 'users')) From 367e8bcd6cd8aa9e721831638f64f3372d05b7b4 Mon Sep 17 00:00:00 2001 From: huawei Date: Sun, 2 Aug 2020 19:40:00 +0800 Subject: [PATCH 2/7] update code style --- skywalking/trace/segment.py | 1 + 1 file changed, 1 insertion(+) diff --git a/skywalking/trace/segment.py b/skywalking/trace/segment.py index fc3e43f9..f2e0d4e9 100644 --- a/skywalking/trace/segment.py +++ b/skywalking/trace/segment.py @@ -26,6 +26,7 @@ if TYPE_CHECKING: from skywalking.trace.carrier import Carrier from skywalking.trace.span import Span + from skywalking.trace.snapshot import Snapshot class SegmentRef(object): From 032c391f1954b349e3d9a88b498f02bb123e4c5b Mon Sep 17 00:00:00 2001 From: huawei Date: Sun, 2 Aug 2020 19:42:38 +0800 Subject: [PATCH 3/7] update code style --- skywalking/trace/snapshot/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/skywalking/trace/snapshot/__init__.py b/skywalking/trace/snapshot/__init__.py index 57d34179..3072ba9d 100644 --- a/skywalking/trace/snapshot/__init__.py +++ b/skywalking/trace/snapshot/__init__.py @@ -14,6 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from skywalking.trace.context import SpanContext from skywalking.trace import ID From af0559065d3c2552c52346f697f1c29785c20e02 Mon Sep 17 00:00:00 2001 From: huawei Date: Sun, 2 Aug 2020 19:44:41 +0800 Subject: [PATCH 4/7] update code style --- tests/plugin/sw_flask/services/consumer.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/plugin/sw_flask/services/consumer.py b/tests/plugin/sw_flask/services/consumer.py index 27ee05fc..d038b14d 100644 --- a/tests/plugin/sw_flask/services/consumer.py +++ b/tests/plugin/sw_flask/services/consumer.py @@ -29,14 +29,13 @@ app = Flask(__name__) - @app.route("/users", methods=["POST", "GET"]) def application(): from skywalking.trace.context import get_context get_context().put_correlation("correlation", "correlation") def post(snap): - with get_context().new_local_span("/test") as span: + with get_context().new_local_span("/test"): get_context().continued(snap) requests.post("http://provider:9091/users") @@ -49,6 +48,5 @@ def post(snap): res = requests.post("http://provider:9091/users") return jsonify(res.json()) - PORT = 9090 app.run(host='0.0.0.0', port=PORT, debug=True) From cea7ed099c6e811f8188bd37fd660f2f5a7d3fd2 Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Mon, 3 Aug 2020 21:13:46 +0800 Subject: [PATCH 5/7] Fix threading sequential problem in test --- tests/plugin/sw_flask/services/consumer.py | 3 +++ tests/plugin/sw_flask/test_flask.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/plugin/sw_flask/services/consumer.py b/tests/plugin/sw_flask/services/consumer.py index d038b14d..19a6cbba 100644 --- a/tests/plugin/sw_flask/services/consumer.py +++ b/tests/plugin/sw_flask/services/consumer.py @@ -46,6 +46,9 @@ def post(snap): t.start() res = requests.post("http://provider:9091/users") + + t.join() + return jsonify(res.json()) PORT = 9090 diff --git a/tests/plugin/sw_flask/test_flask.py b/tests/plugin/sw_flask/test_flask.py index e6fd6c6d..1ff07d59 100644 --- a/tests/plugin/sw_flask/test_flask.py +++ b/tests/plugin/sw_flask/test_flask.py @@ -33,7 +33,7 @@ def setUpClass(cls): cls.compose.wait_for(cls.url(('consumer', '9090'), 'users?test=test1&test=test2&test2=test2')) def test_plugin(self): - time.sleep(5) + time.sleep(3) self.validate() response = requests.get(TestPlugin.url(('consumer', '9090'), 'users')) From aaf3a6c7cc01cca87e2e2f89267a317f82c6717a Mon Sep 17 00:00:00 2001 From: huawei Date: Mon, 3 Aug 2020 21:33:04 +0800 Subject: [PATCH 6/7] update exception --- skywalking/trace/context.py | 6 +++--- skywalking/trace/{snapshot/__init__.py => snapshot.py} | 0 2 files changed, 3 insertions(+), 3 deletions(-) rename skywalking/trace/{snapshot/__init__.py => snapshot.py} (100%) diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py index b78d61f4..23cdbc66 100644 --- a/skywalking/trace/context.py +++ b/skywalking/trace/context.py @@ -119,7 +119,7 @@ def active_span(self): if self.spans: return self.spans[len(self.spans) - 1] - raise Exception("No active span.") + return None def get_correlation(self, key): if key in self._correlation: @@ -141,7 +141,7 @@ def put_correlation(self, key, value): def capture(self): if len(self.spans) == 0: - raise Exception("Spans can't be empty. ") + return None return Snapshot( segment_id=str(self.segment.segment_id), @@ -153,7 +153,7 @@ def capture(self): def continued(self, snapshot: 'Snapshot'): if snapshot is None: - raise Exception("Snapshot can't be none.") + return None if not snapshot.is_from_current(self) and snapshot.is_valid(): ref = SegmentRef.build_ref(snapshot) span = self.active_span() diff --git a/skywalking/trace/snapshot/__init__.py b/skywalking/trace/snapshot.py similarity index 100% rename from skywalking/trace/snapshot/__init__.py rename to skywalking/trace/snapshot.py From 01dd0403ee4277700f20d578103bccde781af337 Mon Sep 17 00:00:00 2001 From: huawei Date: Mon, 3 Aug 2020 21:34:10 +0800 Subject: [PATCH 7/7] update test case --- tests/plugin/sw_flask/services/consumer.py | 1 + tests/plugin/sw_flask/test_flask.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/plugin/sw_flask/services/consumer.py b/tests/plugin/sw_flask/services/consumer.py index d038b14d..791f2118 100644 --- a/tests/plugin/sw_flask/services/consumer.py +++ b/tests/plugin/sw_flask/services/consumer.py @@ -44,6 +44,7 @@ def post(snap): from threading import Thread t = Thread(target=post, args=(snapshot,)) t.start() + t.join() res = requests.post("http://provider:9091/users") return jsonify(res.json()) diff --git a/tests/plugin/sw_flask/test_flask.py b/tests/plugin/sw_flask/test_flask.py index e6fd6c6d..1ff07d59 100644 --- a/tests/plugin/sw_flask/test_flask.py +++ b/tests/plugin/sw_flask/test_flask.py @@ -33,7 +33,7 @@ def setUpClass(cls): cls.compose.wait_for(cls.url(('consumer', '9090'), 'users?test=test1&test=test2&test2=test2')) def test_plugin(self): - time.sleep(5) + time.sleep(3) self.validate() response = requests.get(TestPlugin.url(('consumer', '9090'), 'users'))