From 5bc7136b3889f9fb291f59f27258dba83092de61 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 13 Apr 2022 14:59:08 -0700 Subject: [PATCH 1/3] Allow local feature server to use Go feature server if enabled Signed-off-by: Felix Wang --- sdk/python/feast/feature_server.py | 4 +-- sdk/python/feast/feature_store.py | 36 +++++++++---------- .../feature_repos/repo_configuration.py | 5 +++ 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 9e3ec42177d..f80d03dbcdb 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -52,8 +52,8 @@ def get_online_features(body=Depends(get_body)): raise HTTPException(status_code=500, detail="Uneven number of columns") response_proto = store._get_online_features( - features, - request_proto.entities, + features=features, + entity_values=request_proto.entities, full_feature_names=full_feature_names, native_entity_values=False, ).proto diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 63db1514129..872b664fafa 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1284,7 +1284,23 @@ def get_online_features( except KeyError as e: raise ValueError("All entity_rows must have the same keys.") from e - # If Go feature server is enabled, send request to it instead of going through a regular Python logic + return self._get_online_features( + features=features, + entity_values=columnar, + full_feature_names=full_feature_names, + native_entity_values=True, + ) + + def _get_online_features( + self, + features: Union[List[str], FeatureService], + entity_values: Mapping[ + str, Union[Sequence[Any], Sequence[Value], RepeatedValue] + ], + full_feature_names: bool = False, + native_entity_values: bool = True, + ): + # If Go feature server is enabled, send request to it instead of going through regular Python logic if self.config.go_feature_server: from feast.embedded_go.online_features_service import ( EmbeddedOnlineFeatureServer, @@ -1301,27 +1317,11 @@ def get_online_features( feature_service=features if isinstance(features, FeatureService) else None, - entities=columnar, + entities=entity_values, request_data={}, # TODO: add request data parameter to public API full_feature_names=full_feature_names, ) - return self._get_online_features( - features=features, - entity_values=columnar, - full_feature_names=full_feature_names, - native_entity_values=True, - ) - - def _get_online_features( - self, - features: Union[List[str], FeatureService], - entity_values: Mapping[ - str, Union[Sequence[Any], Sequence[Value], RepeatedValue] - ], - full_feature_names: bool = False, - native_entity_values: bool = True, - ): _feature_refs = self._get_features(features, allow_cache=True) ( requested_feature_views, diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index a99ba959537..4da02bcacbe 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -118,6 +118,11 @@ IntegrationTestRepoConfig( online_store=REDIS_CONFIG, go_feature_server=True, ), + IntegrationTestRepoConfig( + online_store=REDIS_CONFIG, + python_feature_server=True, + go_feature_server=True, + ), ] ) full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) From 772a8e0d2e1b44bd0c43ee3c3ba3befdfc64cb4a Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 13 Apr 2022 16:05:12 -0700 Subject: [PATCH 2/3] Initialize _go_server correctly Signed-off-by: Felix Wang --- sdk/python/feast/feature_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 872b664fafa..c873f77fd9e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -135,6 +135,7 @@ def __init__( self._registry = Registry(registry_config, repo_path=self.repo_path) self._registry._initialize_registry() self._provider = get_provider(self.config, self.repo_path) + self._go_server = None @log_exceptions def version(self) -> str: From 7ea615a34c50566e7674b532014d6a4b342386dc Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 13 Apr 2022 18:33:03 -0700 Subject: [PATCH 3/3] Convert proto values to native values to support Go feature server Signed-off-by: Felix Wang --- sdk/python/feast/feature_store.py | 34 +++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c873f77fd9e..1aa4cef602a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -82,7 +82,10 @@ from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, SavedDatasetStorage -from feast.type_map import python_values_to_proto_values +from feast.type_map import ( + feast_value_type_to_python_type, + python_values_to_proto_values, +) from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute from feast.value_type import ValueType from feast.version import get_version @@ -1301,6 +1304,12 @@ def _get_online_features( full_feature_names: bool = False, native_entity_values: bool = True, ): + # Extract Sequence from RepeatedValue Protobuf. + entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = { + k: list(v) if isinstance(v, Sequence) else list(v.val) + for k, v in entity_values.items() + } + # If Go feature server is enabled, send request to it instead of going through regular Python logic if self.config.go_feature_server: from feast.embedded_go.online_features_service import ( @@ -1313,12 +1322,27 @@ def _get_online_features( str(self.repo_path.absolute()), self.config, self ) + entity_native_values: Dict[str, List[Any]] + if not native_entity_values: + # Convert proto types to native types since Go feature server currently + # only handles native types. + # TODO(felixwang9817): Remove this logic once native types are supported. + entity_native_values = { + k: [ + feast_value_type_to_python_type(proto_value) + for proto_value in v + ] + for k, v in entity_value_lists.items() + } + else: + entity_native_values = entity_value_lists + return self._go_server.get_online_features( features_refs=features if isinstance(features, list) else [], feature_service=features if isinstance(features, FeatureService) else None, - entities=entity_values, + entities=entity_native_values, request_data={}, # TODO: add request data parameter to public API full_feature_names=full_feature_names, ) @@ -1345,12 +1369,6 @@ def _get_online_features( join_keys_set, ) = self._get_entity_maps(requested_feature_views) - # Extract Sequence from RepeatedValue Protobuf. - entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = { - k: list(v) if isinstance(v, Sequence) else list(v.val) - for k, v in entity_values.items() - } - entity_proto_values: Dict[str, List[Value]] if native_entity_values: # Convert values to Protobuf once.