diff --git a/docs/getting-started/concepts/feature-view.md b/docs/getting-started/concepts/feature-view.md index d94b1359cdd..fa6612fbadd 100644 --- a/docs/getting-started/concepts/feature-view.md +++ b/docs/getting-started/concepts/feature-view.md @@ -138,6 +138,8 @@ Feature names must be unique within a [feature view](feature-view.md#feature-vie On demand feature views allows users to use existing features and request time data (features only available at request time) to transform and create new features. Users define python transformation logic which is executed in both historical retrieval and online retrieval paths: ```python +from feast import Field, Float64, RequestSource + # Define a request data source which encodes features / information only # available at request time (e.g. part of the user initiated HTTP request) input_request = RequestSource( @@ -150,13 +152,13 @@ input_request = RequestSource( # Use the input data and feature view features to create new features @on_demand_feature_view( - inputs={ + sources={ 'driver_hourly_stats': driver_hourly_stats_view, 'vals_to_add': input_request }, - features=[ - Feature(name='conv_rate_plus_val1', dtype=ValueType.DOUBLE), - Feature(name='conv_rate_plus_val2', dtype=ValueType.DOUBLE) + schema=[ + Field(name='conv_rate_plus_val1', dtype=Float64), + Field(name='conv_rate_plus_val2', dtype=Float64) ] ) def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame: diff --git a/docs/reference/alpha-on-demand-feature-view.md b/docs/reference/alpha-on-demand-feature-view.md index 18416d74e23..8816f562ba9 100644 --- a/docs/reference/alpha-on-demand-feature-view.md +++ b/docs/reference/alpha-on-demand-feature-view.md @@ -28,6 +28,8 @@ See [https://github.com/feast-dev/on-demand-feature-views-demo](https://github.c We register `RequestDataSource` inputs and the transform in `on_demand_feature_view`: ```python +from feast import Field, Float64, RequestSource + # Define a request data source which encodes features / information only # available at request time (e.g. part of the user initiated HTTP request) input_request = RequestDataSource( @@ -40,13 +42,13 @@ input_request = RequestDataSource( # Use the input data and feature view features to create new features @on_demand_feature_view( - inputs={ + sources={ 'driver_hourly_stats': driver_hourly_stats_view, 'vals_to_add': input_request }, - features=[ - Feature(name='conv_rate_plus_val1', dtype=ValueType.DOUBLE), - Feature(name='conv_rate_plus_val2', dtype=ValueType.DOUBLE) + schema=[ + Field(name='conv_rate_plus_val1', dtype=Float64), + Field(name='conv_rate_plus_val2', dtype=Float64) ] ) def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame: diff --git a/docs/tutorials/validating-historical-features.md b/docs/tutorials/validating-historical-features.md index 79d16a74b77..d0c59de6619 100644 --- a/docs/tutorials/validating-historical-features.md +++ b/docs/tutorials/validating-historical-features.md @@ -107,7 +107,7 @@ pyarrow.parquet.write_table(entities_2019_table, "entities.parquet") import pyarrow.parquet import pandas as pd -from feast import Feature, FeatureView, Entity, FeatureStore, Field, Float64, Int64 +from feast import FeatureView, Entity, FeatureStore, Field, Float64, Int64 from feast.value_type import ValueType from feast.data_format import ParquetFormat from feast.on_demand_feature_view import on_demand_feature_view @@ -153,11 +153,11 @@ trips_stats_fv = FeatureView( ```python @on_demand_feature_view( - features=[ - Feature("avg_fare", ValueType.DOUBLE), - Feature("avg_speed", ValueType.DOUBLE), - Feature("avg_trip_seconds", ValueType.DOUBLE), - Feature("earned_per_hour", ValueType.DOUBLE), + schema=[ + Field("avg_fare", Float64), + Field("avg_speed", Float64), + Field("avg_trip_seconds", Float64), + Field("earned_per_hour", Float64), ], inputs={ "stats": trips_stats_fv diff --git a/examples/java-demo/feature_repo/driver_repo.py b/examples/java-demo/feature_repo/driver_repo.py index cf00c740918..d7ce41134a7 100644 --- a/examples/java-demo/feature_repo/driver_repo.py +++ b/examples/java-demo/feature_repo/driver_repo.py @@ -1,10 +1,13 @@ import pandas as pd -from feast import Entity, Feature, FeatureView, FileSource, ValueType from feast.data_source import RequestSource +from feast.field import Field from feast.on_demand_feature_view import on_demand_feature_view from feast.request_feature_view import RequestFeatureView +from feast.types import Float32, Float64, Int64, String from google.protobuf.duration_pb2 import Duration +from feast import Entity, Feature, FeatureView, FileSource, ValueType + driver_hourly_stats = FileSource( path="data/driver_stats_with_string.parquet", timestamp_field="event_timestamp", @@ -15,11 +18,11 @@ name="driver_hourly_stats", entities=["driver_id"], ttl=Duration(seconds=86400000), - features=[ - Feature(name="conv_rate", dtype=ValueType.FLOAT), - Feature(name="acc_rate", dtype=ValueType.FLOAT), - Feature(name="avg_daily_trips", dtype=ValueType.INT64), - Feature(name="string_feature", dtype=ValueType.STRING), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + Field(name="string_feature", dtype=String), ], online=True, batch_source=driver_hourly_stats, @@ -40,9 +43,9 @@ "driver_hourly_stats": driver_hourly_stats_view, "vals_to_add": input_request, }, - features=[ - Feature(name="conv_rate_plus_val1", dtype=ValueType.DOUBLE), - Feature(name="conv_rate_plus_val2", dtype=ValueType.DOUBLE), + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), ], ) def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: diff --git a/java/serving/src/test/resources/docker-compose/feast10/definitions.py b/java/serving/src/test/resources/docker-compose/feast10/definitions.py index c48acffe173..35c5a2c127f 100644 --- a/java/serving/src/test/resources/docker-compose/feast10/definitions.py +++ b/java/serving/src/test/resources/docker-compose/feast10/definitions.py @@ -1,16 +1,15 @@ import pandas as pd - -from google.protobuf.duration_pb2 import Duration - -from feast.value_type import ValueType -from feast.feature import Feature -from feast.feature_view import FeatureView +from feast.data_source import RequestSource from feast.entity import Entity from feast.feature_service import FeatureService -from feast.data_source import RequestSource +from feast.feature_view import FeatureView +from feast.field import Field from feast.on_demand_feature_view import on_demand_feature_view -from feast import FileSource +from feast.types import Float32, Float64, Int64 +from feast.value_type import ValueType +from google.protobuf.duration_pb2 import Duration +from feast import FileSource file_path = "driver_stats.parquet" driver_hourly_stats = FileSource( @@ -30,10 +29,10 @@ name="driver_hourly_stats", entities=["driver_id"], ttl=Duration(seconds=86400 * 7), - features=[ - Feature(name="conv_rate", dtype=ValueType.DOUBLE), - Feature(name="acc_rate", dtype=ValueType.FLOAT), - Feature(name="avg_daily_trips", dtype=ValueType.INT64), + schema=[ + Field(name="conv_rate", dtype=Float64), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), ], online=True, batch_source=driver_hourly_stats, @@ -43,49 +42,39 @@ input_request = RequestSource( name="vals_to_add", - schema={ - "val_to_add": ValueType.INT64, - "val_to_add_2": ValueType.INT64 - } + schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64}, ) @on_demand_feature_view( - sources={ - 'driver_hourly_stats': driver_hourly_stats_view, - 'vals_to_add': input_request - }, - features=[ - Feature(name='conv_rate_plus_val1', dtype=ValueType.DOUBLE), - Feature(name='conv_rate_plus_val2', dtype=ValueType.DOUBLE) - ] + sources={ + "driver_hourly_stats": driver_hourly_stats_view, + "vals_to_add": input_request, + }, + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), + ], ) def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame: df = pd.DataFrame() - df['conv_rate_plus_val1'] = (features_df['conv_rate'] + features_df['val_to_add']) - df['conv_rate_plus_val2'] = (features_df['conv_rate'] + features_df['val_to_add_2']) + df["conv_rate_plus_val1"] = features_df["conv_rate"] + features_df["val_to_add"] + df["conv_rate_plus_val2"] = features_df["conv_rate"] + features_df["val_to_add_2"] return df generated_data_source = FileSource( - path="benchmark_data.parquet", - timestamp_field="event_timestamp", + path="benchmark_data.parquet", timestamp_field="event_timestamp", ) -entity = Entity( - name="entity", - value_type=ValueType.STRING, -) +entity = Entity(name="entity", value_type=ValueType.STRING,) benchmark_feature_views = [ FeatureView( name=f"feature_view_{i}", entities=["entity"], ttl=Duration(seconds=86400), - features=[ - Feature(name=f"feature_{10 * i + j}", dtype=ValueType.INT64) - for j in range(10) - ], + schema=[Field(name=f"feature_{10 * i + j}", dtype=Int64) for j in range(10)], online=True, batch_source=generated_data_source, ) @@ -93,6 +82,5 @@ def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame: ] benchmark_feature_service = FeatureService( - name=f"benchmark_feature_service", - features=benchmark_feature_views, + name=f"benchmark_feature_service", features=benchmark_feature_views, ) diff --git a/java/serving/src/test/resources/docker-compose/feast10/materialize.py b/java/serving/src/test/resources/docker-compose/feast10/materialize.py index 8389d8527bf..404fec27e12 100644 --- a/java/serving/src/test/resources/docker-compose/feast10/materialize.py +++ b/java/serving/src/test/resources/docker-compose/feast10/materialize.py @@ -1,12 +1,17 @@ -import pandas as pd -import numpy as np - from datetime import datetime, timedelta -from feast import FeatureStore -from definitions import driver_hourly_stats_view, driver, entity,\ - benchmark_feature_service, benchmark_feature_views, transformed_conv_rate +import numpy as np +import pandas as pd +from definitions import ( + benchmark_feature_service, + benchmark_feature_views, + driver, + driver_hourly_stats_view, + entity, + transformed_conv_rate, +) +from feast import FeatureStore print("Running materialize.py") @@ -21,7 +26,9 @@ df["avg_daily_trips"] = np.arange(0, 1000, 100) # some of rows are beyond 7 days to test OUTSIDE_MAX_AGE status -df["event_timestamp"] = start + pd.Series(np.arange(0, 10)).map(lambda days: timedelta(days=days)) +df["event_timestamp"] = start + pd.Series(np.arange(0, 10)).map( + lambda days: timedelta(days=days) +) # Store data in parquet files. Parquet is convenient for local development mode. For # production, you can use your favorite DWH, such as BigQuery. See Feast documentation @@ -41,21 +48,27 @@ def generate_data(num_rows: int, num_features: int, destination: str) -> pd.Data for column in features: df[column] = np.random.randint(1, num_rows, num_rows) - df["entity"] = "key-" + \ - pd.Series(np.arange(1, num_rows + 1)).astype(pd.StringDtype()) + df["entity"] = "key-" + pd.Series(np.arange(1, num_rows + 1)).astype( + pd.StringDtype() + ) df.to_parquet(destination) -generate_data(10**3, 250, "benchmark_data.parquet") +generate_data(10 ** 3, 250, "benchmark_data.parquet") fs = FeatureStore(".") -fs.apply([driver_hourly_stats_view, - transformed_conv_rate, - driver, - entity, benchmark_feature_service, - *benchmark_feature_views]) +fs.apply( + [ + driver_hourly_stats_view, + transformed_conv_rate, + driver, + entity, + benchmark_feature_service, + *benchmark_feature_views, + ] +) now = datetime.now() fs.materialize(start, now) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 33c4c255080..a453cbb4db7 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -106,7 +106,7 @@ def __init__( owner (optional): The owner of the on demand feature view, typically the email of the primary maintainer. """ - positional_attributes = ["name", "features", "sources", "udf"] + positional_attributes = ["name", "features", "inputs", "udf"] _name = name @@ -459,22 +459,112 @@ def get_requested_odfvs(feature_refs, project, registry): # TODO(felixwang9817): Force this decorator to accept kwargs and switch from # `features` to `schema`. def on_demand_feature_view( - features: List[Feature], sources: Dict[str, Union[FeatureView, RequestSource]] + *args, + features: Optional[List[Feature]] = None, + sources: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None, + inputs: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None, + schema: Optional[List[Field]] = None, + description: str = "", + tags: Optional[Dict[str, str]] = None, + owner: str = "", ): """ - Declare an on-demand feature view - - :param features: Output schema with feature names - :param sources: The sources passed into the transform. - :return: An On Demand Feature View. + Creates an OnDemandFeatureView object with the given user function as udf. + + Args: + features (deprecated): The list of features in the output of the on demand + feature view, after the transformation has been applied. + sources (optional): A map from input source names to the actual input sources, + which may be feature views, feature view projections, or request data sources. + These sources serve as inputs to the udf, which will refer to them by name. + inputs (optional): A map from input source names to the actual input sources, + which may be feature views, feature view projections, or request data sources. + These sources serve as inputs to the udf, which will refer to them by name. + schema (optional): The list of features in the output of the on demand feature + view, after the transformation has been applied. + description (optional): A human-readable description. + tags (optional): A dictionary of key-value pairs to store arbitrary metadata. + owner (optional): The owner of the on demand feature view, typically the email + of the primary maintainer. """ + positional_attributes = ["features", "inputs"] + + _schema = schema or [] + if len(_schema) == 0 and features is not None: + _schema = [Field.from_feature(feature) for feature in features] + if features is not None: + warnings.warn( + ( + "The `features` parameter is being deprecated in favor of the `schema` parameter. " + "Please switch from using `features` to `schema`. This will also requiring switching " + "feature definitions from using `Feature` to `Field`. Feast 0.21 and onwards will not " + "support the `features` parameter." + ), + DeprecationWarning, + ) + + _sources = sources or inputs + if inputs and sources: + raise ValueError("At most one of `sources` or `inputs` can be specified.") + elif inputs: + warnings.warn( + ( + "The `inputs` parameter is being deprecated. Please use `sources` instead. " + "Feast 0.21 and onwards will not support the `inputs` parameter." + ), + DeprecationWarning, + ) + + if args: + warnings.warn( + ( + "On demand feature view parameters should be specified as keyword arguments " + "instead of positional arguments. Feast 0.23 and onwards will not support " + "positional arguments in on demand feature view definitions." + ), + DeprecationWarning, + ) + if len(args) > len(positional_attributes): + raise ValueError( + f"Only {', '.join(positional_attributes)} are allowed as positional args " + f"when defining feature views, for backwards compatibility." + ) + if len(args) >= 1: + _schema = args[0] + # Convert Features to Fields. + if len(_schema) > 0 and isinstance(_schema[0], Feature): + _schema = [Field.from_feature(feature) for feature in _schema] + warnings.warn( + ( + "The `features` parameter is being deprecated in favor of the `schema` parameter. " + "Please switch from using `features` to `schema`. This will also requiring switching " + "feature definitions from using `Feature` to `Field`. Feast 0.21 and onwards will not " + "support the `features` parameter." + ), + DeprecationWarning, + ) + if len(args) >= 2: + _sources = args[1] + warnings.warn( + ( + "The `inputs` parameter is being deprecated. Please use `sources` instead. " + "Feast 0.21 and onwards will not support the `inputs` parameter." + ), + DeprecationWarning, + ) + + if not _sources: + raise ValueError("The `sources` parameter must be specified.") def decorator(user_function): on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, - sources=sources, - features=features, + sources=_sources, + schema=_schema, udf=user_function, + description=description, + tags=tags, + owner=owner, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index b69191d3082..c0358abb59e 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -20,6 +20,7 @@ SpecifiedFeaturesNotPresentError, ) from feast.feature_view import FeatureView +from feast.field import Field from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, update_entities_with_inferred_types_from_feature_views, @@ -28,6 +29,7 @@ SparkSource, ) from feast.on_demand_feature_view import on_demand_feature_view +from feast.types import String, UnixTimestamp from tests.utils.data_source_utils import ( prep_file_source, simple_bq_source_using_query_arg, @@ -185,7 +187,10 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: test_view.infer_features() @on_demand_feature_view( - sources={"date_request": date_request}, + # Note: we deliberately use `inputs` instead of `sources` to test that `inputs` + # still works correctly, even though it is deprecated. + # TODO(felixwang9817): Remove references to `inputs` once it is fully deprecated. + inputs={"date_request": date_request}, features=[ Feature(name="output", dtype=ValueType.UNIX_TIMESTAMP), Feature(name="object_output", dtype=ValueType.STRING), @@ -201,11 +206,14 @@ def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: invalid_test_view.infer_features() @on_demand_feature_view( - sources={"date_request": date_request}, - features=[ + # Note: we deliberately use positional arguments here to test that they work correctly, + # even though positional arguments are deprecated in favor of keyword arguments. + # TODO(felixwang9817): Remove positional arguments once they are fully deprecated. + [ Feature(name="output", dtype=ValueType.UNIX_TIMESTAMP), Feature(name="missing", dtype=ValueType.STRING), ], + {"date_request": date_request}, ) def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() @@ -223,11 +231,14 @@ def test_datasource_inference(): ) @on_demand_feature_view( - sources={"date_request": date_request}, - features=[ + # Note: we deliberately use positional arguments here to test that they work correctly, + # even though positional arguments are deprecated in favor of keyword arguments. + # TODO(felixwang9817): Remove positional arguments once they are fully deprecated. + [ Feature(name="output", dtype=ValueType.UNIX_TIMESTAMP), Feature(name="string_output", dtype=ValueType.STRING), ], + sources={"date_request": date_request}, ) def test_view(features_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() @@ -239,9 +250,9 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: @on_demand_feature_view( sources={"date_request": date_request}, - features=[ - Feature(name="output", dtype=ValueType.UNIX_TIMESTAMP), - Feature(name="object_output", dtype=ValueType.STRING), + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), ], ) def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: diff --git a/ui/feature_repo/features.py b/ui/feature_repo/features.py index 934e7124475..dfc23cc394b 100644 --- a/ui/feature_repo/features.py +++ b/ui/feature_repo/features.py @@ -136,7 +136,7 @@ # Define an on demand feature view which can generate new features based on # existing feature views and RequestSource features @on_demand_feature_view( - inputs={"credit_history": credit_history, "transaction": input_request,}, + sources={"credit_history": credit_history, "transaction": input_request,}, schema=[ Field(name="transaction_gt_last_credit_card_due", dtype=Bool), ], diff --git a/ui/public/registry.json b/ui/public/registry.json index 70709bf28da..af328979e98 100644 --- a/ui/public/registry.json +++ b/ui/public/registry.json @@ -627,7 +627,7 @@ }, "userDefinedFunction": { "name": "transaction_gt_last_credit_card_due", - "body": "@on_demand_feature_view(\n inputs={\"credit_history\": credit_history, \"transaction\": input_request,},\n schema=[\n Field(name=\"transaction_gt_last_credit_card_due\", dtype=Bool),\n ],\n)\ndef transaction_gt_last_credit_card_due(inputs: pd.DataFrame) -> pd.DataFrame:\n df = pd.DataFrame()\n df[\"transaction_gt_last_credit_card_due\"] = (\n inputs[\"transaction_amt\"] > inputs[\"credit_card_due\"]\n )\n return df\n" + "body": "@on_demand_feature_view(\n sources={\"credit_history\": credit_history, \"transaction\": input_request,},\n schema=[\n Field(name=\"transaction_gt_last_credit_card_due\", dtype=Bool),\n ],\n)\ndef transaction_gt_last_credit_card_due(inputs: pd.DataFrame) -> pd.DataFrame:\n df = pd.DataFrame()\n df[\"transaction_gt_last_credit_card_due\"] = (\n inputs[\"transaction_amt\"] > inputs[\"credit_card_due\"]\n )\n return df\n" } }, "meta": {