Skip to content

Commit db8d528

Browse files
daniel-sanche, gcf-owl-bot[bot], and mutianf
authored
feat: add data model for client side metrics (#1187)
This PR revives googleapis/python-bigtable#923, which was de-prioritized to work on the sync client. This PR brings it back, working with both async and sync. It also adds a grpc interceptor, as an improved way to capture metadata across both clients --- ## Design The main architecture looks like this: <img width="651" height="631" alt="300137129-bebbb05a-20f0-45c2-9d38-e95a314edf64 drawio (1)" src="https://github.com/user-attachments/assets/c8318ac8-5f18-4027-9f64-4a40a8ab1d79" /> Most of the work is done by the ActiveOperationMetric class, which is instantiated with each rpc call, and updated through the lifecycle of the call. When the rpc is complete, it will call `on_operation_complete` and `on_attempt_complete` on the MetricsHandler, which can then log the completed data into OpenTelemetry (or theoretically, other locations if needed) Note that there are separate classes for active vs completed metrics (`ActiveOperationMetric`, `ActiveAttemptMetric`, `CompletedOperationMetric`, `CompletedAttemptMetric`). This is so that we can keep fields mutable and optional while the request is ongoing, but pass down static immutable copies once the attempt is completed and no new data is coming --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Mattie Fu <[email protected]>
1 parent a1e74dd commit db8d528

File tree

20 files changed

+2555
-40
lines changed

20 files changed

+2555
-40
lines changed

packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter
8989
from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
9090
from google.cloud.bigtable.data.row_filters import RowFilterChain
91+
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
9192

9293
from google.cloud.bigtable.data._cross_sync import CrossSync
9394

@@ -1039,6 +1040,8 @@ def __init__(
10391040
default_retryable_errors or ()
10401041
)
10411042

1043+
self._metrics = BigtableClientSideMetricsController()
1044+
10421045
try:
10431046
self._register_instance_future = CrossSync.create_task(
10441047
self.client._register_instance,
@@ -1753,6 +1756,7 @@ async def close(self):
17531756
"""
17541757
Called to close the Table instance and release any resources held by it.
17551758
"""
1759+
self._metrics.close()
17561760
if self._register_instance_future:
17571761
self._register_instance_future.cancel()
17581762
self.client._remove_instance_registration(

packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/metrics_interceptor.py

Lines changed: 99 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,21 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16+
from typing import Sequence
17+
18+
import time
19+
from functools import wraps
20+
21+
from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
22+
from google.cloud.bigtable.data._metrics.data_model import OperationState
23+
from google.cloud.bigtable.data._metrics.data_model import OperationType
24+
1625
from google.cloud.bigtable.data._cross_sync import CrossSync
1726

1827
if CrossSync.is_async:
1928
from grpc.aio import UnaryUnaryClientInterceptor
2029
from grpc.aio import UnaryStreamClientInterceptor
30+
from grpc.aio import AioRpcError
2131
else:
2232
from grpc import UnaryUnaryClientInterceptor
2333
from grpc import UnaryStreamClientInterceptor
@@ -26,6 +36,57 @@
2636
__CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.metrics_interceptor"
2737

2838

39+
def _with_active_operation(func):
    """
    Decorator for interceptor methods that looks up the active operation from
    the in-scope contextvars and passes it to the decorated function.

    If no operation is found in context, the rpc proceeds unwrapped and no
    metrics are collected for it.
    """

    @wraps(func)
    def wrapper(self, continuation, client_call_details, request):
        operation: ActiveOperationMetric | None = ActiveOperationMetric.from_context()
        if not operation:
            # no active operation in context; run the rpc without instrumentation
            return continuation(client_call_details, request)
        # make sure an attempt is in progress before the rpc fires
        if operation.state in (OperationState.CREATED, OperationState.BETWEEN_ATTEMPTS):
            operation.start_attempt()
        # hand the operation to the decorated interceptor method
        return func(self, operation, continuation, client_call_details, request)

    return wrapper
63+
64+
65+
@CrossSync.convert
async def _get_metadata(source) -> dict[str, str | bytes] | None:
    """
    Helper to extract response metadata from a call or RpcError.

    Combines trailing and initial metadata into a single dict. Best-effort:
    returns None instead of raising if metadata cannot be retrieved.

    Args:
        source: a grpc call object or an rpc error to pull metadata from
    Returns:
        dict of metadata key/value pairs, or None if extraction failed
    """
    try:
        metadata: Sequence[tuple[str, str | bytes]]
        if CrossSync.is_async:
            # grpc.aio returns metadata in Metadata objects
            if isinstance(source, AioRpcError):
                # AioRpcError exposes metadata synchronously
                metadata = list(source.trailing_metadata()) + list(
                    source.initial_metadata()
                )
            else:
                # live calls expose metadata as awaitables
                metadata = list(await source.trailing_metadata()) + list(
                    await source.initial_metadata()
                )
        else:
            # sync grpc returns metadata as a sequence of tuples
            metadata = source.trailing_metadata() + source.initial_metadata()
        # convert metadata to dict format
        return {k: v for (k, v) in metadata}
    except Exception:
        # ignore errors while fetching metadata
        return None
88+
89+
2990
@CrossSync.convert_class(sync_name="BigtableMetricsInterceptor")
3091
class AsyncBigtableMetricsInterceptor(
3192
UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor
@@ -35,21 +96,33 @@ class AsyncBigtableMetricsInterceptor(
3596
"""
3697

3798
    @CrossSync.convert
    @_with_active_operation
    async def intercept_unary_unary(
        self, operation, continuation, client_call_details, request
    ):
        """
        Interceptor for unary rpcs:
        - MutateRow
        - CheckAndMutateRow
        - ReadModifyWriteRow

        Captures response metadata from the call (or the raised error) and
        records it on the active operation. The rpc result and any exception
        are passed through unchanged.
        """
        # stays None if metadata could not be captured; checked in finally
        metadata = None
        try:
            call = await continuation(client_call_details, request)
            # capture server metadata from the completed call
            metadata = await _get_metadata(call)
            return call
        except Exception as rpc_error:
            # on failure, metadata may be attached to the error itself
            metadata = await _get_metadata(rpc_error)
            raise rpc_error
        finally:
            # record whatever metadata was found on the active operation
            if metadata is not None:
                operation.add_response_metadata(metadata)
50120

51121
@CrossSync.convert
52-
async def intercept_unary_stream(self, continuation, client_call_details, request):
122+
@_with_active_operation
123+
async def intercept_unary_stream(
124+
self, operation, continuation, client_call_details, request
125+
):
53126
"""
54127
Interceptor for streaming rpcs:
55128
- ReadRows
@@ -58,21 +131,42 @@ async def intercept_unary_stream(self, continuation, client_call_details, reques
58131
"""
59132
try:
60133
return self._streaming_generator_wrapper(
61-
await continuation(client_call_details, request)
134+
operation, await continuation(client_call_details, request)
62135
)
63136
except Exception as rpc_error:
64137
# handle errors while intializing stream
138+
metadata = await _get_metadata(rpc_error)
139+
if metadata is not None:
140+
operation.add_response_metadata(metadata)
65141
raise rpc_error
66142

67143
    @staticmethod
    @CrossSync.convert
    async def _streaming_generator_wrapper(operation, call):
        """
        Wrapped generator to be returned by intercept_unary_stream.

        Yields responses from the underlying call, recording time-to-first-response
        (for READ_ROWS only) and response metadata on the given operation.
        """
        # only track has_first response for READ_ROWS
        has_first_response = (
            operation.first_response_latency_ns is not None
            or operation.op_type != OperationType.READ_ROWS
        )
        # remembers an in-stream error so metadata can be pulled from it in finally
        encountered_exc = None
        try:
            async for response in call:
                # record time to first response. Currently only used for READ_ROWS
                if not has_first_response:
                    operation.first_response_latency_ns = (
                        time.monotonic_ns() - operation.start_time_ns
                    )
                    has_first_response = True
                yield response
        except Exception as e:
            # handle errors while processing stream
            encountered_exc = e
            raise
        finally:
            if call is not None:
                # prefer metadata from the exception if one occurred
                metadata = await _get_metadata(encountered_exc or call)
                if metadata is not None:
                    operation.add_response_metadata(metadata)

packages/google-cloud-bigtable/google/cloud/bigtable/data/_helpers.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
2424

2525
from google.api_core import exceptions as core_exceptions
26+
from google.api_core.retry import exponential_sleep_generator
2627
from google.api_core.retry import RetryFailureReason
2728
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
2829

@@ -248,3 +249,61 @@ def _get_retryable_errors(
248249
call_codes = table.default_mutate_rows_retryable_errors
249250

250251
return [_get_error_type(e) for e in call_codes]
252+
253+
254+
class TrackedBackoffGenerator:
    """
    Exponential backoff sleep-time generator that remembers what it yielded.

    Builds on api_core.retries.exponential_sleep_generator, recording every
    value produced so that earlier backoffs can be looked up afterwards with
    get_attempt_backoff(idx). Used by the Metrics class to track the sleep
    time applied before each attempt.
    """

    def __init__(self, initial=0.01, maximum=60, multiplier=2):
        # ordered record of every backoff value yielded so far
        self.history = []
        # underlying generator producing the exponential sequence
        self.subgenerator = exponential_sleep_generator(
            initial=initial, maximum=maximum, multiplier=multiplier
        )
        # one-shot replacement for the next yielded value, if set
        self._next_override: float | None = None

    def __iter__(self):
        return self

    def set_next(self, next_value: float):
        """
        Override the next backoff value instead of drawing from the subgenerator.

        Affects only a single next() call; afterwards values come from the
        subgenerator again. If set_next is called more than once before next(),
        only the most recent value is kept.

        Args:
            next_value: the upcoming value to yield when next() is called
        Raises:
            ValueError: if next_value is negative
        """
        if next_value < 0:
            raise ValueError("backoff value cannot be less than 0")
        self._next_override = next_value

    def __next__(self) -> float:
        # consume the override (if any) atomically, then fall back to the subgenerator
        override, self._next_override = self._next_override, None
        value = override if override is not None else next(self.subgenerator)
        self.history.append(value)
        return value

    def get_attempt_backoff(self, attempt_idx) -> float:
        """
        Return the backoff time recorded for a specific attempt index, starting at 0.

        Args:
            attempt_idx: the index of the attempt to return backoff for
        Raises:
            IndexError: if attempt_idx is negative, or not in history
        """
        if attempt_idx < 0:
            raise IndexError("received negative attempt number")
        return self.history[attempt_idx]
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from google.cloud.bigtable.data._metrics.metrics_controller import (
15+
BigtableClientSideMetricsController,
16+
)
17+
18+
from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
19+
from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric
20+
from google.cloud.bigtable.data._metrics.data_model import CompletedOperationMetric
21+
from google.cloud.bigtable.data._metrics.data_model import CompletedAttemptMetric
22+
from google.cloud.bigtable.data._metrics.data_model import OperationState
23+
from google.cloud.bigtable.data._metrics.data_model import OperationType
24+
from google.cloud.bigtable.data._metrics.tracked_retry import tracked_retry
25+
26+
__all__ = (
27+
"BigtableClientSideMetricsController",
28+
"OperationType",
29+
"OperationState",
30+
"ActiveOperationMetric",
31+
"ActiveAttemptMetric",
32+
"CompletedOperationMetric",
33+
"CompletedAttemptMetric",
34+
"tracked_retry",
35+
)

0 commit comments

Comments
 (0)