From 5a5278d134ecea94474c316c1c06251b67285f8e Mon Sep 17 00:00:00 2001 From: tokoko Date: Mon, 12 Feb 2024 05:18:00 +0000 Subject: [PATCH 1/4] decouple transformation from odfvs Signed-off-by: tokoko --- protos/feast/core/OnDemandFeatureView.proto | 4 +- .../feast/infra/registry/base_registry.py | 2 +- sdk/python/feast/on_demand_feature_view.py | 69 +++++++++---------- .../feast/on_demand_pandas_transformation.py | 56 +++++++++++++++ .../feature_repos/universal/feature_views.py | 11 +-- .../tests/unit/test_on_demand_feature_view.py | 37 +++++++--- 6 files changed, 129 insertions(+), 50 deletions(-) create mode 100644 sdk/python/feast/on_demand_pandas_transformation.py diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 50bf8b6f557..741d46e39ec 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -48,7 +48,9 @@ message OnDemandFeatureViewSpec { // Map of sources for this feature view. map sources = 4; - UserDefinedFunction user_defined_function = 5; + oneof transformation { + UserDefinedFunction user_defined_function = 5; + } // Description of the on demand feature view. string description = 6; diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index f89b0794788..e1caa083c74 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -629,7 +629,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: odfv_dict["spec"]["userDefinedFunction"][ "body" - ] = on_demand_feature_view.udf_string + ] = on_demand_feature_view.transformation.udf_string registry_dict["onDemandFeatureViews"].append(odfv_dict) for request_feature_view in sorted( self.list_request_feature_views(project=project), diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index fcafeaa2bc1..41d7cb817a2 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -2,7 +2,6 @@ import functools import warnings from datetime import datetime -from types import FunctionType from typing import Any, Dict, List, Optional, Type, Union import dill @@ -16,6 +15,7 @@ from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection from feast.field import Field, from_value_type +from feast.on_demand_pandas_transformation import OnDemandPandasTransformation from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureView as OnDemandFeatureViewProto, ) @@ -24,9 +24,6 @@ OnDemandFeatureViewSpec, OnDemandSource, ) -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( - UserDefinedFunction as UserDefinedFunctionProto, -) from feast.type_map import ( feast_value_type_to_pandas_type, python_type_to_feast_value_type, @@ -51,8 +48,7 @@ class OnDemandFeatureView(BaseFeatureView): sources with type FeatureViewProjection. source_request_sources: A map from input source names to the actual input sources with type RequestSource. - udf: The user defined transformation function, which must take pandas dataframes - as inputs. + transformation: The user defined transformation. description: A human-readable description. tags: A dictionary of key-value pairs to store arbitrary metadata. owner: The owner of the on demand feature view, typically the email of the primary @@ -63,8 +59,7 @@ class OnDemandFeatureView(BaseFeatureView): features: List[Field] source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] - udf: FunctionType - udf_string: str + transformation: Union[OnDemandPandasTransformation] description: str tags: Dict[str, str] owner: str @@ -82,8 +77,7 @@ def __init__( # noqa: C901 FeatureViewProjection, ] ], - udf: FunctionType, - udf_string: str = "", + transformation: Union[OnDemandPandasTransformation], description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -98,9 +92,7 @@ def __init__( # noqa: C901 sources: A map from input source names to the actual input sources, which may be feature views, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. - udf: The user defined transformation function, which must take pandas - dataframes as inputs. - udf_string: The source code version of the udf (for diffing and displaying in Web UI) + transformation: The user defined transformation. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -126,8 +118,7 @@ def __init__( # noqa: C901 odfv_source.name ] = odfv_source.projection - self.udf = udf # type: ignore - self.udf_string = udf_string + self.transformation = transformation @property def proto_class(self) -> Type[OnDemandFeatureViewProto]: @@ -139,8 +130,7 @@ def __copy__(self): schema=self.features, sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), - udf=self.udf, - udf_string=self.udf_string, + transformation=self.transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -161,8 +151,7 @@ def __eq__(self, other): self.source_feature_view_projections != other.source_feature_view_projections or self.source_request_sources != other.source_request_sources - or self.udf_string != other.udf_string - or self.udf.__code__.co_code != other.udf.__code__.co_code + or self.transformation != other.transformation ): return False @@ -200,11 +189,9 @@ def to_proto(self) -> OnDemandFeatureViewProto: name=self.name, features=[feature.to_proto() for feature in self.features], sources=sources, - user_defined_function=UserDefinedFunctionProto( - name=self.udf.__name__, - body=dill.dumps(self.udf, recurse=True), - body_text=self.udf_string, - ), + user_defined_function=self.transformation.to_proto() + if type(self.transformation) == OnDemandPandasTransformation + else None, description=self.description, tags=self.tags, owner=self.owner, @@ -243,6 +230,16 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): RequestSource.from_proto(on_demand_source.request_data_source) ) + if ( + on_demand_feature_view_proto.spec.WhichOneof("transformation") + == "user_defined_function" + ): + transformation = OnDemandPandasTransformation.from_proto( + on_demand_feature_view_proto.spec.user_defined_function + ) + else: + raise Exception("At least one transformation type needs to be provided") + on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, schema=[ @@ -253,10 +250,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): for feature in on_demand_feature_view_proto.spec.features ], sources=sources, - udf=dill.loads( - on_demand_feature_view_proto.spec.user_defined_function.body - ), - udf_string=on_demand_feature_view_proto.spec.user_defined_function.body_text, + transformation=transformation, description=on_demand_feature_view_proto.spec.description, tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, @@ -315,7 +309,8 @@ def get_transformed_features_df( columns_to_cleanup.append(full_feature_ref) # Compute transformed values and apply to each result row - df_with_transformed_features = self.udf.__call__(df_with_features) + + df_with_transformed_features = self.transformation.transform(df_with_features) # Work out whether the correct columns names are used. rename_columns: Dict[str, str] = {} @@ -335,7 +330,7 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return df_with_transformed_features.rename(columns=rename_columns) - def infer_features(self): + def infer_features(self) -> None: """ Infers the set of features associated to this feature view from the input source. @@ -365,7 +360,7 @@ def infer_features(self): dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) sample_val = rand_df_value[dtype] if dtype in rand_df_value else None df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype) - output_df: pd.DataFrame = self.udf.__call__(df) + output_df: pd.DataFrame = self.transformation.transform(df) inferred_features = [] for f, dt in zip(output_df.columns, output_df.dtypes): inferred_features.append( @@ -396,7 +391,9 @@ def infer_features(self): ) @staticmethod - def get_requested_odfvs(feature_refs, project, registry): + def get_requested_odfvs( + feature_refs, project, registry + ) -> List["OnDemandFeatureView"]: all_on_demand_feature_views = registry.list_on_demand_feature_views( project, allow_cache=True ) @@ -438,7 +435,7 @@ def on_demand_feature_view( of the primary maintainer. """ - def mainify(obj): + def mainify(obj) -> None: # Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same # name as the original file defining the ODFV. if obj.__module__ != "__main__": @@ -447,15 +444,17 @@ def mainify(obj): def decorator(user_function): udf_string = dill.source.getsource(user_function) mainify(user_function) + + transformation = OnDemandPandasTransformation(user_function, udf_string) + on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, sources=sources, schema=schema, - udf=user_function, + transformation=transformation, description=description, tags=tags, owner=owner, - udf_string=udf_string, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/feast/on_demand_pandas_transformation.py b/sdk/python/feast/on_demand_pandas_transformation.py new file mode 100644 index 00000000000..52d45893c51 --- /dev/null +++ b/sdk/python/feast/on_demand_pandas_transformation.py @@ -0,0 +1,56 @@ +from types import FunctionType + +import dill +import pandas as pd + +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + UserDefinedFunction as UserDefinedFunctionProto, +) + + +class OnDemandPandasTransformation: + def __init__(self, udf: FunctionType, udf_string: str = ""): + """ + Creates an OnDemandPandasTransformation object. + + Args: + udf: The user defined transformation function, which must take pandas + dataframes as inputs. + udf_string: The source code version of the udf (for diffing and displaying in Web UI) + """ + self.udf = udf + self.udf_string = udf_string + + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + return self.udf.__call__(df) + + def __eq__(self, other): + if not isinstance(other, OnDemandPandasTransformation): + raise TypeError( + "Comparisons should only involve OnDemandPandasTransformation class objects." + ) + + if not super().__eq__(other): + return False + + if ( + self.udf_string != other.udf_string + or self.udf.__code__.co_code != other.udf.__code__.co_code + ): + return False + + return True + + def to_proto(self) -> UserDefinedFunctionProto: + return UserDefinedFunctionProto( + name=self.udf.__name__, + body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, + ) + + @classmethod + def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): + return OnDemandPandasTransformation( + udf=dill.loads(user_defined_function_proto.body), + udf_string=user_defined_function_proto.body_text, + ) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 5938a0c936e..13a4806e56a 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -14,6 +14,7 @@ StreamFeatureView, ) from feast.data_source import DataSource, RequestSource +from feast.on_demand_feature_view import OnDemandPandasTransformation from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 from tests.integration.feature_repos.universal.entities import ( customer, @@ -69,8 +70,9 @@ def conv_rate_plus_100_feature_view( name=conv_rate_plus_100.__name__, schema=[] if infer_features else _features, sources=sources, - udf=conv_rate_plus_100, - udf_string="raw udf source", + transformation=OnDemandPandasTransformation( + udf=conv_rate_plus_100, udf_string="raw udf source" + ), ) @@ -107,8 +109,9 @@ def similarity_feature_view( name=similarity.__name__, sources=sources, schema=[] if infer_features else _fields, - udf=similarity, - udf_string="similarity raw udf", + transformation=OnDemandPandasTransformation( + udf=similarity, udf_string="similarity raw udf" + ), ) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index ca8e7b25cb8..24a4af449ba 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -17,7 +17,10 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.on_demand_feature_view import OnDemandFeatureView +from feast.on_demand_feature_view import ( + OnDemandFeatureView, + OnDemandPandasTransformation, +) from feast.types import Float32 @@ -54,8 +57,9 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf1, - udf_string="udf1 source code", + transformation=OnDemandPandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), ) on_demand_feature_view_2 = OnDemandFeatureView( name="my-on-demand-feature-view", @@ -64,8 +68,9 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf1, - udf_string="udf1 source code", + transformation=OnDemandPandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), ) on_demand_feature_view_3 = OnDemandFeatureView( name="my-on-demand-feature-view", @@ -74,8 +79,9 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf2, - udf_string="udf2 source code", + transformation=OnDemandPandasTransformation( + udf=udf2, udf_string="udf2 source code" + ), ) on_demand_feature_view_4 = OnDemandFeatureView( name="my-on-demand-feature-view", @@ -84,8 +90,21 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf2, - udf_string="udf2 source code", + transformation=OnDemandPandasTransformation( + udf=udf2, udf_string="udf2 source code" + ), + description="test", + ) + on_demand_feature_view_4 = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + transformation=OnDemandPandasTransformation( + udf=udf2, udf_string="udf2 source code" + ), description="test", ) From f0180e69e9a2bfbfa5152ce9ddbcddd8962a482a Mon Sep 17 00:00:00 2001 From: tokoko Date: Tue, 13 Feb 2024 20:52:22 +0000 Subject: [PATCH 2/4] OnDemandFeatureView: keep udf and udf_string parameters for backwards compatibility Signed-off-by: tokoko --- sdk/python/feast/on_demand_feature_view.py | 20 ++++++++++++++++++- .../tests/unit/test_on_demand_feature_view.py | 11 ++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 41d7cb817a2..706f2ec4e41 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -2,6 +2,7 @@ import functools import warnings from datetime import datetime +from types import FunctionType from typing import Any, Dict, List, Optional, Type, Union import dill @@ -77,7 +78,9 @@ def __init__( # noqa: C901 FeatureViewProjection, ] ], - transformation: Union[OnDemandPandasTransformation], + udf: Optional[FunctionType] = None, + udf_string: str = "", + transformation: Optional[Union[OnDemandPandasTransformation]] = None, description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -92,6 +95,9 @@ def __init__( # noqa: C901 sources: A map from input source names to the actual input sources, which may be feature views, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. + udf (deprecated): The user defined transformation function, which must take pandas + dataframes as inputs. + udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) transformation: The user defined transformation. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. @@ -106,6 +112,18 @@ def __init__( # noqa: C901 owner=owner, ) + if not transformation: + if udf: + warnings.warn( + "udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.", + DeprecationWarning, + ) + transformation = OnDemandPandasTransformation(udf, udf_string) + else: + raise Exception( + "OnDemandFeatureView needs to be initialized with either transformation or udf arguments" + ) + self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} self.source_request_sources: Dict[str, RequestSource] = {} for odfv_source in sources: diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 24a4af449ba..721026ea462 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -95,16 +95,15 @@ def test_hash(): ), description="test", ) - on_demand_feature_view_4 = OnDemandFeatureView( + on_demand_feature_view_5 = OnDemandFeatureView( name="my-on-demand-feature-view", sources=sources, schema=[ Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - transformation=OnDemandPandasTransformation( - udf=udf2, udf_string="udf2 source code" - ), + udf=udf2, + udf_string="udf2 source code", description="test", ) @@ -124,3 +123,7 @@ def test_hash(): on_demand_feature_view_4, } assert len(s4) == 3 + + assert on_demand_feature_view_5.transformation == OnDemandPandasTransformation( + udf2, "udf2 source code" + ) From 9b936f100a2a752d27a003400245b821ace2e83d Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 17 Feb 2024 19:17:25 +0000 Subject: [PATCH 3/4] fix linting issues Signed-off-by: tokoko --- .../tests/integration/feature_repos/universal/feature_views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index a75e6cc2d01..421ef416018 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -14,8 +14,8 @@ StreamFeatureView, ) from feast.data_source import DataSource, RequestSource -from feast.on_demand_feature_view import OnDemandPandasTransformation from feast.feature_view_projection import FeatureViewProjection +from feast.on_demand_feature_view import OnDemandPandasTransformation from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 from tests.integration.feature_repos.universal.entities import ( customer, From 2f0c67cf44464ce5abd1cc8736114325cb5685ab Mon Sep 17 00:00:00 2001 From: tokoko Date: Sun, 18 Feb 2024 12:38:00 +0000 Subject: [PATCH 4/4] remove unused import in registry protos Signed-off-by: tokoko --- protos/feast/registry/RegistryServer.proto | 1 - 1 file changed, 1 deletion(-) diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index 3e7773e89a4..ab324f9bd1a 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -2,7 +2,6 @@ syntax = "proto3"; package feast.registry; -import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; import "feast/core/Registry.proto"; import "feast/core/Entity.proto";