diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 5feb1d7d893..01a4b397047 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -117,10 +117,16 @@ def __getitem__(self, item): cp = self.__copy__() if self.features: + feature_name_to_feature = { + feature.name: feature for feature in self.features + } referenced_features = [] - for feature in self.features: - if feature.name in item: - referenced_features.append(feature) + for feature in item: + if feature not in feature_name_to_feature: + raise ValueError( + f"Feature {feature} does not exist in this feature view." + ) + referenced_features.append(feature_name_to_feature[feature]) cp.projection.features = referenced_features else: cp.projection.desired_features = item diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index f8a288940a5..a13f1a387fe 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -310,6 +310,13 @@ def __init__(self, expected_column_name: str): ) +class FeatureViewMissingDuringFeatureServiceInference(Exception): + def __init__(self, feature_view_name: str, feature_service_name: str): + super().__init__( + f"Missing {feature_view_name} feature view during inference for {feature_service_name} feature service." + ) + + class InvalidEntityType(Exception): def __init__(self, entity_type: type): super().__init__( diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 7c946b1d0bc..c3037a55da2 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -5,6 +5,7 @@ from typeguard import typechecked from feast.base_feature_view import BaseFeatureView +from feast.errors import FeatureViewMissingDuringFeatureServiceInference from feast.feature_logging import LoggingConfig from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection @@ -85,15 +86,29 @@ def __init__( if isinstance(feature_grouping, BaseFeatureView): self.feature_view_projections.append(feature_grouping.projection) - def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None): + def infer_features(self, fvs_to_update: Dict[str, FeatureView]): + """ + Infers the features for the projections of this feature service, and updates this feature + service in place. + + This method is necessary since feature services may rely on feature views which require + feature inference. + + Args: + fvs_to_update: A mapping of feature view names to corresponding feature views that + contains all the feature views necessary to run inference. + """ for feature_grouping in self._features: if isinstance(feature_grouping, BaseFeatureView): - # For feature services that depend on an unspecified feature view, apply inferred schema - if fvs_to_update and feature_grouping.name in fvs_to_update: - if feature_grouping.projection.desired_features: - desired_features = set( - feature_grouping.projection.desired_features - ) + projection = feature_grouping.projection + + if projection.desired_features: + # The projection wants to select a specific set of inferred features. + # Example: FeatureService(features=[fv[["inferred_feature"]]]), where + # 'fv' is a feature view that was defined without a schema. + if feature_grouping.name in fvs_to_update: + # First we validate that the selected features have actually been inferred. + desired_features = set(projection.desired_features) actual_features = set( [ f.name @@ -101,16 +116,38 @@ def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None) ] ) assert desired_features.issubset(actual_features) - # We need to set the features for the projection at this point so we ensure we're starting with - # an empty list. - feature_grouping.projection.features = [] + + # Then we extract the selected features and add them to the projection. + projection.features = [] for f in fvs_to_update[feature_grouping.name].features: if f.name in desired_features: - feature_grouping.projection.features.append(f) + projection.features.append(f) else: - feature_grouping.projection.features = fvs_to_update[ - feature_grouping.name - ].features + raise FeatureViewMissingDuringFeatureServiceInference( + feature_view_name=feature_grouping.name, + feature_service_name=self.name, + ) + + continue + + if projection.features: + # The projection has already selected features from a feature view with a + # known schema, so no action needs to be taken. + # Example: FeatureService(features=[fv[["existing_feature"]]]), where + # 'existing_feature' was defined as part of the schema of 'fv'. + # Example: FeatureService(features=[fv]), where 'fv' was defined with a schema. + continue + + # The projection wants to select all possible inferred features. + # Example: FeatureService(features=[fv]), where 'fv' is a feature view that + # was defined without a schema. + if feature_grouping.name in fvs_to_update: + projection.features = fvs_to_update[feature_grouping.name].features + else: + raise FeatureViewMissingDuringFeatureServiceInference( + feature_view_name=feature_grouping.name, + feature_service_name=self.name, + ) else: raise ValueError( f"The feature service {self.name} has been provided with an invalid type " diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index a862e5f08d9..2960996a10c 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -21,6 +21,10 @@ class FeatureViewProjection: name: The unique name of the feature view from which this projection is created. name_alias: An optional alias for the name. features: The list of features represented by the feature view projection. + desired_features: The list of features that this feature view projection intends to select. + If empty, the projection intends to select all features. This attribute is only used + for feature service inference. It should only be set if the underlying feature view + is not ready to be projected, i.e. still needs to go through feature inference. join_key_map: A map to modify join key columns during retrieval of this feature view projection. """ diff --git a/sdk/python/tests/doctest/__init__.py b/sdk/python/tests/doctest/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_2.py b/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_2.py new file mode 100644 index 00000000000..3547c3de86a --- /dev/null +++ b/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_2.py @@ -0,0 +1,63 @@ +from datetime import timedelta + +from feast import Entity, FeatureService, FeatureView, Field, FileSource +from feast.types import Float32, Int32, Int64 + +driver_hourly_stats = FileSource( + path="data/driver_stats.parquet", # Fake path + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +driver = Entity( + name="driver_id", +) + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + Field(name="driver_id", dtype=Int32), + ], + online=True, + source=driver_hourly_stats, + tags={}, +) + +global_daily_stats = FileSource( + path="data/global_stats.parquet", # Fake path + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +global_stats_feature_view = FeatureView( + name="global_daily_stats", + entities=[], + ttl=timedelta(days=1), + schema=[ + Field(name="num_rides", dtype=Int32), + Field(name="avg_ride_length", dtype=Float32), + ], + online=True, + source=global_daily_stats, + tags={}, +) + +all_stats_service = FeatureService( + name="all_stats", + features=[driver_hourly_stats_view, global_stats_feature_view], + tags={"release": "production"}, +) + +some_stats_service = FeatureService( + name="some_stats", + features=[ + driver_hourly_stats_view[["conv_rate"]], + global_stats_feature_view[["num_rides"]], + ], + tags={"release": "production"}, +) diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_3.py b/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_3.py new file mode 100644 index 00000000000..c16a5d4abc3 --- /dev/null +++ b/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_3.py @@ -0,0 +1,52 @@ +from datetime import timedelta + +from feast import Entity, FeatureService, FeatureView, FileSource + +driver_hourly_stats = FileSource( + path="%PARQUET_PATH%", # placeholder to be replaced by the test + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +driver = Entity( + name="driver_id", +) + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + online=True, + source=driver_hourly_stats, + tags={}, +) + +global_daily_stats = FileSource( + path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +global_stats_feature_view = FeatureView( + name="global_daily_stats", + entities=[], + ttl=timedelta(days=1), + online=True, + source=global_daily_stats, + tags={}, +) + +all_stats_service = FeatureService( + name="all_stats", + features=[driver_hourly_stats_view, global_stats_feature_view], + tags={"release": "production"}, +) + +some_stats_service = FeatureService( + name="some_stats", + features=[ + driver_hourly_stats_view[["conv_rate"]], + global_stats_feature_view[["num_rides"]], + ], + tags={"release": "production"}, +) diff --git a/sdk/python/tests/integration/e2e/__init__.py b/sdk/python/tests/integration/e2e/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/integration/materialization/__init__.py b/sdk/python/tests/integration/materialization/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/integration/offline_store/__init__.py b/sdk/python/tests/integration/offline_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/integration/registration/__init__.py b/sdk/python/tests/integration/registration/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/integration/scaffolding/__init__.py b/sdk/python/tests/integration/scaffolding/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/cli/__init__.py b/sdk/python/tests/unit/cli/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/diff/__init__.py b/sdk/python/tests/unit/diff/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/infra/__init__.py b/sdk/python/tests/unit/infra/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/infra/online_store/__init__.py b/sdk/python/tests/unit/infra/online_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/infra/scaffolding/__init__.py b/sdk/python/tests/unit/infra/scaffolding/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index 68b6ee70f66..c5ed83c12f0 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -294,6 +294,10 @@ def test_feature_view_inference_on_feature_columns(simple_dataset_1): def test_update_feature_services_with_inferred_features(simple_dataset_1): + """ + Tests that a feature service that references feature views without specified features will + be updated with the correct projections after feature inference. + """ with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: entity1 = Entity(name="test1", join_keys=["id_join_key"]) feature_view_1 = FeatureView( @@ -338,4 +342,60 @@ def test_update_feature_services_with_inferred_features(simple_dataset_1): assert len(feature_service.feature_view_projections[1].features) == 3 +def test_update_feature_services_with_specified_features(simple_dataset_1): + """ + Tests that a feature service that references feature views with specified features will + have the correct projections both before and after feature inference. + """ + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity1 = Entity(name="test1", join_keys=["id_join_key"]) + feature_view_1 = FeatureView( + name="test1", + entities=[entity1], + schema=[ + Field(name="float_col", dtype=Float32), + Field(name="id_join_key", dtype=Int64), + ], + source=file_source, + ) + feature_view_2 = FeatureView( + name="test2", + entities=[entity1], + schema=[ + Field(name="int64_col", dtype=Int64), + Field(name="id_join_key", dtype=Int64), + ], + source=file_source, + ) + + feature_service = FeatureService( + name="fs_1", features=[feature_view_1[["float_col"]], feature_view_2] + ) + assert len(feature_service.feature_view_projections) == 2 + assert len(feature_service.feature_view_projections[0].features) == 1 + assert len(feature_service.feature_view_projections[0].desired_features) == 0 + assert len(feature_service.feature_view_projections[1].features) == 1 + assert len(feature_service.feature_view_projections[1].desired_features) == 0 + + update_feature_views_with_inferred_features_and_entities( + [feature_view_1, feature_view_2], + [entity1], + RepoConfig( + provider="local", project="test", entity_key_serialization_version=2 + ), + ) + assert len(feature_view_1.features) == 1 + assert len(feature_view_2.features) == 1 + + feature_service.infer_features( + fvs_to_update={ + feature_view_1.name: feature_view_1, + feature_view_2.name: feature_view_2, + } + ) + + assert len(feature_service.feature_view_projections[0].features) == 1 + assert len(feature_service.feature_view_projections[1].features) == 1 + + # TODO(felixwang9817): Add tests that interact with field mapping. diff --git a/sdk/python/tests/unit/local_feast_tests/__init__.py b/sdk/python/tests/unit/local_feast_tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/local_feast_tests/test_feature_service.py b/sdk/python/tests/unit/local_feast_tests/test_feature_service.py new file mode 100644 index 00000000000..82c1dd2a1d9 --- /dev/null +++ b/sdk/python/tests/unit/local_feast_tests/test_feature_service.py @@ -0,0 +1,96 @@ +import os +import tempfile +from datetime import datetime, timedelta + +from feast.driver_test_data import ( + create_driver_hourly_stats_df, + create_global_daily_stats_df, +) +from tests.utils.basic_read_write_test import basic_rw_test +from tests.utils.cli_repo_creator import CliRunner, get_example_repo + + +def test_apply_without_fv_inference() -> None: + """ + Tests that feature services based on feature views that do not require inference can be applied correctly. + """ + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_with_feature_service_2.py"), "file" + ) as store: + assert len(store.list_feature_services()) == 2 + + fs = store.get_feature_service("all_stats") + assert len(fs.feature_view_projections) == 2 + assert len(fs.feature_view_projections[0].features) == 3 + assert len(fs.feature_view_projections[0].desired_features) == 0 + assert len(fs.feature_view_projections[1].features) == 2 + assert len(fs.feature_view_projections[1].desired_features) == 0 + assert len(fs.tags) == 1 + assert fs.tags["release"] == "production" + + fs = store.get_feature_service("some_stats") + assert len(fs.feature_view_projections) == 2 + assert len(fs.feature_view_projections[0].features) == 1 + assert len(fs.feature_view_projections[0].desired_features) == 0 + assert len(fs.feature_view_projections[0].features) == 1 + assert len(fs.feature_view_projections[0].desired_features) == 0 + + +def test_apply_with_fv_inference() -> None: + """ + Tests that feature services based on feature views that require inference can be applied correctly. + """ + runner = CliRunner() + with tempfile.TemporaryDirectory() as data_dir: + # Generate test data. + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True) + + global_df = create_global_daily_stats_df(start_date, end_date) + global_stats_path = os.path.join(data_dir, "global_stats.parquet") + global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True) + + with runner.local_repo( + get_example_repo("example_feature_repo_with_feature_service_3.py") + .replace("%PARQUET_PATH%", driver_stats_path) + .replace("%PARQUET_PATH_GLOBAL%", global_stats_path), + "file", + ) as store: + assert len(store.list_feature_services()) == 2 + + fs = store.get_feature_service("all_stats") + assert len(fs.feature_view_projections) == 2 + assert len(fs.feature_view_projections[0].features) == 3 + assert len(fs.feature_view_projections[0].desired_features) == 0 + assert len(fs.feature_view_projections[1].features) == 2 + assert len(fs.feature_view_projections[1].desired_features) == 0 + assert len(fs.tags) == 1 + assert fs.tags["release"] == "production" + + fs = store.get_feature_service("some_stats") + assert len(fs.feature_view_projections) == 2 + assert len(fs.feature_view_projections[0].features) == 1 + assert len(fs.feature_view_projections[0].desired_features) == 0 + assert len(fs.feature_view_projections[0].features) == 1 + assert len(fs.feature_view_projections[0].desired_features) == 0 + + +def test_read() -> None: + """ + Test that feature values are correctly read through a feature service. + """ + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_with_feature_service.py"), "file" + ) as store: + basic_rw_test( + store, + view_name="driver_locations", + feature_service_name="driver_locations_service", + ) diff --git a/sdk/python/tests/unit/local_feast_tests/test_feature_service_apply.py b/sdk/python/tests/unit/local_feast_tests/test_feature_service_apply.py deleted file mode 100644 index dc642a6e3c2..00000000000 --- a/sdk/python/tests/unit/local_feast_tests/test_feature_service_apply.py +++ /dev/null @@ -1,25 +0,0 @@ -from feast.feature_service import FeatureService -from tests.utils.cli_repo_creator import CliRunner, get_example_repo - - -def test_read_pre_applied() -> None: - """ - Read feature values from the FeatureStore using a FeatureService. - """ - runner = CliRunner() - with runner.local_repo( - get_example_repo("example_feature_repo_with_feature_service.py"), "file" - ) as store: - assert len(store.list_feature_services()) == 1 - fs = store.get_feature_service("driver_locations_service") - assert len(fs.tags) == 1 - assert fs.tags["release"] == "production" - - fv = store.get_feature_view("driver_locations") - - fs = FeatureService(name="new_feature_service", features=[fv[["lon"]]]) - - store.apply([fs]) - - assert len(store.list_feature_services()) == 2 - store.get_feature_service("new_feature_service") diff --git a/sdk/python/tests/unit/local_feast_tests/test_feature_service_read.py b/sdk/python/tests/unit/local_feast_tests/test_feature_service_read.py deleted file mode 100644 index 2b5b311dc92..00000000000 --- a/sdk/python/tests/unit/local_feast_tests/test_feature_service_read.py +++ /dev/null @@ -1,17 +0,0 @@ -from tests.utils.basic_read_write_test import basic_rw_test -from tests.utils.cli_repo_creator import CliRunner, get_example_repo - - -def test_feature_service_read() -> None: - """ - Read feature values from the FeatureStore using a FeatureService. - """ - runner = CliRunner() - with runner.local_repo( - get_example_repo("example_feature_repo_with_feature_service.py"), "file" - ) as store: - basic_rw_test( - store, - view_name="driver_locations", - feature_service_name="driver_locations_service", - )