From 1756a144217bea11e069da7e1370170beee695d9 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 25 Mar 2022 09:44:21 -0700 Subject: [PATCH 1/5] chore: Add support for push sources in feature views Signed-off-by: Achal Shah --- sdk/python/feast/__init__.py | 3 +- sdk/python/feast/data_source.py | 38 +++++++++++++------ sdk/python/feast/feature_store.py | 23 ++++++++++- sdk/python/feast/feature_view.py | 27 +++++++++---- sdk/python/feast/repo_operations.py | 3 ++ .../example_repos/example_feature_repo_1.py | 24 ++++++++++++ sdk/python/tests/unit/test_data_sources.py | 16 +------- 7 files changed, 98 insertions(+), 36 deletions(-) diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index 83b504b0cb9..144b87b042b 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -7,7 +7,7 @@ from feast.infra.offline_stores.redshift_source import RedshiftSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource -from .data_source import KafkaSource, KinesisSource, SourceType +from .data_source import KafkaSource, KinesisSource, PushSource, SourceType from .entity import Entity from .feature import Feature from .feature_service import FeatureService @@ -47,4 +47,5 @@ "RedshiftSource", "RequestFeatureView", "SnowflakeSource", + "PushSource", ] diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 0d49ce22492..02a10f85356 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -396,8 +396,8 @@ def get_table_column_names_and_types( def from_proto(data_source: DataSourceProto): schema_pb = data_source.request_data_options.schema schema = {} - for key in schema_pb.keys(): - schema[key] = ValueType(schema_pb.get(key)) + for key, val in schema_pb.items(): + schema[key] = ValueType(val) return RequestDataSource(name=data_source.name, schema=schema) def to_proto(self) -> DataSourceProto: @@ -519,18 +519,27 @@ class PushSource(DataSource): name: str schema: 
Dict[str, ValueType] - batch_source: Optional[DataSource] + batch_source: DataSource + event_timestamp_column: str def __init__( self, name: str, schema: Dict[str, ValueType], - batch_source: Optional[DataSource] = None, + batch_source: DataSource, + event_timestamp_column="timestamp", ): """Creates a PushSource object.""" super().__init__(name) self.schema = schema self.batch_source = batch_source + if not self.batch_source: + raise ValueError(f"batch_source is needed for push source {self.name}") + self.event_timestamp_column = event_timestamp_column + if not self.event_timestamp_column: + raise ValueError( + f"event_timestamp_column is needed for push source {self.name}" + ) def validate(self, config: RepoConfig): pass @@ -544,21 +553,23 @@ def get_table_column_names_and_types( def from_proto(data_source: DataSourceProto): schema_pb = data_source.push_options.schema schema = {} - for key, value in schema_pb.items(): - schema[key] = value + for key, val in schema_pb.items(): + schema[key] = ValueType(val) - batch_source = None - if data_source.push_options.HasField("batch_source"): - batch_source = DataSource.from_proto(data_source.push_options.batch_source) + assert data_source.push_options.HasField("batch_source") + batch_source = DataSource.from_proto(data_source.push_options.batch_source) return PushSource( - name=data_source.name, schema=schema, batch_source=batch_source + name=data_source.name, + schema=schema, + batch_source=batch_source, + event_timestamp_column=data_source.event_timestamp_column, ) def to_proto(self) -> DataSourceProto: schema_pb = {} for key, value in self.schema.items(): - schema_pb[key] = value + schema_pb[key] = value.value batch_source_proto = None if self.batch_source: batch_source_proto = self.batch_source.to_proto() @@ -567,7 +578,10 @@ def to_proto(self) -> DataSourceProto: schema=schema_pb, batch_source=batch_source_proto ) data_source_proto = DataSourceProto( - name=self.name, type=DataSourceProto.PUSH_SOURCE, 
push_options=options, + name=self.name, + type=DataSourceProto.PUSH_SOURCE, + push_options=options, + event_timestamp_column=self.event_timestamp_column, ) return data_source_proto diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e9108f89537..3f423f9af16 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -39,7 +39,7 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView -from feast.data_source import DataSource +from feast.data_source import DataSource, PushSource from feast.diff.infra_diff import InfraDiff, diff_infra_protos from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between from feast.entity import Entity @@ -1149,6 +1149,27 @@ def tqdm_builder(length): feature_view, self.project, start_date, end_date, ) + @log_exceptions_and_usage + def push(self, push_source_name: str, df: pd.DataFrame): + data_sources = self.registry.list_data_sources(self.project, True) + push_source = { + ds + for ds in data_sources + if isinstance(ds, PushSource) and ds.name == push_source_name + } + + all_fvs = self.registry.list_feature_views(self.project, True) + fvs_with_push_sources = { + fv for fv in all_fvs if fv.stream_source in push_source + } + + for fv in fvs_with_push_sources: + entities = [] + for entity_name in fv.entities: + entities.append(self._registry.get_entity(entity_name, self.project)) + provider = self._get_provider() + provider.ingest_df(fv, entities, df) + @log_exceptions_and_usage def write_to_online_store( self, diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 3e7561d3387..8a0b26e0408 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -20,7 +20,7 @@ from feast import utils from feast.base_feature_view import BaseFeatureView -from feast.data_source import DataSource +from feast.data_source import DataSource, 
PushSource from feast.entity import Entity from feast.feature import Feature from feast.feature_view_projection import FeatureViewProjection @@ -72,6 +72,7 @@ class FeatureView(BaseFeatureView): online: bool batch_source: DataSource stream_source: Optional[DataSource] + materialization_intervals: List[Tuple[datetime, datetime]] @log_exceptions @@ -80,7 +81,7 @@ def __init__( name: str, entities: List[str], ttl: Union[Duration, timedelta], - batch_source: DataSource, + batch_source: Optional[DataSource] = None, stream_source: Optional[DataSource] = None, features: Optional[List[Feature]] = None, tags: Optional[Dict[str, str]] = None, @@ -94,15 +95,28 @@ def __init__( """ _features = features or [] + if stream_source is not None and isinstance(stream_source, PushSource): + assert stream_source.batch_source is not None + assert isinstance(stream_source.batch_source, DataSource) + self.batch_source = stream_source.batch_source + else: + assert batch_source is not None + self.batch_source = batch_source + + if not self.batch_source: + raise ValueError( + f"A batch_source needs to be specified for feature view `{name}`" + ) + cols = [entity for entity in entities] + [feat.name for feat in _features] for col in cols: if ( - batch_source.field_mapping is not None - and col in batch_source.field_mapping.keys() + self.batch_source.field_mapping is not None + and col in self.batch_source.field_mapping.keys() ): raise ValueError( - f"The field {col} is mapped to {batch_source.field_mapping[col]} for this data source. " - f"Please either remove this field mapping or use {batch_source.field_mapping[col]} as the " + f"The field {col} is mapped to {self.batch_source.field_mapping[col]} for this data source. " + f"Please either remove this field mapping or use {self.batch_source.field_mapping[col]} as the " f"Entity or Feature name." 
) @@ -124,7 +138,6 @@ def __init__( self.ttl = ttl self.online = online - self.batch_source = batch_source self.stream_source = stream_source self.materialization_intervals = [] diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 4bee79bd60a..3457aa48866 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -12,6 +12,7 @@ import click from click.exceptions import BadParameter +from feast import PushSource from feast.data_source import DataSource from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add from feast.entity import Entity @@ -112,6 +113,8 @@ def parse_repo(repo_root: Path) -> RepoContents: res.data_sources.add(obj) if isinstance(obj, FeatureView): res.feature_views.add(obj) + if isinstance(obj.stream_source, PushSource): + res.data_sources.add(obj.stream_source.batch_source) elif isinstance(obj, Entity): res.entities.add(obj) elif isinstance(obj, FeatureService): diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index b072f872547..bb9bc0f1a1a 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -6,6 +6,7 @@ Feature, FeatureService, FeatureView, + PushSource, ValueType, ) @@ -26,6 +27,16 @@ event_timestamp_column="event_timestamp", ) +driver_locations_push_source = PushSource( + name="driver_locations_push", + schema={ + "driver_id": ValueType.STRING, + "driver_lat": ValueType.FLOAT, + "driver_long": ValueType.STRING, + }, + batch_source=driver_locations_source, +) + driver = Entity( name="driver", # The name is derived from this argument, not object name. 
join_key="driver_id", @@ -53,6 +64,19 @@ tags={}, ) +pushed_driver_locations = FeatureView( + name="pushed_driver_locations", + entities=["driver"], + ttl=timedelta(days=1), + features=[ + Feature(name="driver_lat", dtype=ValueType.FLOAT), + Feature(name="driver_long", dtype=ValueType.STRING), + ], + online=True, + stream_source=driver_locations_push_source, + tags={}, +) + customer_profile = FeatureView( name="customer_profile", entities=["customer"], diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index cae5249694a..28a12d44ad4 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,20 +1,6 @@ +from feast import ValueType from feast.data_source import PushSource from feast.infra.offline_stores.bigquery_source import BigQuerySource -from feast.protos.feast.types.Value_pb2 import ValueType - - -def test_push_no_batch(): - push_source = PushSource( - name="test", schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64} - ) - push_source_proto = push_source.to_proto() - assert push_source_proto.push_options is not None - assert not push_source_proto.push_options.HasField("batch_source") - push_source_unproto = PushSource.from_proto(push_source_proto) - - assert push_source.name == push_source_unproto.name - assert push_source.schema == push_source_unproto.schema - assert push_source.batch_source == push_source_unproto.batch_source def test_push_with_batch(): From 1840abb9de10b00cbb8487b9699c198891b751f4 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 25 Mar 2022 11:05:41 -0700 Subject: [PATCH 2/5] fix test Signed-off-by: Achal Shah --- sdk/python/tests/integration/registration/test_cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/integration/registration/test_cli.py b/sdk/python/tests/integration/registration/test_cli.py index c79d672fb94..655e53e7593 100644 --- 
a/sdk/python/tests/integration/registration/test_cli.py +++ b/sdk/python/tests/integration/registration/test_cli.py @@ -84,12 +84,12 @@ def test_universal_cli(environment: Environment): cwd=repo_path, ) assertpy.assert_that(result.returncode).is_equal_to(0) - assertpy.assert_that(fs.list_feature_views()).is_length(3) + assertpy.assert_that(fs.list_feature_views()).is_length(4) result = runner.run( ["data-sources", "describe", "customer_profile_source"], cwd=repo_path, ) assertpy.assert_that(result.returncode).is_equal_to(0) - assertpy.assert_that(fs.list_data_sources()).is_length(3) + assertpy.assert_that(fs.list_data_sources()).is_length(4) # entity & feature view describe commands should fail when objects don't exist result = runner.run(["entities", "describe", "foo"], cwd=repo_path) From 5002ef0bc80decf2f32f79213e9b32d613d0b1bd Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 25 Mar 2022 12:04:23 -0700 Subject: [PATCH 3/5] cr Signed-off-by: Achal Shah --- sdk/python/feast/data_source.py | 17 +++++++---- sdk/python/feast/feature_store.py | 48 +++++++++++++++++-------------- sdk/python/feast/feature_view.py | 22 ++++++++------ 3 files changed, 50 insertions(+), 37 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 02a10f85356..2f66f846bcb 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -510,11 +510,7 @@ def to_proto(self) -> DataSourceProto: class PushSource(DataSource): """ - PushSource that can be used to ingest features on request - - Args: - name: Name of the push source - schema: Schema mapping from the input feature name to a ValueType + A source that can be used to ingest features on request """ name: str @@ -529,7 +525,16 @@ def __init__( batch_source: DataSource, event_timestamp_column="timestamp", ): - """Creates a PushSource object.""" + """ + Creates a PushSource object. 
+ Args: + name: Name of the push source + schema: Schema mapping from the input feature name to a ValueType + batch_source: The batch source that backs this push source. It's used when materializing from the offline + store to the online store, and when retrieving historical features. + event_timestamp_column (optional): Event timestamp column used for point in time + joins of feature values. + """ super().__init__(name) self.schema = schema self.batch_source = batch_source diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index a9047670cd9..aa76b2802d4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -40,7 +40,7 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView -from feast.data_source import DataSource, PushSource +from feast.data_source import DataSource from feast.diff.infra_diff import InfraDiff, diff_infra_protos from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between from feast.entity import Entity @@ -282,12 +282,13 @@ def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]: return self._registry.list_data_sources(self.project, allow_cache=allow_cache) @log_exceptions_and_usage - def get_entity(self, name: str) -> Entity: + def get_entity(self, name: str, allow_registry_cache: bool = False) -> Entity: """ Retrieves an entity. Args: name: Name of entity. + allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry Returns: The specified entity. @@ -295,7 +296,9 @@ def get_entity(self, name: str) -> Entity: Raises: EntityNotFoundException: The entity could not be found. 
""" - return self._registry.get_entity(name, self.project) + return self._registry.get_entity( + name, self.project, allow_cache=allow_registry_cache + ) @log_exceptions_and_usage def get_feature_service( @@ -317,12 +320,15 @@ def get_feature_service( return self._registry.get_feature_service(name, self.project, allow_cache) @log_exceptions_and_usage - def get_feature_view(self, name: str) -> FeatureView: + def get_feature_view( + self, name: str, allow_registry_cache: bool = False + ) -> FeatureView: """ Retrieves a feature view. Args: name: Name of feature view. + allow_registry_cache: (Optional) Whether to allow returning this feature view from a cached registry Returns: The specified feature view. @@ -330,12 +336,17 @@ def get_feature_view(self, name: str) -> FeatureView: Raises: FeatureViewNotFoundException: The feature view could not be found. """ - return self._get_feature_view(name) + return self._get_feature_view(name, allow_registry_cache=allow_registry_cache) def _get_feature_view( - self, name: str, hide_dummy_entity: bool = True + self, + name: str, + hide_dummy_entity: bool = True, + allow_registry_cache: bool = False, ) -> FeatureView: - feature_view = self._registry.get_feature_view(name, self.project) + feature_view = self._registry.get_feature_view( + name, self.project, allow_cache=allow_registry_cache + ) if hide_dummy_entity and feature_view.entities[0] == DUMMY_ENTITY_NAME: feature_view.entities = [] return feature_view @@ -1146,24 +1157,15 @@ def tqdm_builder(length): @log_exceptions_and_usage def push(self, push_source_name: str, df: pd.DataFrame): - data_sources = self.registry.list_data_sources(self.project, True) - push_source = { - ds - for ds in data_sources - if isinstance(ds, PushSource) and ds.name == push_source_name - } + push_source = self.get_data_source(push_source_name) - all_fvs = self.registry.list_feature_views(self.project, True) + all_fvs = self.list_feature_views(allow_cache=True) fvs_with_push_sources = { fv for fv in all_fvs 
if fv.stream_source in push_source } for fv in fvs_with_push_sources: - entities = [] - for entity_name in fv.entities: - entities.append(self._registry.get_entity(entity_name, self.project)) - provider = self._get_provider() - provider.ingest_df(fv, entities, df) + self.write_to_online_store(fv.name, df, allow_registry_cache=True) @log_exceptions_and_usage def write_to_online_store( @@ -1176,12 +1178,14 @@ def write_to_online_store( ingests data directly into the Online store """ # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type - feature_view = self._registry.get_feature_view( - feature_view_name, self.project, allow_cache=allow_registry_cache + feature_view = self.get_feature_view( + feature_view_name, allow_registry_cache=allow_registry_cache ) entities = [] for entity_name in feature_view.entities: - entities.append(self._registry.get_entity(entity_name, self.project)) + entities.append( + self.get_entity(entity_name, allow_registry_cache=allow_registry_cache) + ) provider = self._get_provider() provider.ingest_df(feature_view, entities, df) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index d7523e6c6b8..4ca014d0df7 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -58,7 +58,9 @@ class FeatureView(BaseFeatureView): ttl: The amount of time this group of features lives. A ttl of 0 indicates that this group of features lives forever. Note that large ttl's or a ttl of 0 can result in extremely computationally intensive queries. - batch_source: The batch source of data where this group of features is stored. + batch_source (optional): The batch source of data where this group of features is stored. + This is optional ONLY if a push source is specified as the stream_source, since push sources + contain their own batch sources. stream_source (optional): The stream source of data where this group of features is stored. 
features: The list of features defined as part of this feature view. @@ -122,18 +124,20 @@ def __init__( _features = features or [] if stream_source is not None and isinstance(stream_source, PushSource): - assert stream_source.batch_source is not None - assert isinstance(stream_source.batch_source, DataSource) + if stream_source.batch_source is None or not isinstance( + stream_source.batch_source, DataSource + ): + raise ValueError( + f"A batch_source needs to be specified for feature view `{name}`" + ) self.batch_source = stream_source.batch_source else: - assert batch_source is not None + if batch_source is None: + raise ValueError( + f"A batch_source needs to be specified for feature view `{name}`" + ) self.batch_source = batch_source - if not self.batch_source: - raise ValueError( - f"A batch_source needs to be specified for feature view `{name}`" - ) - cols = [entity for entity in entities] + [feat.name for feat in _features] for col in cols: if ( From 8bf8b8431c723c110d7cf4fbaeb989a1c27205fc Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 25 Mar 2022 12:55:51 -0700 Subject: [PATCH 4/5] Add universal test Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 11 ++++- .../feature_repos/repo_configuration.py | 3 ++ .../feature_repos/universal/feature_views.py | 21 ++++++++- .../test_push_online_retrieval.py | 44 +++++++++++++++++++ 4 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 sdk/python/tests/integration/online_store/test_push_online_retrieval.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index aa76b2802d4..5fcdece6d52 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1157,11 +1157,18 @@ def tqdm_builder(length): @log_exceptions_and_usage def push(self, push_source_name: str, df: pd.DataFrame): - push_source = self.get_data_source(push_source_name) + from feast.data_source import PushSource all_fvs = self.list_feature_views(allow_cache=True) 
+ fvs_with_push_sources = { - fv for fv in all_fvs if fv.stream_source in push_source + fv + for fv in all_fvs + if ( + fv.stream_source is not None + and isinstance(fv.stream_source, PushSource) + and fv.stream_source.name == push_source_name + ) } for fv in fvs_with_push_sources: diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index c271d08605a..9cbe11f86b9 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -43,6 +43,7 @@ create_global_stats_feature_view, create_location_stats_feature_view, create_order_feature_view, + create_pushable_feature_view, ) DYNAMO_CONFIG = {"type": "dynamodb", "region": "us-west-2"} @@ -263,6 +264,7 @@ class UniversalFeatureViews: order: FeatureView location: FeatureView field_mapping: FeatureView + pushed_locations: FeatureView def values(self): return dataclasses.asdict(self).values() @@ -288,6 +290,7 @@ def construct_universal_feature_views( order=create_order_feature_view(data_sources.orders), location=create_location_stats_feature_view(data_sources.location), field_mapping=create_field_mapping_feature_view(data_sources.field_mapping), + pushed_locations=create_pushable_feature_view(data_sources.location), ) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index b0dc34197f3..1a9f1f18650 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -4,7 +4,7 @@ import numpy as np import pandas as pd -from feast import Feature, FeatureView, OnDemandFeatureView, ValueType +from feast import Feature, FeatureView, OnDemandFeatureView, PushSource, ValueType from feast.data_source import DataSource, RequestDataSource from 
feast.request_feature_view import RequestFeatureView @@ -227,3 +227,22 @@ def create_field_mapping_feature_view(source): batch_source=source, ttl=timedelta(days=2), ) + + +def create_pushable_feature_view(batch_source: DataSource): + push_source = PushSource( + name="location_stats_push_source", + schema={ + "location_id": ValueType.INT64, + "temperature": ValueType.INT32, + "timestamp": ValueType.UNIX_TIMESTAMP, + }, + batch_source=batch_source, + ) + return FeatureView( + name="pushable_location_stats", + entities=["location_id"], + features=[Feature(name="temperature", dtype=ValueType.INT32)], + ttl=timedelta(days=2), + stream_source=push_source, + ) diff --git a/sdk/python/tests/integration/online_store/test_push_online_retrieval.py b/sdk/python/tests/integration/online_store/test_push_online_retrieval.py new file mode 100644 index 00000000000..ed392b26c3c --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_push_online_retrieval.py @@ -0,0 +1,44 @@ +import datetime + +import pandas as pd +import pytest + +from tests.integration.feature_repos.repo_configuration import ( + construct_universal_feature_views, +) +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + location, +) + + +@pytest.mark.integration +@pytest.mark.universal +def test_historical_features_with_missing_request_data( + environment, universal_data_sources +): + store = environment.feature_store + + (_, datasets, data_sources) = universal_data_sources + feature_views = construct_universal_feature_views(data_sources) + + store.apply([driver(), customer(), location(), *feature_views.values()]) + data = { + "location_id": [1], + "temperature": [4], + "event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], + "created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], + } + df_ingest = pd.DataFrame(data) + + store.push("location_stats_push_source", df_ingest) + + online_resp = store.get_online_features( + 
features=["pushable_location_stats:temperature"], + entity_rows=[{"location_id": 1}], + ) + online_resp_dict = online_resp.to_dict() + print(online_resp_dict) + assert online_resp_dict["location_id"] == [1] + assert online_resp_dict["temperature"] == [4] From 0a42623034e73ddef0dfa11439c38f1ad63e6285 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 25 Mar 2022 13:38:00 -0700 Subject: [PATCH 5/5] cr Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 6 ++++++ .../integration/online_store/test_push_online_retrieval.py | 5 +---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 5fcdece6d52..4598a573d4a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1157,6 +1157,12 @@ def tqdm_builder(length): @log_exceptions_and_usage def push(self, push_source_name: str, df: pd.DataFrame): + """ + Push features to a push source. This updates all the feature views that have the push source as stream source. + Args: + push_source_name: The name of the push source we want to push data to. + df: the data being pushed. 
+ """ from feast.data_source import PushSource all_fvs = self.list_feature_views(allow_cache=True) diff --git a/sdk/python/tests/integration/online_store/test_push_online_retrieval.py b/sdk/python/tests/integration/online_store/test_push_online_retrieval.py index ed392b26c3c..9e9ec953c73 100644 --- a/sdk/python/tests/integration/online_store/test_push_online_retrieval.py +++ b/sdk/python/tests/integration/online_store/test_push_online_retrieval.py @@ -15,9 +15,7 @@ @pytest.mark.integration @pytest.mark.universal -def test_historical_features_with_missing_request_data( - environment, universal_data_sources -): +def test_push_features_and_read(environment, universal_data_sources): store = environment.feature_store (_, datasets, data_sources) = universal_data_sources @@ -39,6 +37,5 @@ def test_historical_features_with_missing_request_data( entity_rows=[{"location_id": 1}], ) online_resp_dict = online_resp.to_dict() - print(online_resp_dict) assert online_resp_dict["location_id"] == [1] assert online_resp_dict["temperature"] == [4]