From 35599b2c5d05fc637328010f66e64c6bce841bb4 Mon Sep 17 00:00:00 2001 From: Vitaly Sergeyev Date: Wed, 2 Mar 2022 19:41:27 -0500 Subject: [PATCH 1/6] ability to get event timestamps from online response Signed-off-by: Vitaly Sergeyev --- sdk/python/feast/online_response.py | 27 +++++++++++- .../online_store/test_universal_online.py | 44 +++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index bb69c6b9d95..3398fff8416 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -23,7 +23,7 @@ class OnlineResponse: """ - Defines a online response in feast. + Defines an online response in feast. """ def __init__(self, online_response_proto: GetOnlineFeaturesResponse): @@ -66,3 +66,28 @@ def to_df(self) -> pd.DataFrame: """ return pd.DataFrame(self.to_dict()) + + def event_timestamps_dict(self) -> Dict[str, List[int]]: + """ + Converts GetOnlineFeaturesResponse feature event timestamps into a dictionary form. 
+ Includes the entity id + """ + response: Dict[str, List[int]] = {} + + for result in self.proto.results: + for idx, feature_ref in enumerate(self.proto.metadata.feature_names.val): + # obtain the entity id + if idx == 0: + value = feast_value_type_to_python_type(result.values[idx]) + # feature timestamps + else: + value = result.event_timestamps[idx].seconds + + if feature_ref not in response: + response[feature_ref] = [value] + else: + response[feature_ref].append(value) + return response + + def event_timestamps_df(self) -> pd.DataFrame: + return pd.DataFrame(self.event_timestamps_dict()) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 90a06bb3477..cc993d97c7a 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -322,6 +322,50 @@ def get_online_features_dict( return dict1 +@pytest.mark.integration +@pytest.mark.universal +@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) +def test_online_retrieval_timestamps(environment, universal_data_sources, full_feature_names): + fs = environment.feature_store + entities, datasets, data_sources = universal_data_sources + feature_views = construct_universal_feature_views(data_sources) + + fs.apply([driver(), feature_views["driver"], feature_views["global"]]) + + # fake data to ingest into Online Store + data = { + "driver_id": [1, 2], + "conv_rate": [0.5, 0.3], + "acc_rate": [0.6, 0.4], + "avg_daily_trips": [4, 5], + "event_timestamp": [pd.to_datetime(1646263500, utc=True, unit='s'), pd.to_datetime(1646263600, utc=True, unit='s')], + "created": [pd.to_datetime(1646263500, unit='s'), pd.to_datetime(1646263600, unit='s')], + } + df_ingest = pd.DataFrame(data) + + # directly ingest data into the Online Store + fs.write_to_online_store("driver_stats", df_ingest) + + response = 
fs.get_online_features( + features=[ + "driver_stats:avg_daily_trips", + "driver_stats:acc_rate", + "driver_stats:conv_rate", + ], + entity_rows=[{"driver": 1}, {"driver": 2}], + ) + df = response.event_timestamps_df() + assertpy.assert_that(len(df)).is_equal_to(2) + assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) + assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2) + assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_equal_to(1646263500) + assertpy.assert_that(df["avg_daily_trips"].iloc[1]).is_equal_to(1646263600) + assertpy.assert_that(df["acc_rate"].iloc[0]).is_equal_to(1646263500) + assertpy.assert_that(df["acc_rate"].iloc[1]).is_equal_to(1646263600) + assertpy.assert_that(df["conv_rate"].iloc[0]).is_equal_to(1646263500) + assertpy.assert_that(df["conv_rate"].iloc[1]).is_equal_to(1646263600) + + @pytest.mark.integration @pytest.mark.universal @pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) From 44e8054f66c9378cab38c6fb2180939c121da1ce Mon Sep 17 00:00:00 2001 From: Vitaly Sergeyev Date: Wed, 2 Mar 2022 19:42:00 -0500 Subject: [PATCH 2/6] fix event timestamp bugs Signed-off-by: Vitaly Sergeyev --- sdk/python/feast/feature_store.py | 2 +- sdk/python/feast/infra/online_stores/redis.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index b47e6745c95..ed944804c54 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1508,10 +1508,10 @@ def _read_from_online_store( # Each row is a set of features for a given entity key. We only need to convert # the data to Protobuf once. 
- row_ts_proto = Timestamp() null_value = Value() read_row_protos = [] for read_row in read_rows: + row_ts_proto = Timestamp() row_ts, feature_data = read_row if row_ts is not None: row_ts_proto.FromDatetime(row_ts) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 6225f2d1d1e..a2e8e27d807 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -27,6 +27,7 @@ Union, ) +import pytz from google.protobuf.timestamp_pb2 import Timestamp from pydantic import StrictStr from pydantic.typing import Literal @@ -302,5 +303,5 @@ def _get_features_for_entity( if not res: return None, None else: - timestamp = datetime.fromtimestamp(res_ts.seconds) + timestamp = datetime.fromtimestamp(res_ts.seconds, tz=pytz.utc) return timestamp, res From 4f41cd316b8a53b2115578f4d8f757a3a68cdfa6 Mon Sep 17 00:00:00 2001 From: Vitaly Sergeyev Date: Thu, 3 Mar 2022 15:14:03 -0500 Subject: [PATCH 3/6] python formatting Signed-off-by: Vitaly Sergeyev --- .../online_store/test_universal_online.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index cc993d97c7a..178e5a8207b 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -325,7 +325,9 @@ def get_online_features_dict( @pytest.mark.integration @pytest.mark.universal @pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) -def test_online_retrieval_timestamps(environment, universal_data_sources, full_feature_names): +def test_online_retrieval_timestamps( + environment, universal_data_sources, full_feature_names +): fs = environment.feature_store entities, datasets, data_sources = universal_data_sources feature_views = 
construct_universal_feature_views(data_sources) @@ -338,8 +340,14 @@ def test_online_retrieval_timestamps(environment, universal_data_sources, full_f "conv_rate": [0.5, 0.3], "acc_rate": [0.6, 0.4], "avg_daily_trips": [4, 5], - "event_timestamp": [pd.to_datetime(1646263500, utc=True, unit='s'), pd.to_datetime(1646263600, utc=True, unit='s')], - "created": [pd.to_datetime(1646263500, unit='s'), pd.to_datetime(1646263600, unit='s')], + "event_timestamp": [ + pd.to_datetime(1646263500, utc=True, unit="s"), + pd.to_datetime(1646263600, utc=True, unit="s"), + ], + "created": [ + pd.to_datetime(1646263500, unit="s"), + pd.to_datetime(1646263600, unit="s"), + ], } df_ingest = pd.DataFrame(data) From 6c8e8eb1277e5d5f1b5c2fc0065a535193b9407d Mon Sep 17 00:00:00 2001 From: Vitaly Sergeyev Date: Fri, 4 Mar 2022 17:50:39 -0500 Subject: [PATCH 4/6] optional param to retrieve event_timestamp in online_response Signed-off-by: Vitaly Sergeyev --- sdk/python/feast/online_response.py | 45 ++++++++----------- .../online_store/test_universal_online.py | 19 ++++---- 2 files changed, 28 insertions(+), 36 deletions(-) diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 3398fff8416..1ad510164ef 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -20,6 +20,8 @@ from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse from feast.type_map import feast_value_type_to_python_type +TIMESTAMP_POSTFIX: str = "__ts" + class OnlineResponse: """ @@ -44,9 +46,12 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse): del result.event_timestamps[idx] break - def to_dict(self) -> Dict[str, Any]: + def to_dict(self, is_with_event_timestamps: bool = False) -> Dict[str, Any]: """ Converts GetOnlineFeaturesResponse features into a dictionary form. 
+ + Args: + is_with_event_timestamps: bool Optionally include feature timestamps in the dictionary """ response: Dict[str, List[Any]] = {} @@ -58,36 +63,22 @@ def to_dict(self) -> Dict[str, Any]: else: response[feature_ref].append(native_type_value) + if is_with_event_timestamps: + event_ts = result.event_timestamps[idx].seconds + timestamp_ref = feature_ref + TIMESTAMP_POSTFIX + if timestamp_ref not in response: + response[timestamp_ref] = [event_ts] + else: + response[timestamp_ref].append(event_ts) + return response - def to_df(self) -> pd.DataFrame: + def to_df(self, is_with_event_timestamps: bool = False) -> pd.DataFrame: """ Converts GetOnlineFeaturesResponse features into Panda dataframe form. - """ - - return pd.DataFrame(self.to_dict()) - def event_timestamps_dict(self) -> Dict[str, List[int]]: - """ - Converts GetOnlineFeaturesResponse feature event timestamps into a dictionary form. - Includes the entity id + Args: + is_with_event_timestamps: bool Optionally include feature timestamps in the dataframe """ - response: Dict[str, List[int]] = {} - - for result in self.proto.results: - for idx, feature_ref in enumerate(self.proto.metadata.feature_names.val): - # obtain the entity id - if idx == 0: - value = feast_value_type_to_python_type(result.values[idx]) - # feature timestamps - else: - value = result.event_timestamps[idx].seconds - - if feature_ref not in response: - response[feature_ref] = [value] - else: - response[feature_ref].append(value) - return response - def event_timestamps_df(self) -> pd.DataFrame: - return pd.DataFrame(self.event_timestamps_dict()) + return pd.DataFrame(self.to_dict(is_with_event_timestamps)) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 178e5a8207b..8ae15e4aab4 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ 
-18,6 +18,7 @@ FeatureNameCollisionError, RequestDataNotFoundInEntityRowsException, ) +from feast.online_response import TIMESTAMP_POSTFIX from feast.wait import wait_retry_backoff from tests.integration.feature_repos.repo_configuration import ( Environment, @@ -325,14 +326,14 @@ def get_online_features_dict( @pytest.mark.integration @pytest.mark.universal @pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) -def test_online_retrieval_timestamps( +def test_online_retrieval_with_event_timestamps( environment, universal_data_sources, full_feature_names ): fs = environment.feature_store entities, datasets, data_sources = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - fs.apply([driver(), feature_views["driver"], feature_views["global"]]) + fs.apply([driver(), feature_views.driver, feature_views.global_fv]) # fake data to ingest into Online Store data = { @@ -362,16 +363,16 @@ def test_online_retrieval_timestamps( ], entity_rows=[{"driver": 1}, {"driver": 2}], ) - df = response.event_timestamps_df() + df = response.to_df(True) assertpy.assert_that(len(df)).is_equal_to(2) assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2) - assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_equal_to(1646263500) - assertpy.assert_that(df["avg_daily_trips"].iloc[1]).is_equal_to(1646263600) - assertpy.assert_that(df["acc_rate"].iloc[0]).is_equal_to(1646263500) - assertpy.assert_that(df["acc_rate"].iloc[1]).is_equal_to(1646263600) - assertpy.assert_that(df["conv_rate"].iloc[0]).is_equal_to(1646263500) - assertpy.assert_that(df["conv_rate"].iloc[1]).is_equal_to(1646263600) + assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(1646263500) + assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(1646263600) + assertpy.assert_that(df["acc_rate" + 
TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(1646263500) + assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(1646263600) + assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(1646263500) + assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(1646263600) @pytest.mark.integration From 4f2388fc1dbb12bf5aaf1c0ef7d34dc82507bd8e Mon Sep 17 00:00:00 2001 From: Vitaly Sergeyev Date: Fri, 4 Mar 2022 17:51:32 -0500 Subject: [PATCH 5/6] formatting Signed-off-by: Vitaly Sergeyev --- .../online_store/test_universal_online.py | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 8ae15e4aab4..bdbdfb35552 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -367,12 +367,24 @@ def test_online_retrieval_with_event_timestamps( assertpy.assert_that(len(df)).is_equal_to(2) assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2) - assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(1646263500) - assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(1646263600) - assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(1646263500) - assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(1646263600) - assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(1646263500) - assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(1646263600) + assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 
1646263600 + ) + assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 1646263600 + ) + assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 1646263600 + ) @pytest.mark.integration From 1370aed82a024e69c95f28a18d122c0e1fcd503e Mon Sep 17 00:00:00 2001 From: Vitaly Sergeyev Date: Fri, 4 Mar 2022 18:25:20 -0500 Subject: [PATCH 6/6] renaming param Signed-off-by: Vitaly Sergeyev --- sdk/python/feast/online_response.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 1ad510164ef..f01bd510be9 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -46,7 +46,7 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse): del result.event_timestamps[idx] break - def to_dict(self, is_with_event_timestamps: bool = False) -> Dict[str, Any]: + def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]: """ Converts GetOnlineFeaturesResponse features into a dictionary form. @@ -63,7 +63,7 @@ def to_dict(self, is_with_event_timestamps: bool = False) -> Dict[str, Any]: else: response[feature_ref].append(native_type_value) - if is_with_event_timestamps: + if include_event_timestamps: event_ts = result.event_timestamps[idx].seconds timestamp_ref = feature_ref + TIMESTAMP_POSTFIX if timestamp_ref not in response: @@ -73,7 +73,7 @@ def to_dict(self, is_with_event_timestamps: bool = False) -> Dict[str, Any]: return response - def to_df(self, is_with_event_timestamps: bool = False) -> pd.DataFrame: + def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: """ Converts GetOnlineFeaturesResponse features into Panda dataframe form. 
@@ -81,4 +81,4 @@ def to_df(self, is_with_event_timestamps: bool = False) -> pd.DataFrame: is_with_event_timestamps: bool Optionally include feature timestamps in the dataframe """ - return pd.DataFrame(self.to_dict(is_with_event_timestamps)) + return pd.DataFrame(self.to_dict(include_event_timestamps))