From 761e9aa1f49f9a69dc47f9c67fe2c1aaeb4cea37 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Thu, 1 Dec 2022 19:52:15 -0500 Subject: [PATCH 1/2] fix: Fix memory leak from usage.py not properly cleaning up call stack context Signed-off-by: Danny Chiao --- sdk/python/feast/usage.py | 41 +++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/usage.py b/sdk/python/feast/usage.py index 0965e709998..43f5bd642fa 100644 --- a/sdk/python/feast/usage.py +++ b/sdk/python/feast/usage.py @@ -78,7 +78,7 @@ class FnCall: class Sampler: - def should_record(self, event) -> bool: + def should_record(self) -> bool: raise NotImplementedError @property @@ -87,7 +87,7 @@ def priority(self): class AlwaysSampler(Sampler): - def should_record(self, event) -> bool: + def should_record(self) -> bool: return True @@ -100,7 +100,7 @@ def __init__(self, ratio): self.total_counter = 0 self.sampled_counter = 0 - def should_record(self, event) -> bool: + def should_record(self) -> bool: self.total_counter += 1 if self.total_counter == self.MAX_COUNTER: self.total_counter = 1 @@ -176,7 +176,7 @@ def _set_installation_id(): def _export(event: typing.Dict[str, typing.Any]): - _executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=30) + _executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=2) def _produce_event(ctx: UsageContext): @@ -204,10 +204,6 @@ def _produce_event(ctx: UsageContext): **_constant_attributes, } event.update(ctx.attributes) - - if ctx.sampler and not ctx.sampler.should_record(event): - return - _export(event) @@ -262,6 +258,13 @@ def deeply_nested(...): """ sampler = attrs.pop("sampler", AlwaysSampler()) + def clear_context(ctx): + _context.set(UsageContext()) # reset context to default values + # TODO: Figure out why without this, new contexts.get aren't reset + ctx.call_stack = [] + ctx.completed_calls = [] + ctx.attributes = {} + def decorator(func): if not _is_enabled: return func @@ -295,17 +298,25 @@ def wrapper(*args, **kwargs): raise exc finally: - last_call = ctx.call_stack.pop(-1) - last_call.end = datetime.utcnow() - ctx.completed_calls.append(last_call) ctx.sampler = ( sampler if sampler.priority > ctx.sampler.priority else ctx.sampler ) - if not ctx.call_stack: - # we reached the root of the stack - _context.set(UsageContext()) # reset context to default values - _produce_event(ctx) + if not ctx.sampler.should_record(): + clear_context(ctx) + else: + last_call = ctx.call_stack.pop(-1) + last_call.end = datetime.utcnow() + ctx.completed_calls.append(last_call) + if not ctx.call_stack or ( + len(ctx.call_stack) == 1 + and "feast.feature_store.FeatureStore.serve" + in str(ctx.call_stack[0].fn_name) + ): + # When running `feast serve`, the serve method never exits so it gets + # stuck otherwise + _produce_event(ctx) + clear_context(ctx) return wrapper From 575e78197a9dc85262bf8ce5e75da07110235895 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Thu, 1 Dec 2022 20:37:13 -0500 Subject: [PATCH 2/2] fix Signed-off-by: Danny Chiao --- sdk/python/feast/usage.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/usage.py b/sdk/python/feast/usage.py index 43f5bd642fa..18bb497182c 100644 --- a/sdk/python/feast/usage.py +++ b/sdk/python/feast/usage.py @@ -180,6 +180,8 @@ def _export(event: typing.Dict[str, typing.Any]): def _produce_event(ctx: UsageContext): + if ctx.sampler and not ctx.sampler.should_record(): + return # Cannot check for unittest because typeguard pulls in unittest is_test = flags_helper.is_test() or bool({"pytest"} & sys.modules.keys()) event = { @@ -301,22 +303,19 @@ def wrapper(*args, **kwargs): ctx.sampler = ( sampler if sampler.priority > ctx.sampler.priority else ctx.sampler ) - - if not ctx.sampler.should_record(): + last_call = ctx.call_stack.pop(-1) + last_call.end = datetime.utcnow() + ctx.completed_calls.append(last_call) + + if not ctx.call_stack or ( + len(ctx.call_stack) == 1 + and "feast.feature_store.FeatureStore.serve" + in str(ctx.call_stack[0].fn_name) + ): + # When running `feast serve`, the serve method never exits so it gets + # stuck otherwise + _produce_event(ctx) clear_context(ctx) - else: - last_call = ctx.call_stack.pop(-1) - last_call.end = datetime.utcnow() - ctx.completed_calls.append(last_call) - if not ctx.call_stack or ( - len(ctx.call_stack) == 1 - and "feast.feature_store.FeatureStore.serve" - in str(ctx.call_stack[0].fn_name) - ): - # When running `feast serve`, the serve method never exits so it gets - # stuck otherwise - _produce_event(ctx) - clear_context(ctx) return wrapper