From b7578ba42dff50de239ba8f5676d5c2a1143b1d8 Mon Sep 17 00:00:00 2001 From: qooba Date: Sat, 16 Oct 2021 21:28:23 +0000 Subject: [PATCH 01/39] add dask df Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 146 +++++++++++++----- .../infra/offline_stores/offline_store.py | 20 +++ sdk/python/feast/infra/provider.py | 9 +- 3 files changed, 132 insertions(+), 43 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index a49ce643d0b..b02268612fb 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -2,6 +2,7 @@ from typing import Callable, List, Optional, Tuple, Union import pandas as pd +import dask.dataframe as dd import pyarrow import pytz from pydantic.typing import Literal @@ -22,6 +23,7 @@ from feast.infra.provider import ( _get_requested_feature_views_to_features_dict, _run_field_mapping, + _run_dask_field_mapping ) from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -68,6 +70,12 @@ def _to_df_internal(self) -> pd.DataFrame: df = self.evaluation_function() return df + @log_exceptions_and_usage + def _to_dask_df_internal(self) -> dd.DataFrame: + # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. + df = self.evaluation_function(True) + return df + @log_exceptions_and_usage def _to_arrow_internal(self): # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. @@ -140,7 +148,7 @@ def get_historical_features( ) # Create lazy function that is only called from the RetrievalJob object - def evaluate_historical_retrieval(): + def evaluate_historical_retrieval(use_dask: bool = False): # Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC entity_df[entity_df_event_timestamp_col] = entity_df[ @@ -170,45 +178,78 @@ def evaluate_historical_retrieval(): feature_view.batch_source.created_timestamp_column ) - # Read offline parquet data in pyarrow format. - filesystem, path = FileSource.create_filesystem_and_path( - feature_view.batch_source.path, - feature_view.batch_source.file_options.s3_endpoint_override, - ) - table = pyarrow.parquet.read_table(path, filesystem=filesystem) + if use_dask: + merge_asof = dd.merge_asof + storage_options = { + "client_kwargs": { + "endpoint_url": feature_view.batch_source.file_options.s3_endpoint_override + } + } if feature_view.batch_source.file_options.s3_endpoint_override else None - # Rename columns by the field mapping dictionary if it exists - if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping( - table, feature_view.batch_source.field_mapping + df_to_join = dd.read_parquet(feature_view.batch_source.path, storage_options=storage_options) + + # Rename entity columns by the join_key_map dictionary if it exists + if feature_view.projection.join_key_map: + _run_dask_field_mapping( + df_to_join, feature_view.projection.join_key_map + ) + + # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC + df_to_join[event_timestamp_column] = df_to_join[ + event_timestamp_column + ].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]') ) - # Rename entity columns by the join_key_map dictionary if it exists - if feature_view.projection.join_key_map: - table = _run_field_mapping( - table, feature_view.projection.join_key_map + if created_timestamp_column: + df_to_join[created_timestamp_column] = df_to_join[ + created_timestamp_column + ].apply( + lambda x: x + if x.tzinfo is not None + else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column,'datetime64[ns, UTC]') + + ) + + else: + merge_asof = pd.merge_asof + # Read offline parquet data in pyarrow format. + filesystem, path = FileSource.create_filesystem_and_path( + feature_view.batch_source.path, + feature_view.batch_source.file_options.s3_endpoint_override, ) - # Convert pyarrow table to pandas dataframe. Note, if the underlying data has missing values, - # pandas will convert those values to np.nan if the dtypes are numerical (floats, ints, etc.) or boolean - # If the dtype is 'object', then missing values are inferred as python `None`s. - # More details at: - # https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html#values-considered-missing - df_to_join = table.to_pandas() + table = pyarrow.parquet.read_table(path, filesystem=filesystem) - # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - df_to_join[event_timestamp_column] = df_to_join[ - event_timestamp_column - ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) - ) - if created_timestamp_column: - df_to_join[created_timestamp_column] = df_to_join[ - created_timestamp_column + # Rename entity columns by the join_key_map dictionary if it exists + if feature_view.projection.join_key_map: + table = _run_field_mapping( + table, feature_view.projection.join_key_map + ) + + # Convert pyarrow table to pandas dataframe. Note, if the underlying data has missing values, + # pandas will convert those values to np.nan if the dtypes are numerical (floats, ints, etc.) or boolean + # If the dtype is 'object', then missing values are inferred as python `None`s. + # More details at: + # https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html#values-considered-missing + df_to_join = table.to_pandas() + + + + # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC + df_to_join[event_timestamp_column] = df_to_join[ + event_timestamp_column ].apply( - lambda x: x - if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc) + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) ) + if created_timestamp_column: + df_to_join[created_timestamp_column] = df_to_join[ + created_timestamp_column + ].apply( + lambda x: x + if x.tzinfo is not None + else x.replace(tzinfo=pytz.utc) + ) # Sort dataframe by the event timestamp column df_to_join = df_to_join.sort_values(event_timestamp_column) @@ -229,9 +270,15 @@ def evaluate_historical_retrieval(): feature_names.append(formatted_feature_name) # Ensure that the source dataframe feature column includes the feature view name as a prefix - df_to_join.rename( - columns={feature: formatted_feature_name}, inplace=True, - ) + if use_dask: + df_to_join=df_to_join.rename( + columns={feature: formatted_feature_name} + ) + else: + df_to_join.rename( + columns={feature: formatted_feature_name}, inplace=True, + ) + # Build a list of entity columns to join on (from the right table) join_keys = [] @@ -254,7 +301,11 @@ def evaluate_historical_retrieval(): created_timestamp_column ] - df_to_join.sort_values(by=right_entity_key_sort_columns, inplace=True) + if use_dask: + df_to_join=df_to_join.sort_values(by=event_timestamp_column) + else: + df_to_join.sort_values(by=right_entity_key_sort_columns, inplace=True) + df_to_join.drop_duplicates( right_entity_key_sort_columns, keep="last", @@ -266,7 +317,12 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join[right_entity_key_columns + feature_names] # Do point in-time-join between entity_df and feature dataframe - entity_df_with_features = pd.merge_asof( + + if use_dask: + entity_df_with_features = dd.from_pandas(entity_df_with_features, npartitions=1).sort_values(entity_df_event_timestamp_col) + df_to_join = df_to_join.sort_values(event_timestamp_column) + + entity_df_with_features = merge_asof( entity_df_with_features, df_to_join, left_on=entity_df_event_timestamp_col, @@ -277,9 +333,17 @@ def evaluate_historical_retrieval(): # Remove right (feature table/view) event_timestamp column. if event_timestamp_column != entity_df_event_timestamp_col: - entity_df_with_features.drop( - columns=[event_timestamp_column], inplace=True - ) + if use_dask: + if event_timestamp_column in entity_df_with_features.columns: + entity_df_with_features.drop( + columns=[event_timestamp_column] + ) + else: + entity_df_with_features.drop( + columns=[event_timestamp_column], inplace=True + ) + + # Ensure that we delete dataframes to free up memory del df_to_join diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 1e5fe573774..58e240ce1f9 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, List, Optional, Union import pandas as pd +import dask.dataframe as dd import pyarrow from feast.data_source import DataSource @@ -99,11 +100,30 @@ def to_df( return features_df + def to_dask_df(self) -> dd.DataFrame: + """Return dataset as Pandas DataFrame synchronously including on demand transforms""" + features_df = self._to_dask_df_internal() + if self.on_demand_feature_views is None: + return features_df + + for odfv in self.on_demand_feature_views: + features_df = features_df.join( + odfv.get_transformed_features_df(self.full_feature_names, features_df) + ) + return features_df + + @abstractmethod def _to_df_internal(self) -> pd.DataFrame: """Return dataset as Pandas DataFrame synchronously""" pass + @abstractmethod + def _to_dask_df_internal(self) -> dd.DataFrame: + """Return dataset as Dask DataFrame synchronously""" + pass + + @abstractmethod def _to_arrow_internal(self) -> pyarrow.Table: """Return dataset as pyarrow Table synchronously""" diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index a53030b74f9..d62b395709d 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -7,6 +7,7 @@ import pandas import pyarrow from tqdm import tqdm +import dask.dataframe as dd from feast import errors from feast.entity import Entity @@ -311,6 +312,12 @@ def _run_field_mapping( table = table.rename_columns(mapped_cols) return table +def _run_dask_field_mapping( + table: dd.DataFrame, field_mapping: Dict[str, str], +): + if field_mapping: + # run field mapping in the forward direction + table.rename(field_mapping) def _coerce_datetime(ts): """ @@ -321,13 +328,11 @@ def _coerce_datetime(ts): same way. We convert it to normal datetime so that consumers downstream don't have to deal with these quirks. """ - if isinstance(ts, pandas.Timestamp): return ts.to_pydatetime() else: return ts - def _convert_arrow_to_proto( table: Union[pyarrow.Table, pyarrow.RecordBatch], feature_view: FeatureView, From 8324f2c51b765dfa98d4f40209a45414956cace3 Mon Sep 17 00:00:00 2001 From: qooba Date: Sat, 16 Oct 2021 22:21:50 +0000 Subject: [PATCH 02/39] add dask Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index b02268612fb..89f5c7acab7 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -116,7 +116,7 @@ def get_historical_features( project: str, full_feature_names: bool = False, ) -> RetrievalJob: - if not isinstance(entity_df, pd.DataFrame): + if not isinstance(entity_df, pd.DataFrame) and not isinstance(entity_df, dd.DataFrame): raise ValueError( f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}" ) @@ -160,9 +160,14 @@ def evaluate_historical_retrieval(use_dask: bool = False): # Convert event timestamp column to datetime and normalize time zone to UTC # This is necessary to avoid issues with pd.merge_asof - entity_df_with_features[entity_df_event_timestamp_col] = pd.to_datetime( - entity_df_with_features[entity_df_event_timestamp_col], utc=True - ) + if isinstance(entity_df_with_features, dd.DataFrame): + entity_df_with_features[entity_df_event_timestamp_col] = dd.to_datetime( + entity_df_with_features[entity_df_event_timestamp_col], utc=True + ) + else: + entity_df_with_features[entity_df_event_timestamp_col] = pd.to_datetime( + entity_df_with_features[entity_df_event_timestamp_col], utc=True + ) # Sort event timestamp values entity_df_with_features = entity_df_with_features.sort_values( @@ -319,8 +324,11 @@ def evaluate_historical_retrieval(use_dask: bool = False): # Do point in-time-join between entity_df and feature dataframe if use_dask: - entity_df_with_features = dd.from_pandas(entity_df_with_features, npartitions=1).sort_values(entity_df_event_timestamp_col) + if not isinstance(entity_df_with_features, dd.DataFrame): + entity_df_with_features = dd.from_pandas(entity_df_with_features, npartitions=1) + df_to_join = df_to_join.sort_values(event_timestamp_column) + entity_df_with_features = entity_df_with_features.sort_values(entity_df_event_timestamp_col) entity_df_with_features = merge_asof( entity_df_with_features, From e50864f700def5ec4a02b3fd0e833fade89f0d07 Mon Sep 17 00:00:00 2001 From: qooba Date: Sat, 16 Oct 2021 23:45:49 +0000 Subject: [PATCH 03/39] add dask Signed-off-by: qooba --- sdk/python/feast/cli.py | 6 +- sdk/python/feast/feature_store.py | 5 +- sdk/python/feast/infra/offline_stores/file.py | 67 +++++++++++-------- .../infra/offline_stores/offline_store.py | 4 +- .../feast/infra/passthrough_provider.py | 3 +- sdk/python/feast/infra/provider.py | 1 + 6 files changed, 53 insertions(+), 33 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index f6d326410a3..66cfb960a28 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -420,12 +420,13 @@ def registry_dump_command(ctx: click.Context): @cli.command("materialize") @click.argument("start_ts") @click.argument("end_ts") +@click.argument("use_dask") @click.option( "--views", "-v", help="Feature views to materialize", multiple=True, ) @click.pass_context def materialize_command( - ctx: click.Context, start_ts: str, end_ts: str, views: List[str] + ctx: click.Context, start_ts: str, end_ts: str, views: List[str], use_dask: bool ): """ Run a (non-incremental) materialization job to ingest data into the online store. Feast @@ -442,11 +443,13 @@ def materialize_command( feature_views=None if not views else views, start_date=utils.make_tzaware(datetime.fromisoformat(start_ts)), end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), + use_dask=use_dask, ) @cli.command("materialize-incremental") @click.argument("end_ts") +@click.argument("use_dask") @click.option( "--views", "-v", help="Feature views to incrementally materialize", multiple=True, ) @@ -466,6 +469,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List store.materialize_incremental( feature_views=None if not views else views, end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), + use_dask=use_dask, ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index fcd94f9bea8..d905d102ab3 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -914,7 +914,7 @@ def get_saved_dataset(self, name: str) -> SavedDataset: @log_exceptions_and_usage def materialize_incremental( - self, end_date: datetime, feature_views: Optional[List[str]] = None, + self, end_date: datetime, feature_views: Optional[List[str]] = None, use_dask: bool = False ) -> None: """ Materialize incremental new data from the offline store into the online store. @@ -998,6 +998,7 @@ def tqdm_builder(length): registry=self._registry, project=self.project, tqdm_builder=tqdm_builder, + use_dask=use_dask, ) self._registry.apply_materialization( @@ -1010,6 +1011,7 @@ def materialize( start_date: datetime, end_date: datetime, feature_views: Optional[List[str]] = None, + use_dask: bool = False ) -> None: """ Materialize data from the offline store into the online store. @@ -1085,6 +1087,7 @@ def tqdm_builder(length): registry=self._registry, project=self.project, tqdm_builder=tqdm_builder, + use_dask=use_dask, ) self._registry.apply_materialization( diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 89f5c7acab7..c4f549739d6 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -275,15 +275,9 @@ def evaluate_historical_retrieval(use_dask: bool = False): feature_names.append(formatted_feature_name) # Ensure that the source dataframe feature column includes the feature view name as a prefix - if use_dask: - df_to_join=df_to_join.rename( - columns={feature: formatted_feature_name} - ) - else: - df_to_join.rename( - columns={feature: formatted_feature_name}, inplace=True, - ) - + df_to_join=df_to_join.rename( + columns={feature: formatted_feature_name} + ) # Build a list of entity columns to join on (from the right table) join_keys = [] @@ -306,10 +300,7 @@ def evaluate_historical_retrieval(use_dask: bool = False): created_timestamp_column ] - if use_dask: - df_to_join=df_to_join.sort_values(by=event_timestamp_column) - else: - df_to_join.sort_values(by=right_entity_key_sort_columns, inplace=True) + df_to_join=df_to_join.sort_values(by=event_timestamp_column) df_to_join.drop_duplicates( right_entity_key_sort_columns, @@ -395,21 +386,42 @@ def pull_latest_from_table_or_query( assert isinstance(data_source, FileSource) # Create lazy function that is only called from the RetrievalJob object - def evaluate_offline_job(): - filesystem, path = FileSource.create_filesystem_and_path( - data_source.path, data_source.file_options.s3_endpoint_override - ) - source_df = pd.read_parquet(path, filesystem=filesystem) - # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) - ) - if created_timestamp_column: - source_df[created_timestamp_column] = source_df[ - created_timestamp_column - ].apply( + def evaluate_offline_job(use_dask: bool = False): + + if use_dask: + storage_options = { + "client_kwargs": { + "endpoint_url": data_source.file_options.s3_endpoint_override + } + } if data_source.file_options.s3_endpoint_override else None + + source_df = dd.read_parquet(path, storage_options=storage_options) + + source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]' + ) + if created_timestamp_column: + source_df[created_timestamp_column] = source_df[ + created_timestamp_column + ].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]' + ) + + else: + filesystem, path = FileSource.create_filesystem_and_path( + data_source.path, data_source.file_options.s3_endpoint_override + ) + source_df = pd.read_parquet(path, filesystem=filesystem) + # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC + source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) ) + if created_timestamp_column: + source_df[created_timestamp_column] = source_df[ + created_timestamp_column + ].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) + ) source_columns = set(source_df.columns) if not set(join_key_columns).issubset(source_columns): @@ -423,7 +435,8 @@ def evaluate_offline_job(): else [event_timestamp_column] ) - source_df.sort_values(by=ts_columns, inplace=True) + + source_df=source_df.sort_values(by=ts_columns) filtered_df = source_df[ (source_df[event_timestamp_column] >= start_date) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 58e240ce1f9..7e99cb26bb5 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -112,7 +112,6 @@ def to_dask_df(self) -> dd.DataFrame: ) return features_df - @abstractmethod def _to_df_internal(self) -> pd.DataFrame: """Return dataset as Pandas DataFrame synchronously""" @@ -121,8 +120,7 @@ def _to_df_internal(self) -> pd.DataFrame: @abstractmethod def _to_dask_df_internal(self) -> dd.DataFrame: """Return dataset as Dask DataFrame synchronously""" - pass - + raise NotImplementedError("Dask is currently not supported for this provider") @abstractmethod def _to_arrow_internal(self) -> pyarrow.Table: diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 3468b9dc927..f110d7f68fc 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -126,6 +126,7 @@ def materialize_single_feature_view( registry: Registry, project: str, tqdm_builder: Callable[[int], tqdm], + use_dask: bool = False, ) -> None: set_usage_attribute("provider", self.__class__.__name__) @@ -151,7 +152,7 @@ def materialize_single_feature_view( end_date=end_date, ) - table = offline_job.to_arrow() + table = offline_job.to_arrow(use_dask) if feature_view.batch_source.field_mapping is not None: table = _run_field_mapping(table, feature_view.batch_source.field_mapping) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index d62b395709d..17941cce4b5 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -136,6 +136,7 @@ def materialize_single_feature_view( registry: Registry, project: str, tqdm_builder: Callable[[int], tqdm], + use_dask: bool = False, ) -> None: pass From 60921e15617bf867e1e7f6b883fa30ba46d62ae8 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 17 Oct 2021 13:01:22 +0000 Subject: [PATCH 04/39] add dask Signed-off-by: qooba --- sdk/python/feast/cli.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 66cfb960a28..f6d326410a3 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -420,13 +420,12 @@ def registry_dump_command(ctx: click.Context): @cli.command("materialize") @click.argument("start_ts") @click.argument("end_ts") -@click.argument("use_dask") @click.option( "--views", "-v", help="Feature views to materialize", multiple=True, ) @click.pass_context def materialize_command( - ctx: click.Context, start_ts: str, end_ts: str, views: List[str], use_dask: bool + ctx: click.Context, start_ts: str, end_ts: str, views: List[str] ): """ Run a (non-incremental) materialization job to ingest data into the online store. Feast @@ -443,13 +442,11 @@ def materialize_command( feature_views=None if not views else views, start_date=utils.make_tzaware(datetime.fromisoformat(start_ts)), end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), - use_dask=use_dask, ) @cli.command("materialize-incremental") @click.argument("end_ts") -@click.argument("use_dask") @click.option( "--views", "-v", help="Feature views to incrementally materialize", multiple=True, ) @@ -469,7 +466,6 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List store.materialize_incremental( feature_views=None if not views else views, end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), - use_dask=use_dask, ) From 34313f785540ded5504f0b5df52ed726fa825334 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 17 Oct 2021 17:25:22 +0000 Subject: [PATCH 05/39] add dask Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 12 +++++------- .../feast/infra/offline_stores/offline_store.py | 1 - 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index c4f549739d6..3bf52522fdd 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -103,7 +103,6 @@ def persist(self, storage: SavedDatasetStorage): def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata - class FileOfflineStore(OfflineStore): @staticmethod @log_exceptions_and_usage(offline_store="file") @@ -395,16 +394,16 @@ def evaluate_offline_job(use_dask: bool = False): } } if data_source.file_options.s3_endpoint_override else None - source_df = dd.read_parquet(path, storage_options=storage_options) + source_df = dd.read_parquet(data_source.path, storage_options=storage_options) source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]' + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]') ) if created_timestamp_column: source_df[created_timestamp_column] = source_df[ created_timestamp_column ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]' + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]') ) else: @@ -435,8 +434,7 @@ def evaluate_offline_job(use_dask: bool = False): else [event_timestamp_column] ) - - source_df=source_df.sort_values(by=ts_columns) + source_df=source_df.sort_values(by=ts_columns[0] if use_dask else ts_columns) filtered_df = source_df[ (source_df[event_timestamp_column] >= start_date) @@ -455,7 +453,7 @@ def evaluate_offline_job(use_dask: bool = False): last_values_df[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL columns_to_extract.add(DUMMY_ENTITY_ID) - return last_values_df[columns_to_extract] + return last_values_df[list(columns_to_extract) if use_dask else columns_to_extract] # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized return FileRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 7e99cb26bb5..d0324d26a4b 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -117,7 +117,6 @@ def _to_df_internal(self) -> pd.DataFrame: """Return dataset as Pandas DataFrame synchronously""" pass - @abstractmethod def _to_dask_df_internal(self) -> dd.DataFrame: """Return dataset as Dask DataFrame synchronously""" raise NotImplementedError("Dask is currently not supported for this provider") From 06ec2901e4f733b1cd54d63e9f6db40483edf50c Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 17 Oct 2021 21:29:33 +0000 Subject: [PATCH 06/39] add dask Signed-off-by: qooba --- sdk/python/feast/feature_store.py | 7 +- sdk/python/feast/infra/offline_stores/file.py | 101 ++++++++++++------ .../infra/offline_stores/offline_store.py | 2 +- sdk/python/feast/infra/provider.py | 2 +- 4 files changed, 73 insertions(+), 39 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index d905d102ab3..ddd11eb7bb1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -914,7 +914,10 @@ def get_saved_dataset(self, name: str) -> SavedDataset: @log_exceptions_and_usage def materialize_incremental( - self, end_date: datetime, feature_views: Optional[List[str]] = None, use_dask: bool = False + self, + end_date: datetime, + feature_views: Optional[List[str]] = None, + use_dask: bool = False, ) -> None: """ Materialize incremental new data from the offline store into the online store. @@ -1011,7 +1014,7 @@ def materialize( start_date: datetime, end_date: datetime, feature_views: Optional[List[str]] = None, - use_dask: bool = False + use_dask: bool = False, ) -> None: """ Materialize data from the offline store into the online store. diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 3bf52522fdd..a4bea8c8a94 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -1,8 +1,8 @@ from datetime import datetime from typing import Callable, List, Optional, Tuple, Union -import pandas as pd import dask.dataframe as dd +import pandas as pd import pyarrow import pytz from pydantic.typing import Literal @@ -22,8 +22,8 @@ ) from feast.infra.provider import ( _get_requested_feature_views_to_features_dict, + _run_dask_field_mapping, _run_field_mapping, - _run_dask_field_mapping ) from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -115,7 +115,9 @@ def get_historical_features( project: str, full_feature_names: bool = False, ) -> RetrievalJob: - if not isinstance(entity_df, pd.DataFrame) and not isinstance(entity_df, dd.DataFrame): + if not isinstance(entity_df, pd.DataFrame) and not isinstance( + entity_df, dd.DataFrame + ): raise ValueError( f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}" ) @@ -184,25 +186,33 @@ def evaluate_historical_retrieval(use_dask: bool = False): if use_dask: merge_asof = dd.merge_asof - storage_options = { - "client_kwargs": { - "endpoint_url": feature_view.batch_source.file_options.s3_endpoint_override + storage_options = ( + { + "client_kwargs": { + "endpoint_url": feature_view.batch_source.file_options.s3_endpoint_override + } } - } if feature_view.batch_source.file_options.s3_endpoint_override else None + if feature_view.batch_source.file_options.s3_endpoint_override + else None + ) - df_to_join = dd.read_parquet(feature_view.batch_source.path, storage_options=storage_options) + df_to_join = dd.read_parquet( + feature_view.batch_source.path, storage_options=storage_options + ) - # Rename entity columns by the join_key_map dictionary if it exists - if feature_view.projection.join_key_map: + if feature_view.batch_source.field_mapping is not None: _run_dask_field_mapping( - df_to_join, feature_view.projection.join_key_map + df_to_join, feature_view.batch_source.field_mapping ) # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC df_to_join[event_timestamp_column] = df_to_join[ event_timestamp_column ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]') + lambda x: x + if x.tzinfo is not None + else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), ) if created_timestamp_column: df_to_join[created_timestamp_column] = df_to_join[ @@ -211,8 +221,7 @@ def evaluate_historical_retrieval(use_dask: bool = False): lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column,'datetime64[ns, UTC]') - + meta=(event_timestamp_column, "datetime64[ns, UTC]"), ) else: @@ -238,13 +247,13 @@ def evaluate_historical_retrieval(use_dask: bool = False): # https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html#values-considered-missing df_to_join = table.to_pandas() - - # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC df_to_join[event_timestamp_column] = df_to_join[ event_timestamp_column ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) + lambda x: x + if x.tzinfo is not None + else x.replace(tzinfo=pytz.utc) ) if created_timestamp_column: df_to_join[created_timestamp_column] = df_to_join[ @@ -274,7 +283,7 @@ def evaluate_historical_retrieval(use_dask: bool = False): feature_names.append(formatted_feature_name) # Ensure that the source dataframe feature column includes the feature view name as a prefix - df_to_join=df_to_join.rename( + df_to_join = df_to_join.rename( columns={feature: formatted_feature_name} ) @@ -299,7 +308,7 @@ def evaluate_historical_retrieval(use_dask: bool = False): created_timestamp_column ] - df_to_join=df_to_join.sort_values(by=event_timestamp_column) + df_to_join = df_to_join.sort_values(by=event_timestamp_column) df_to_join.drop_duplicates( right_entity_key_sort_columns, @@ -315,10 +324,14 @@ def evaluate_historical_retrieval(use_dask: bool = False): if use_dask: if not isinstance(entity_df_with_features, dd.DataFrame): - entity_df_with_features = dd.from_pandas(entity_df_with_features, npartitions=1) + entity_df_with_features = dd.from_pandas( + entity_df_with_features, npartitions=1 + ) df_to_join = df_to_join.sort_values(event_timestamp_column) - entity_df_with_features = entity_df_with_features.sort_values(entity_df_event_timestamp_col) + entity_df_with_features = entity_df_with_features.sort_values( + entity_df_event_timestamp_col + ) entity_df_with_features = merge_asof( entity_df_with_features, @@ -341,8 +354,6 @@ def evaluate_historical_retrieval(use_dask: bool = False): columns=[event_timestamp_column], inplace=True ) - - # Ensure that we delete dataframes to free up memory del df_to_join @@ -388,22 +399,34 @@ def pull_latest_from_table_or_query( def evaluate_offline_job(use_dask: bool = False): if use_dask: - storage_options = { - "client_kwargs": { - "endpoint_url": data_source.file_options.s3_endpoint_override + storage_options = ( + { + "client_kwargs": { + "endpoint_url": data_source.file_options.s3_endpoint_override + } } - } if data_source.file_options.s3_endpoint_override else None + if data_source.file_options.s3_endpoint_override + else None + ) - source_df = dd.read_parquet(data_source.path, storage_options=storage_options) + source_df = dd.read_parquet( + data_source.path, storage_options=storage_options + ) - source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]') + source_df[event_timestamp_column] = source_df[ + event_timestamp_column + ].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), ) if created_timestamp_column: source_df[created_timestamp_column] = source_df[ created_timestamp_column ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column,'datetime64[ns, UTC]') + lambda x: x + if x.tzinfo is not None + else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), ) else: @@ -412,14 +435,18 @@ def evaluate_offline_job(use_dask: bool = False): ) source_df = pd.read_parquet(path, filesystem=filesystem) # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( + source_df[event_timestamp_column] = source_df[ + event_timestamp_column + ].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) ) if created_timestamp_column: source_df[created_timestamp_column] = source_df[ created_timestamp_column ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) + lambda x: x + if x.tzinfo is not None + else x.replace(tzinfo=pytz.utc) ) source_columns = set(source_df.columns) @@ -434,7 +461,9 @@ def evaluate_offline_job(use_dask: bool = False): else [event_timestamp_column] ) - source_df=source_df.sort_values(by=ts_columns[0] if use_dask else ts_columns) + source_df = source_df.sort_values( + by=ts_columns[0] if use_dask else ts_columns + ) filtered_df = source_df[ (source_df[event_timestamp_column] >= start_date) @@ -453,7 +482,9 @@ def evaluate_offline_job(use_dask: bool = False): last_values_df[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL columns_to_extract.add(DUMMY_ENTITY_ID) - return last_values_df[list(columns_to_extract) if use_dask else columns_to_extract] + return last_values_df[ + list(columns_to_extract) if use_dask else columns_to_extract + ] # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized return FileRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index d0324d26a4b..cb3cbb74f3e 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -16,8 +16,8 @@ from datetime import datetime from typing import TYPE_CHECKING, List, Optional, Union -import pandas as pd import dask.dataframe as dd +import pandas as pd import pyarrow from feast.data_source import DataSource diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 17941cce4b5..4a66e0f6004 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -4,10 +4,10 @@ from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +import dask.dataframe as dd import pandas import pyarrow from tqdm import tqdm -import dask.dataframe as dd from feast import errors from feast.entity import Entity From ce6899d1410db1bb7f0249cff050accfa1483dd7 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 17 Oct 2021 21:46:40 +0000 Subject: [PATCH 07/39] add dask Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index a4bea8c8a94..cef2d089f5e 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -200,10 +200,16 @@ def evaluate_historical_retrieval(use_dask: bool = False): feature_view.batch_source.path, storage_options=storage_options ) + # Rename columns by the field mapping dictionary if it exists if feature_view.batch_source.field_mapping is not None: _run_dask_field_mapping( df_to_join, feature_view.batch_source.field_mapping ) + # Rename entity columns by the join_key_map dictionary if it exists + if feature_view.projection.join_key_map: + _run_dask_field_mapping( + df_to_join, feature_view.projection.join_key_map + ) # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC df_to_join[event_timestamp_column] = df_to_join[ @@ -234,6 +240,11 @@ def evaluate_historical_retrieval(use_dask: bool = False): table = pyarrow.parquet.read_table(path, filesystem=filesystem) + # Rename columns by the field mapping dictionary if it exists + if feature_view.batch_source.field_mapping is not None: + table = _run_field_mapping( + table, feature_view.batch_source.field_mapping + ) # Rename entity columns by the join_key_map dictionary if it exists if feature_view.projection.join_key_map: table = _run_field_mapping( From f0808d9c19e287356d261bfffb2d5b8c0be90566 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 17 Oct 2021 23:17:31 +0000 Subject: [PATCH 08/39] add dask Signed-off-by: qooba --- sdk/python/tests/foo_provider.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 1d4ce7d6cb6..c34e7e89c3d 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -53,6 +53,7 @@ def materialize_single_feature_view( registry: Registry, project: str, tqdm_builder: Callable[[int], tqdm], + use_dask: bool = False, ) -> None: pass From 616bdb7767ba609a2282f460483454393be2c33b Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 18 Oct 2021 20:44:29 +0000 Subject: [PATCH 09/39] add dask Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index cef2d089f5e..fab8fb9356d 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -149,7 +149,7 @@ def get_historical_features( ) # Create lazy function that is only called from the RetrievalJob object - def evaluate_historical_retrieval(use_dask: bool = False): + def evaluate_historical_retrieval(use_dask=False): # Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC entity_df[entity_df_event_timestamp_col] = entity_df[ @@ -407,7 +407,7 @@ def pull_latest_from_table_or_query( assert isinstance(data_source, FileSource) # Create lazy function that is only called from the RetrievalJob object - def evaluate_offline_job(use_dask: bool = False): + def evaluate_offline_job(use_dask=False): if use_dask: storage_options = ( From 6424ab6166e2413a2cf7d930a655208d104a0bc2 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 18 Oct 2021 21:40:05 +0000 Subject: [PATCH 10/39] add das Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index fab8fb9356d..8904e851dfa 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -354,16 +354,12 @@ def evaluate_historical_retrieval(use_dask=False): ) # Remove right (feature table/view) event_timestamp column. - if event_timestamp_column != entity_df_event_timestamp_col: - if use_dask: - if event_timestamp_column in entity_df_with_features.columns: - entity_df_with_features.drop( - columns=[event_timestamp_column] - ) - else: - entity_df_with_features.drop( - columns=[event_timestamp_column], inplace=True - ) + entity_df_with_features = ( + entity_df_with_features.drop(columns=[event_timestamp_column]) + if event_timestamp_column != entity_df_event_timestamp_col + and event_timestamp_column in entity_df_with_features.columns + else entity_df_with_features + ) # Ensure that we delete dataframes to free up memory del df_to_join From 18485b50106d45b27e11d2a810a097de35b518ce Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 18 Oct 2021 21:48:47 +0000 Subject: [PATCH 11/39] add dask Signed-off-by: qooba --- sdk/python/setup.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 7535987f833..4a48d300796 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -81,6 +81,10 @@ "hiredis>=2.0.0", ] +DASK_REQUIRED = [ + "dask==2021.9.1", +] + AWS_REQUIRED = [ "boto3>=1.17.0", "docker>=5.0.2", @@ -243,6 +247,7 @@ def run(self): "redis": REDIS_REQUIRED, "snowflake": SNOWFLAKE_REQUIRED, "ge": GE_REQUIRED, + "dask": DASK_REQUIRED, }, include_package_data=True, license="Apache", From 1bc2ab30d3bd77edd396080cb4e332a626defe1c Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 27 Oct 2021 23:25:24 +0000 Subject: [PATCH 12/39] add dask new Signed-off-by: qooba --- sdk/python/feast/feature_store.py | 8 +- sdk/python/feast/infra/offline_stores/file.py | 232 ++++++------------ .../infra/offline_stores/offline_store.py | 16 -- .../feast/infra/passthrough_provider.py | 3 +- sdk/python/feast/infra/provider.py | 1 - sdk/python/tests/foo_provider.py | 1 - 6 files changed, 83 insertions(+), 178 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ddd11eb7bb1..fcd94f9bea8 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -914,10 +914,7 @@ def get_saved_dataset(self, name: str) -> SavedDataset: @log_exceptions_and_usage def materialize_incremental( - self, - end_date: datetime, - feature_views: Optional[List[str]] = None, - use_dask: bool = False, + self, end_date: datetime, feature_views: Optional[List[str]] = None, ) -> None: """ Materialize incremental new data from the offline store into the online store. @@ -1001,7 +998,6 @@ def tqdm_builder(length): registry=self._registry, project=self.project, tqdm_builder=tqdm_builder, - use_dask=use_dask, ) self._registry.apply_materialization( @@ -1014,7 +1010,6 @@ def materialize( start_date: datetime, end_date: datetime, feature_views: Optional[List[str]] = None, - use_dask: bool = False, ) -> None: """ Materialize data from the offline store into the online store. @@ -1090,7 +1085,6 @@ def tqdm_builder(length): registry=self._registry, project=self.project, tqdm_builder=tqdm_builder, - use_dask=use_dask, ) self._registry.apply_materialization( diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 8904e851dfa..a6e50a33c45 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -149,7 +149,7 @@ def get_historical_features( ) # Create lazy function that is only called from the RetrievalJob object - def evaluate_historical_retrieval(use_dask=False): + def evaluate_historical_retrieval(): # Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC entity_df[entity_df_event_timestamp_col] = entity_df[ @@ -177,6 +177,10 @@ def evaluate_historical_retrieval(use_dask=False): # Load feature view data from sources and join them incrementally for feature_view, features in feature_views_to_features.items(): + + entity_name = feature_view.entities[0] + entity_df_with_features.set_index(entity_name) + event_timestamp_column = ( feature_view.batch_source.event_timestamp_column ) @@ -184,102 +188,59 @@ def evaluate_historical_retrieval(use_dask=False): feature_view.batch_source.created_timestamp_column ) - if use_dask: - merge_asof = dd.merge_asof - storage_options = ( - { - "client_kwargs": { - "endpoint_url": feature_view.batch_source.file_options.s3_endpoint_override - } + storage_options = ( + { + "client_kwargs": { + "endpoint_url": feature_view.batch_source.file_options.s3_endpoint_override } - if feature_view.batch_source.file_options.s3_endpoint_override - else None - ) + } + if feature_view.batch_source.file_options.s3_endpoint_override + else None + ) - df_to_join = dd.read_parquet( - feature_view.batch_source.path, storage_options=storage_options - ) + df_to_join = dd.read_parquet( + feature_view.batch_source.path, + storage_options=storage_options, + index=[entity_name], + ) - # Rename columns by the field mapping dictionary if it exists - if feature_view.batch_source.field_mapping is not None: - _run_dask_field_mapping( - df_to_join, feature_view.batch_source.field_mapping - ) - # Rename entity columns by the join_key_map dictionary if it exists - if feature_view.projection.join_key_map: - _run_dask_field_mapping( - df_to_join, feature_view.projection.join_key_map - ) + # Get only data with requested entities + df_to_join = dd.merge( + df_to_join, entity_df_with_features[[entity_name]] + ) + df_to_join = df_to_join.persist() - # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - df_to_join[event_timestamp_column] = df_to_join[ - event_timestamp_column - ].apply( - lambda x: x - if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), + # Rename columns by the field mapping dictionary if it exists + if feature_view.batch_source.field_mapping is not None: + _run_dask_field_mapping( + df_to_join, feature_view.batch_source.field_mapping ) - if created_timestamp_column: - df_to_join[created_timestamp_column] = df_to_join[ - created_timestamp_column - ].apply( - lambda x: x - if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), - ) - - else: - merge_asof = pd.merge_asof - # Read offline parquet data in pyarrow format. - filesystem, path = FileSource.create_filesystem_and_path( - feature_view.batch_source.path, - feature_view.batch_source.file_options.s3_endpoint_override, + # Rename entity columns by the join_key_map dictionary if it exists + if feature_view.projection.join_key_map: + _run_dask_field_mapping( + df_to_join, feature_view.projection.join_key_map ) - table = pyarrow.parquet.read_table(path, filesystem=filesystem) - - # Rename columns by the field mapping dictionary if it exists - if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping( - table, feature_view.batch_source.field_mapping - ) - # Rename entity columns by the join_key_map dictionary if it exists - if feature_view.projection.join_key_map: - table = _run_field_mapping( - table, feature_view.projection.join_key_map - ) - - # Convert pyarrow table to pandas dataframe. Note, if the underlying data has missing values, - # pandas will convert those values to np.nan if the dtypes are numerical (floats, ints, etc.) or boolean - # If the dtype is 'object', then missing values are inferred as python `None`s. - # More details at: - # https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html#values-considered-missing - df_to_join = table.to_pandas() - - # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - df_to_join[event_timestamp_column] = df_to_join[ - event_timestamp_column + # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC + df_to_join[event_timestamp_column] = df_to_join[ + event_timestamp_column + ].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), + ) + if created_timestamp_column: + df_to_join[created_timestamp_column] = df_to_join[ + created_timestamp_column ].apply( lambda x: x if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc) + else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), ) - if created_timestamp_column: - df_to_join[created_timestamp_column] = df_to_join[ - created_timestamp_column - ].apply( - lambda x: x - if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc) - ) - - # Sort dataframe by the event timestamp column - df_to_join = df_to_join.sort_values(event_timestamp_column) # Build a list of all the features we should select from this source feature_names = [] + columns_map = {} for feature in features: # Modify the separator for feature refs in column names to double underscore. We are using # double underscore as separator for consistency with other databases like BigQuery, @@ -294,9 +255,8 @@ def evaluate_historical_retrieval(use_dask=False): feature_names.append(formatted_feature_name) # Ensure that the source dataframe feature column includes the feature view name as a prefix - df_to_join = df_to_join.rename( - columns={feature: formatted_feature_name} - ) + df_to_join = df_to_join.rename(columns=columns_map) + df_to_join = df_to_join.persist() # Build a list of entity columns to join on (from the right table) join_keys = [] @@ -320,31 +280,30 @@ def evaluate_historical_retrieval(use_dask=False): ] df_to_join = df_to_join.sort_values(by=event_timestamp_column) + df_to_join = df_to_join.persist() - df_to_join.drop_duplicates( - right_entity_key_sort_columns, - keep="last", - ignore_index=True, - inplace=True, + df_to_join = df_to_join.drop_duplicates( + right_entity_key_sort_columns, keep="last", ignore_index=True, ) + df_to_join = df_to_join.persist() # Select only the columns we need to join from the feature dataframe df_to_join = df_to_join[right_entity_key_columns + feature_names] + df_to_join = df_to_join.persist() # Do point in-time-join between entity_df and feature dataframe - if use_dask: - if not isinstance(entity_df_with_features, dd.DataFrame): - entity_df_with_features = dd.from_pandas( - entity_df_with_features, npartitions=1 - ) - - df_to_join = df_to_join.sort_values(event_timestamp_column) - entity_df_with_features = entity_df_with_features.sort_values( - entity_df_event_timestamp_col + if not isinstance(entity_df_with_features, dd.DataFrame): + entity_df_with_features = dd.from_pandas( + entity_df_with_features, npartitions=1 ) - entity_df_with_features = merge_asof( + # df_to_join = df_to_join.sort_values(event_timestamp_column) + entity_df_with_features = entity_df_with_features.sort_values( + entity_df_event_timestamp_col + ) + + entity_df_with_features = dd.merge_asof( entity_df_with_features, df_to_join, left_on=entity_df_event_timestamp_col, @@ -403,58 +362,33 @@ def pull_latest_from_table_or_query( assert isinstance(data_source, FileSource) # Create lazy function that is only called from the RetrievalJob object - def evaluate_offline_job(use_dask=False): + def evaluate_offline_job(): - if use_dask: - storage_options = ( - { - "client_kwargs": { - "endpoint_url": data_source.file_options.s3_endpoint_override - } + storage_options = ( + { + "client_kwargs": { + "endpoint_url": data_source.file_options.s3_endpoint_override } - if data_source.file_options.s3_endpoint_override - else None - ) + } + if data_source.file_options.s3_endpoint_override + else None + ) - source_df = dd.read_parquet( - data_source.path, storage_options=storage_options - ) + source_df = dd.read_parquet( + data_source.path, storage_options=storage_options + ) - source_df[event_timestamp_column] = source_df[ - event_timestamp_column + source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), + ) + if created_timestamp_column: + source_df[created_timestamp_column] = source_df[ + created_timestamp_column ].apply( lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), meta=(event_timestamp_column, "datetime64[ns, UTC]"), ) - if created_timestamp_column: - source_df[created_timestamp_column] = source_df[ - created_timestamp_column - ].apply( - lambda x: x - if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), - ) - - else: - filesystem, path = FileSource.create_filesystem_and_path( - data_source.path, data_source.file_options.s3_endpoint_override - ) - source_df = pd.read_parquet(path, filesystem=filesystem) - # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - source_df[event_timestamp_column] = source_df[ - event_timestamp_column - ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) - ) - if created_timestamp_column: - source_df[created_timestamp_column] = source_df[ - created_timestamp_column - ].apply( - lambda x: x - if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc) - ) source_columns = set(source_df.columns) if not set(join_key_columns).issubset(source_columns): @@ -468,9 +402,7 @@ def evaluate_offline_job(use_dask=False): else [event_timestamp_column] ) - source_df = source_df.sort_values( - by=ts_columns[0] if use_dask else ts_columns - ) + source_df = source_df.sort_values(by=ts_columns[0]) filtered_df = source_df[ (source_df[event_timestamp_column] >= start_date) @@ -489,9 +421,7 @@ def evaluate_offline_job(use_dask=False): last_values_df[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL columns_to_extract.add(DUMMY_ENTITY_ID) - return last_values_df[ - list(columns_to_extract) if use_dask else columns_to_extract - ] + return last_values_df[list(columns_to_extract)] # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized return FileRetrievalJob( diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index cb3cbb74f3e..8385bd849fe 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -100,27 +100,11 @@ def to_df( return features_df - def to_dask_df(self) -> dd.DataFrame: - """Return dataset as Pandas DataFrame synchronously including on demand transforms""" - features_df = self._to_dask_df_internal() - if self.on_demand_feature_views is None: - return features_df - - for odfv in self.on_demand_feature_views: - features_df = features_df.join( - odfv.get_transformed_features_df(self.full_feature_names, features_df) - ) - return features_df - @abstractmethod def _to_df_internal(self) -> pd.DataFrame: """Return dataset as Pandas DataFrame synchronously""" pass - def _to_dask_df_internal(self) -> dd.DataFrame: - """Return dataset as Dask DataFrame synchronously""" - raise NotImplementedError("Dask is currently not supported for this provider") - @abstractmethod def _to_arrow_internal(self) -> pyarrow.Table: """Return dataset as pyarrow Table synchronously""" diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index f110d7f68fc..3468b9dc927 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -126,7 +126,6 @@ def materialize_single_feature_view( registry: Registry, project: str, tqdm_builder: Callable[[int], tqdm], - use_dask: bool = False, ) -> None: set_usage_attribute("provider", self.__class__.__name__) @@ -152,7 +151,7 @@ def materialize_single_feature_view( end_date=end_date, ) - table = offline_job.to_arrow(use_dask) + table = offline_job.to_arrow() if feature_view.batch_source.field_mapping is not None: table = _run_field_mapping(table, feature_view.batch_source.field_mapping) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 4a66e0f6004..74a56f28ea8 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -136,7 +136,6 @@ def materialize_single_feature_view( registry: Registry, project: str, tqdm_builder: Callable[[int], tqdm], - use_dask: bool = False, ) -> None: pass diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index c34e7e89c3d..1d4ce7d6cb6 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -53,7 +53,6 @@ def materialize_single_feature_view( registry: Registry, project: str, tqdm_builder: Callable[[int], tqdm], - use_dask: bool = False, ) -> None: pass From 799b38be7547c95363ae9a0b9d88e63be8623d9f Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 27 Oct 2021 23:48:01 +0000 Subject: [PATCH 13/39] add dask new Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 3 +-- sdk/python/feast/infra/offline_stores/offline_store.py | 1 - sdk/python/feast/infra/provider.py | 3 ++- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index a6e50a33c45..e822f48fdac 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -23,7 +23,6 @@ from feast.infra.provider import ( _get_requested_feature_views_to_features_dict, _run_dask_field_mapping, - _run_field_mapping, ) from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -330,7 +329,7 @@ def evaluate_historical_retrieval(): [entity_df_event_timestamp_col] + current_cols ] - return entity_df_with_features + return entity_df_with_features.compute() job = FileRetrievalJob( evaluation_function=evaluate_historical_retrieval, diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 8385bd849fe..1e5fe573774 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -16,7 +16,6 @@ from datetime import datetime from typing import TYPE_CHECKING, List, Optional, Union -import dask.dataframe as dd import pandas as pd import pyarrow diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 74a56f28ea8..de816d92833 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -317,7 +317,8 @@ def _run_dask_field_mapping( ): if field_mapping: # run field mapping in the forward direction - table.rename(field_mapping) + table = table.rename(field_mapping) + table = table.persist() def _coerce_datetime(ts): """ From e7764624fa3660297f5e214406910b2d03cc8cfe Mon Sep 17 00:00:00 2001 From: qooba Date: Thu, 28 Oct 2021 21:58:16 +0000 Subject: [PATCH 14/39] add dask new Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index e822f48fdac..0e845ae1eb8 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -178,7 +178,7 @@ def evaluate_historical_retrieval(): for feature_view, features in feature_views_to_features.items(): entity_name = feature_view.entities[0] - entity_df_with_features.set_index(entity_name) + entity_df_with_features = entity_df_with_features.set_index(entity_name) event_timestamp_column = ( feature_view.batch_source.event_timestamp_column @@ -205,7 +205,11 @@ def evaluate_historical_retrieval(): # Get only data with requested entities df_to_join = dd.merge( - df_to_join, entity_df_with_features[[entity_name]] + df_to_join, + entity_df_with_features, + left_index=True, + right_index=True, + suffixes=("", "__"), ) df_to_join = df_to_join.persist() @@ -278,10 +282,9 @@ def evaluate_historical_retrieval(): created_timestamp_column ] - df_to_join = df_to_join.sort_values(by=event_timestamp_column) - df_to_join = df_to_join.persist() - - df_to_join = df_to_join.drop_duplicates( + # df_to_join = df_to_join.sort_values(by=event_timestamp_column) + # df_to_join = df_to_join.persist() + df_to_join = df_to_join.reset_index().drop_duplicates( right_entity_key_sort_columns, keep="last", ignore_index=True, ) df_to_join = df_to_join.persist() @@ -297,8 +300,8 @@ def evaluate_historical_retrieval(): entity_df_with_features, npartitions=1 ) - # df_to_join = df_to_join.sort_values(event_timestamp_column) - entity_df_with_features = entity_df_with_features.sort_values( + df_to_join = df_to_join.sort_values(event_timestamp_column) + entity_df_with_features = entity_df_with_features.reset_index().sort_values( entity_df_event_timestamp_col ) From ef1a35c43d1759bb3c623f0e4509100735f863ee Mon Sep 17 00:00:00 2001 From: qooba Date: Fri, 29 Oct 2021 00:26:51 +0000 Subject: [PATCH 15/39] add dask new Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 0e845ae1eb8..fe3747750aa 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -392,6 +392,8 @@ def evaluate_offline_job(): meta=(event_timestamp_column, "datetime64[ns, UTC]"), ) + source_df = source_df.persist() + source_columns = set(source_df.columns) if not set(join_key_columns).issubset(source_columns): raise FeastJoinKeysDuringMaterialization( @@ -404,26 +406,29 @@ def evaluate_offline_job(): else [event_timestamp_column] ) - source_df = source_df.sort_values(by=ts_columns[0]) + # source_df = source_df.sort_values(by=ts_columns[0]) - filtered_df = source_df[ + source_df = source_df[ (source_df[event_timestamp_column] >= start_date) & (source_df[event_timestamp_column] < end_date) ] + source_df = source_df.persist() + columns_to_extract = set( join_key_columns + feature_name_columns + ts_columns ) if join_key_columns: - last_values_df = filtered_df.drop_duplicates( + source_df = source_df.drop_duplicates( join_key_columns, keep="last", ignore_index=True ) else: - last_values_df = filtered_df - last_values_df[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL + source_df[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL columns_to_extract.add(DUMMY_ENTITY_ID) - return last_values_df[list(columns_to_extract)] + source_df = source_df.persist() + + return source_df[list(columns_to_extract)] # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized return FileRetrievalJob( From 0810a2d90b5bf5561181323447b3fbfb61f84825 Mon Sep 17 00:00:00 2001 From: qooba Date: Fri, 29 Oct 2021 23:19:31 +0000 Subject: [PATCH 16/39] add dask new Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index fe3747750aa..37be0b5582b 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -206,7 +206,7 @@ def evaluate_historical_retrieval(): # Get only data with requested entities df_to_join = dd.merge( df_to_join, - entity_df_with_features, + entity_df_with_features.drop(entity_df_event_timestamp_col, axis=1), left_index=True, right_index=True, suffixes=("", "__"), From 283e96a12e5e73db94f80b828a77b6dc93f476e0 Mon Sep 17 00:00:00 2001 From: qooba Date: Sat, 30 Oct 2021 20:56:45 +0000 Subject: [PATCH 17/39] add dask new Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 37be0b5582b..b4f8ef0df2b 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -72,7 +72,7 @@ def _to_df_internal(self) -> pd.DataFrame: @log_exceptions_and_usage def _to_dask_df_internal(self) -> dd.DataFrame: # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. - df = self.evaluation_function(True) + df = self.evaluation_function().compute() return df @log_exceptions_and_usage @@ -332,7 +332,7 @@ def evaluate_historical_retrieval(): [entity_df_event_timestamp_col] + current_cols ] - return entity_df_with_features.compute() + return entity_df_with_features.persist() job = FileRetrievalJob( evaluation_function=evaluate_historical_retrieval, @@ -428,7 +428,7 @@ def evaluate_offline_job(): source_df = source_df.persist() - return source_df[list(columns_to_extract)] + return source_df[list(columns_to_extract)].persist() # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized return FileRetrievalJob( From 03c04ca078710b5683f37a0a07c077b070be19c7 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 1 Nov 2021 16:11:03 +0000 Subject: [PATCH 18/39] add dask dtype check Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 99 ++++++++++++++----- 1 file changed, 74 insertions(+), 25 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index b4f8ef0df2b..025deabbbed 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -150,25 +150,38 @@ def get_historical_features( # Create lazy function that is only called from the RetrievalJob object def evaluate_historical_retrieval(): - # Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC - entity_df[entity_df_event_timestamp_col] = entity_df[ - entity_df_event_timestamp_col - ].apply(lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc)) - # Create a copy of entity_df to prevent modifying the original entity_df_with_features = entity_df.copy() - # Convert event timestamp column to datetime and normalize time zone to UTC - # This is necessary to avoid issues with pd.merge_asof - if isinstance(entity_df_with_features, dd.DataFrame): - entity_df_with_features[entity_df_event_timestamp_col] = dd.to_datetime( - entity_df_with_features[entity_df_event_timestamp_col], utc=True - ) - else: - entity_df_with_features[entity_df_event_timestamp_col] = pd.to_datetime( - entity_df_with_features[entity_df_event_timestamp_col], utc=True + entity_df_event_timestamp_col_type = entity_df_with_features.dtypes[ + entity_df_event_timestamp_col + ] + if ( + not hasattr(entity_df_event_timestamp_col_type, "tz") + or entity_df_event_timestamp_col_type.tz != pytz.UTC + ): + # Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC + entity_df_with_features[ + entity_df_event_timestamp_col + ] = entity_df_with_features[entity_df_event_timestamp_col].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc) ) + # Convert event timestamp column to datetime and normalize time zone to UTC + # This is necessary to avoid issues with pd.merge_asof + if isinstance(entity_df_with_features, dd.DataFrame): + entity_df_with_features[ + entity_df_event_timestamp_col + ] = dd.to_datetime( + entity_df_with_features[entity_df_event_timestamp_col], utc=True + ) + else: + entity_df_with_features[ + entity_df_event_timestamp_col + ] = pd.to_datetime( + entity_df_with_features[entity_df_event_timestamp_col], utc=True + ) + # Sort event timestamp values entity_df_with_features = entity_df_with_features.sort_values( entity_df_event_timestamp_col @@ -224,14 +237,32 @@ def evaluate_historical_retrieval(): df_to_join, feature_view.projection.join_key_map ) - # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - df_to_join[event_timestamp_column] = df_to_join[ - event_timestamp_column - ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), - ) + df_to_join_types = df_to_join.dtypes + event_timestamp_column_type = df_to_join_types[event_timestamp_column] + if created_timestamp_column: + created_timestamp_column_type = df_to_join_types[ + created_timestamp_column + ] + + if ( + not hasattr(event_timestamp_column_type, "tz") + or event_timestamp_column_type.tz != pytz.UTC + ): + # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC + df_to_join[event_timestamp_column] = df_to_join[ + event_timestamp_column + ].apply( + lambda x: x + if x.tzinfo is not None + else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), + ) + + if created_timestamp_column and ( + not hasattr(created_timestamp_column_type, "tz") + or created_timestamp_column_type.tz != pytz.UTC + ): df_to_join[created_timestamp_column] = df_to_join[ created_timestamp_column ].apply( @@ -380,11 +411,29 @@ def evaluate_offline_job(): data_source.path, storage_options=storage_options ) - source_df[event_timestamp_column] = source_df[event_timestamp_column].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), - ) + source_df_types = source_df.dtypes + event_timestamp_column_type = source_df_types[event_timestamp_column] + if created_timestamp_column: + created_timestamp_column_type = source_df_types[ + created_timestamp_column + ] + + if ( + not hasattr(event_timestamp_column_type, "tz") + or event_timestamp_column_type.tz != pytz.UTC + ): + source_df[event_timestamp_column] = source_df[ + event_timestamp_column + ].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), + ) + + if created_timestamp_column and ( + not hasattr(created_timestamp_column_type, "tz") + or created_timestamp_column_type.tz != pytz.UTC + ): source_df[created_timestamp_column] = source_df[ created_timestamp_column ].apply( From f3b18cf133c3dff83d9726bd81a3456e34aa1f84 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 22 Nov 2021 00:10:44 +0000 Subject: [PATCH 19/39] add dask to setup Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 7 ++++--- sdk/python/setup.py | 5 +---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 025deabbbed..bb11b739396 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -313,8 +313,9 @@ def evaluate_historical_retrieval(): created_timestamp_column ] - # df_to_join = df_to_join.sort_values(by=event_timestamp_column) - # df_to_join = df_to_join.persist() + df_to_join = df_to_join.sort_values(by=event_timestamp_column) + df_to_join = df_to_join.persist() + df_to_join = df_to_join.reset_index().drop_duplicates( right_entity_key_sort_columns, keep="last", ignore_index=True, ) @@ -455,7 +456,7 @@ def evaluate_offline_job(): else [event_timestamp_column] ) - # source_df = source_df.sort_values(by=ts_columns[0]) + source_df = source_df.sort_values(by=event_timestamp_column) source_df = source_df[ (source_df[event_timestamp_column] >= start_date) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 4a48d300796..50e9bcf160f 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -66,6 +66,7 @@ "uvicorn[standard]>=0.14.0", "proto-plus<1.19.7", "tensorflow-metadata>=1.0.0,<2.0.0", + "dask==2021.9.1", ] GCP_REQUIRED = [ @@ -81,10 +82,6 @@ "hiredis>=2.0.0", ] -DASK_REQUIRED = [ - "dask==2021.9.1", -] - AWS_REQUIRED = [ "boto3>=1.17.0", "docker>=5.0.2", From 14e9b85cd61fadb309535e3a74e5a98cbe514de3 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 29 Nov 2021 23:38:08 +0000 Subject: [PATCH 20/39] add dask requirements Signed-off-by: qooba --- sdk/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 50e9bcf160f..8516db4a7ab 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -66,7 +66,7 @@ "uvicorn[standard]>=0.14.0", "proto-plus<1.19.7", "tensorflow-metadata>=1.0.0,<2.0.0", - "dask==2021.9.1", + "dask>=2021.9.*", ] GCP_REQUIRED = [ From 34200530f152d7e60f604ee639960bb3b6300376 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 26 Dec 2021 00:45:56 +0000 Subject: [PATCH 21/39] add dask Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 108 ++++++++---------- 1 file changed, 49 insertions(+), 59 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index bb11b739396..23afc6136c7 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -189,10 +189,6 @@ def evaluate_historical_retrieval(): # Load feature view data from sources and join them incrementally for feature_view, features in feature_views_to_features.items(): - - entity_name = feature_view.entities[0] - entity_df_with_features = entity_df_with_features.set_index(entity_name) - event_timestamp_column = ( feature_view.batch_source.event_timestamp_column ) @@ -200,6 +196,22 @@ def evaluate_historical_retrieval(): feature_view.batch_source.created_timestamp_column ) + # Build a list of entity columns to join on (from the right table) + join_keys = [] + for entity_name in feature_view.entities: + entity = registry.get_entity(entity_name, project) + join_key = feature_view.projection.join_key_map.get( + entity.join_key, entity.join_key + ) + join_keys.append(join_key) + right_entity_columns = join_keys + right_entity_key_columns = [ + event_timestamp_column + ] + right_entity_columns + + entity_name = join_keys[0] + # entity_df_with_features = entity_df_with_features.set_index(entity_name) + storage_options = ( { "client_kwargs": { @@ -213,15 +225,22 @@ def evaluate_historical_retrieval(): df_to_join = dd.read_parquet( feature_view.batch_source.path, storage_options=storage_options, - index=[entity_name], + # index=[entity_name], ) # Get only data with requested entities + __entity_df_event_timestamp_col = f"__{entity_df_event_timestamp_col}" df_to_join = dd.merge( df_to_join, - entity_df_with_features.drop(entity_df_event_timestamp_col, axis=1), - left_index=True, - right_index=True, + entity_df_with_features[ + join_keys + [entity_df_event_timestamp_col] + ].rename( + columns={ + entity_df_event_timestamp_col: __entity_df_event_timestamp_col + } + ), + left_on=join_keys, + right_on=join_keys, suffixes=("", "__"), ) df_to_join = df_to_join.persist() @@ -272,6 +291,22 @@ def evaluate_historical_retrieval(): meta=(event_timestamp_column, "datetime64[ns, UTC]"), ) + # Filter rows by defined timestamp tolerance + df_to_join = df_to_join[ + ( + df_to_join[event_timestamp_column] + >= df_to_join[__entity_df_event_timestamp_col] + - feature_view.ttl + ) + & ( + df_to_join[event_timestamp_column] + <= df_to_join[__entity_df_event_timestamp_col] + ) + ] + df_to_join = df_to_join.persist() + df_to_join = df_to_join.drop([__entity_df_event_timestamp_col], axis=1) + df_to_join = df_to_join.persist() + # Build a list of all the features we should select from this source feature_names = [] columns_map = {} @@ -292,19 +327,6 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join.rename(columns=columns_map) df_to_join = df_to_join.persist() - # Build a list of entity columns to join on (from the right table) - join_keys = [] - for entity_name in feature_view.entities: - entity = registry.get_entity(entity_name, project) - join_key = feature_view.projection.join_key_map.get( - entity.join_key, entity.join_key - ) - join_keys.append(join_key) - right_entity_columns = join_keys - right_entity_key_columns = [ - event_timestamp_column - ] + right_entity_columns - # Remove all duplicate entity keys (using created timestamp) right_entity_key_sort_columns = right_entity_key_columns if created_timestamp_column: @@ -317,7 +339,7 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join.persist() df_to_join = df_to_join.reset_index().drop_duplicates( - right_entity_key_sort_columns, keep="last", ignore_index=True, + join_keys, keep="last", ignore_index=True, ) df_to_join = df_to_join.persist() @@ -325,46 +347,14 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join[right_entity_key_columns + feature_names] df_to_join = df_to_join.persist() - # Do point in-time-join between entity_df and feature dataframe - - if not isinstance(entity_df_with_features, dd.DataFrame): - entity_df_with_features = dd.from_pandas( - entity_df_with_features, npartitions=1 - ) - - df_to_join = df_to_join.sort_values(event_timestamp_column) - entity_df_with_features = entity_df_with_features.reset_index().sort_values( - entity_df_event_timestamp_col - ) - - entity_df_with_features = dd.merge_asof( - entity_df_with_features, - df_to_join, - left_on=entity_df_event_timestamp_col, - right_on=event_timestamp_column, - by=right_entity_columns or None, - tolerance=feature_view.ttl, - ) - - # Remove right (feature table/view) event_timestamp column. - entity_df_with_features = ( - entity_df_with_features.drop(columns=[event_timestamp_column]) - if event_timestamp_column != entity_df_event_timestamp_col - and event_timestamp_column in entity_df_with_features.columns - else entity_df_with_features - ) - # Ensure that we delete dataframes to free up memory - del df_to_join + del entity_df_with_features - # Move "event_timestamp" column to front - current_cols = entity_df_with_features.columns.tolist() - current_cols.remove(entity_df_event_timestamp_col) - entity_df_with_features = entity_df_with_features[ - [entity_df_event_timestamp_col] + current_cols - ] + df_to_join = df_to_join.rename( + columns={event_timestamp_column: entity_df_event_timestamp_col} + ) - return entity_df_with_features.persist() + return df_to_join.persist() job = FileRetrievalJob( evaluation_function=evaluate_historical_retrieval, From f8873d913b30c28c05d2e3e2ba81611328cc7d7e Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 29 Dec 2021 23:40:25 +0000 Subject: [PATCH 22/39] add dask Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 100 +++++++----------- 1 file changed, 40 insertions(+), 60 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 23afc6136c7..ae0d71c1d8c 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -204,13 +204,12 @@ def evaluate_historical_retrieval(): entity.join_key, entity.join_key ) join_keys.append(join_key) - right_entity_columns = join_keys - right_entity_key_columns = [ - event_timestamp_column - ] + right_entity_columns - entity_name = join_keys[0] - # entity_df_with_features = entity_df_with_features.set_index(entity_name) + right_entity_key_columns = [ + event_timestamp_column, + created_timestamp_column, + ] + join_keys + right_entity_key_columns = [c for c in right_entity_key_columns if c] storage_options = ( { @@ -223,25 +222,41 @@ def evaluate_historical_retrieval(): ) df_to_join = dd.read_parquet( - feature_view.batch_source.path, - storage_options=storage_options, - # index=[entity_name], + feature_view.batch_source.path, storage_options=storage_options, ) + # Build a list of all the features we should select from this source + feature_names = [] + columns_map = {} + for feature in features: + # Modify the separator for feature refs in column names to double underscore. We are using + # double underscore as separator for consistency with other databases like BigQuery, + # where there are very few characters available for use as separators + if full_feature_names: + formatted_feature_name = ( + f"{feature_view.projection.name_to_use()}__{feature}" + ) + else: + formatted_feature_name = feature + # Add the feature name to the list of columns + feature_names.append(formatted_feature_name) + + # Ensure that the source dataframe feature column includes the feature view name as a prefix + df_to_join = df_to_join.rename(columns=columns_map) + df_to_join = df_to_join.persist() + + # Select only the columns we need to join from the feature dataframe + df_to_join = df_to_join[right_entity_key_columns + feature_names] + df_to_join = df_to_join.persist() + # Get only data with requested entities - __entity_df_event_timestamp_col = f"__{entity_df_event_timestamp_col}" df_to_join = dd.merge( + entity_df_with_features, df_to_join, - entity_df_with_features[ - join_keys + [entity_df_event_timestamp_col] - ].rename( - columns={ - entity_df_event_timestamp_col: __entity_df_event_timestamp_col - } - ), left_on=join_keys, right_on=join_keys, suffixes=("", "__"), + how="left", ) df_to_join = df_to_join.persist() @@ -295,66 +310,31 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join[ ( df_to_join[event_timestamp_column] - >= df_to_join[__entity_df_event_timestamp_col] - - feature_view.ttl + >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl ) & ( df_to_join[event_timestamp_column] - <= df_to_join[__entity_df_event_timestamp_col] + <= df_to_join[entity_df_event_timestamp_col] ) ] df_to_join = df_to_join.persist() - df_to_join = df_to_join.drop([__entity_df_event_timestamp_col], axis=1) - df_to_join = df_to_join.persist() - - # Build a list of all the features we should select from this source - feature_names = [] - columns_map = {} - for feature in features: - # Modify the separator for feature refs in column names to double underscore. We are using - # double underscore as separator for consistency with other databases like BigQuery, - # where there are very few characters available for use as separators - if full_feature_names: - formatted_feature_name = ( - f"{feature_view.projection.name_to_use()}__{feature}" - ) - else: - formatted_feature_name = feature - # Add the feature name to the list of columns - feature_names.append(formatted_feature_name) - - # Ensure that the source dataframe feature column includes the feature view name as a prefix - df_to_join = df_to_join.rename(columns=columns_map) - df_to_join = df_to_join.persist() - - # Remove all duplicate entity keys (using created timestamp) - right_entity_key_sort_columns = right_entity_key_columns - if created_timestamp_column: - # If created_timestamp is available, use it to dedupe deterministically - right_entity_key_sort_columns = right_entity_key_sort_columns + [ - created_timestamp_column - ] df_to_join = df_to_join.sort_values(by=event_timestamp_column) df_to_join = df_to_join.persist() - df_to_join = df_to_join.reset_index().drop_duplicates( + df_to_join = df_to_join.drop_duplicates( join_keys, keep="last", ignore_index=True, ) df_to_join = df_to_join.persist() - # Select only the columns we need to join from the feature dataframe - df_to_join = df_to_join[right_entity_key_columns + feature_names] - df_to_join = df_to_join.persist() + entity_df_with_features = df_to_join.drop( + [event_timestamp_column], axis=1 + ).persist() # Ensure that we delete dataframes to free up memory - del entity_df_with_features - - df_to_join = df_to_join.rename( - columns={event_timestamp_column: entity_df_event_timestamp_col} - ) + del df_to_join - return df_to_join.persist() + return entity_df_with_features.persist() job = FileRetrievalJob( evaluation_function=evaluate_historical_retrieval, From 909ee61c19e5926baeba029e2a38740f97324d9e Mon Sep 17 00:00:00 2001 From: qooba Date: Thu, 30 Dec 2021 00:09:42 +0000 Subject: [PATCH 23/39] add dask Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index ae0d71c1d8c..0f3caeb26a4 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -65,12 +65,6 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: @log_exceptions_and_usage def _to_df_internal(self) -> pd.DataFrame: - # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. - df = self.evaluation_function() - return df - - @log_exceptions_and_usage - def _to_dask_df_internal(self) -> dd.DataFrame: # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. df = self.evaluation_function().compute() return df From f4aef2d0377452b6075a598b5eae2684d8230f88 Mon Sep 17 00:00:00 2001 From: qooba Date: Thu, 30 Dec 2021 00:26:03 +0000 Subject: [PATCH 24/39] add dask Signed-off-by: qooba --- sdk/python/feast/infra/provider.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index de816d92833..5d7dcc9cf10 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -312,6 +312,7 @@ def _run_field_mapping( table = table.rename_columns(mapped_cols) return table + def _run_dask_field_mapping( table: dd.DataFrame, field_mapping: Dict[str, str], ): @@ -320,6 +321,7 @@ def _run_dask_field_mapping( table = table.rename(field_mapping) table = table.persist() + def _coerce_datetime(ts): """ Depending on underlying time resolution, arrow to_pydict() sometimes returns pandas @@ -334,6 +336,7 @@ def _coerce_datetime(ts): else: return ts + def _convert_arrow_to_proto( table: Union[pyarrow.Table, pyarrow.RecordBatch], feature_view: FeatureView, From 394734cffeba2c9484c7ef620eb42f7dcaa0a2d0 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 10 Jan 2022 00:04:25 +0000 Subject: [PATCH 25/39] change dask dependency Signed-off-by: qooba --- sdk/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 8516db4a7ab..3cb30a24215 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -66,7 +66,7 @@ "uvicorn[standard]>=0.14.0", "proto-plus<1.19.7", "tensorflow-metadata>=1.0.0,<2.0.0", - "dask>=2021.9.*", + "dask>=2021.*", ] GCP_REQUIRED = [ From 4606fdfcd9cb36992ad4c1d71a33980e42a5b6b2 Mon Sep 17 00:00:00 2001 From: qooba Date: Tue, 25 Jan 2022 00:58:12 +0000 Subject: [PATCH 26/39] add dask - sorting fix Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 0f3caeb26a4..0ae25eadb10 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -313,6 +313,10 @@ def evaluate_historical_retrieval(): ] df_to_join = df_to_join.persist() + if created_timestamp_column: + df_to_join = df_to_join.sort_values(by=created_timestamp_column) + df_to_join = df_to_join.persist() + df_to_join = df_to_join.sort_values(by=event_timestamp_column) df_to_join = df_to_join.persist() @@ -420,6 +424,9 @@ def evaluate_offline_job(): else [event_timestamp_column] ) + if created_timestamp_column: + source_df = source_df.sort_values(by=created_timestamp_column) + source_df = source_df.sort_values(by=event_timestamp_column) source_df = source_df[ From 15f8e26fd6fae0d9c93c8b10e13b00be44fe8894 Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 26 Jan 2022 22:56:20 +0000 Subject: [PATCH 27/39] add dask - fix integration tests Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 23 ++++++++++--------- sdk/python/feast/infra/provider.py | 4 +++- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 0ae25eadb10..3a0d9d07db4 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -234,11 +234,23 @@ def evaluate_historical_retrieval(): formatted_feature_name = feature # Add the feature name to the list of columns feature_names.append(formatted_feature_name) + columns_map[feature] = formatted_feature_name # Ensure that the source dataframe feature column includes the feature view name as a prefix df_to_join = df_to_join.rename(columns=columns_map) df_to_join = df_to_join.persist() + # Rename columns by the field mapping dictionary if it exists + if feature_view.batch_source.field_mapping: + df_to_join = _run_dask_field_mapping( + df_to_join, feature_view.batch_source.field_mapping + ) + # Rename entity columns by the join_key_map dictionary if it exists + if feature_view.projection.join_key_map: + df_to_join = _run_dask_field_mapping( + df_to_join, feature_view.projection.join_key_map + ) + # Select only the columns we need to join from the feature dataframe df_to_join = df_to_join[right_entity_key_columns + feature_names] df_to_join = df_to_join.persist() @@ -254,17 +266,6 @@ def evaluate_historical_retrieval(): ) df_to_join = df_to_join.persist() - # Rename columns by the field mapping dictionary if it exists - if feature_view.batch_source.field_mapping is not None: - _run_dask_field_mapping( - df_to_join, feature_view.batch_source.field_mapping - ) - # Rename entity columns by the join_key_map dictionary if it exists - if feature_view.projection.join_key_map: - _run_dask_field_mapping( - df_to_join, feature_view.projection.join_key_map - ) - df_to_join_types = df_to_join.dtypes event_timestamp_column_type = df_to_join_types[event_timestamp_column] diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 5d7dcc9cf10..b3f10292423 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -318,9 +318,11 @@ def _run_dask_field_mapping( ): if field_mapping: # run field mapping in the forward direction - table = table.rename(field_mapping) + table = table.rename(columns=field_mapping) table = table.persist() + return table + def _coerce_datetime(ts): """ From bb370d5d9a559dcc86087307f1f61ef9abea9e15 Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 26 Jan 2022 23:16:34 +0000 Subject: [PATCH 28/39] add dask - rebase Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 3a0d9d07db4..37ccb10670a 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -72,7 +72,7 @@ def _to_df_internal(self) -> pd.DataFrame: @log_exceptions_and_usage def _to_arrow_internal(self): # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. - df = self.evaluation_function() + df = self.evaluation_function().compute() return pyarrow.Table.from_pandas(df) def persist(self, storage: SavedDatasetStorage): @@ -96,6 +96,7 @@ def persist(self, storage: SavedDatasetStorage): def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata + class FileOfflineStore(OfflineStore): @staticmethod @log_exceptions_and_usage(offline_store="file") From 477142475803b8087ebb1c218421d6770979073e Mon Sep 17 00:00:00 2001 From: qooba Date: Thu, 27 Jan 2022 00:08:26 +0000 Subject: [PATCH 29/39] add dask - fix offline store Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/offline_store.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 1e5fe573774..a3065a31c0e 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -116,7 +116,6 @@ def to_arrow( Return dataset as pyarrow Table synchronously Args: validation_reference: If provided resulting dataset will be validated against this reference profile. - """ if not self.on_demand_feature_views and not validation_reference: return self._to_arrow_internal() From 1bbe261bda3d8afb22454465f8bb9afd9828783b Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 31 Jan 2022 01:41:31 +0000 Subject: [PATCH 30/39] add dask - fix integration tests Signed-off-by: qooba --- .../integration/e2e/test_universal_e2e.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/integration/e2e/test_universal_e2e.py b/sdk/python/tests/integration/e2e/test_universal_e2e.py index fbbdd14f23e..477c79614c5 100644 --- a/sdk/python/tests/integration/e2e/test_universal_e2e.py +++ b/sdk/python/tests/integration/e2e/test_universal_e2e.py @@ -76,14 +76,26 @@ def check_offline_and_online_features( if full_feature_names: if expected_value: - assert abs(df.to_dict()[f"{fv.name}__value"][0] - expected_value) < 1e-6 + assert ( + abs( + df.to_dict(orient="list")[f"{fv.name}__value"][0] + - expected_value + ) + < 1e-6 + ) else: - assert math.isnan(df.to_dict()[f"{fv.name}__value"][0]) + assert not df.to_dict(orient="list")[f"{fv.name}__value"] or math.isnan( + df.to_dict(orient="list")[f"{fv.name}__value"][0] + ) else: if expected_value: - assert abs(df.to_dict()["value"][0] - expected_value) < 1e-6 + assert ( + abs(df.to_dict(orient="list")["value"][0] - expected_value) < 1e-6 + ) else: - assert math.isnan(df.to_dict()["value"][0]) + assert not df.to_dict(orient="list")["value"] or math.isnan( + df.to_dict(orient="list")["value"][0] + ) def run_offline_online_store_consistency_test( From 234a70bf6ccf4bfed33dc70d1316db21d9f64709 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 13 Feb 2022 13:59:03 +0100 Subject: [PATCH 31/39] add dask - fix tests Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 75 +++++++++++++++---- 1 file changed, 62 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 37ccb10670a..df7f3ba5cae 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -182,6 +182,8 @@ def evaluate_historical_retrieval(): entity_df_event_timestamp_col ) + join_keys = [] + # Load feature view data from sources and join them incrementally for feature_view, features in feature_views_to_features.items(): event_timestamp_column = ( @@ -191,8 +193,12 @@ def evaluate_historical_retrieval(): feature_view.batch_source.created_timestamp_column ) + if join_keys: + last_join_keys = join_keys + # Build a list of entity columns to join on (from the right table) join_keys = [] + for entity_name in feature_view.entities: entity = registry.get_entity(entity_name, project) join_key = feature_view.projection.join_key_map.get( @@ -256,15 +262,39 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join[right_entity_key_columns + feature_names] df_to_join = df_to_join.persist() + # Make sure to not have duplicated columns + df_to_join = df_to_join.rename( + columns={event_timestamp_column: f"__{event_timestamp_column}",} + ) + event_timestamp_column = f"__{event_timestamp_column}" + + if created_timestamp_column: + df_to_join = df_to_join.rename( + columns={ + created_timestamp_column: f"__{created_timestamp_column}", + } + ) + created_timestamp_column = f"__{created_timestamp_column}" + + df_to_join = df_to_join.persist() + + # tmp join keys needed for cross join with null join table view + tmp_join_keys = [] + if not join_keys: + entity_df_with_features["__tmp"] = 1 + df_to_join["__tmp"] = 1 + tmp_join_keys = ["__tmp"] + # Get only data with requested entities df_to_join = dd.merge( entity_df_with_features, df_to_join, - left_on=join_keys, - right_on=join_keys, + left_on=join_keys or tmp_join_keys, + right_on=join_keys or tmp_join_keys, suffixes=("", "__"), how="left", ) + df_to_join = df_to_join.persist() df_to_join_types = df_to_join.dtypes @@ -303,16 +333,19 @@ def evaluate_historical_retrieval(): ) # Filter rows by defined timestamp tolerance - df_to_join = df_to_join[ - ( - df_to_join[event_timestamp_column] - >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl - ) - & ( - df_to_join[event_timestamp_column] - <= df_to_join[entity_df_event_timestamp_col] - ) - ] + if feature_view.ttl != 0: + df_to_join = df_to_join[ + ( + df_to_join[event_timestamp_column] + >= df_to_join[entity_df_event_timestamp_col] + - feature_view.ttl + ) + & ( + df_to_join[event_timestamp_column] + <= df_to_join[entity_df_event_timestamp_col] + ) + ] + df_to_join = df_to_join.persist() if created_timestamp_column: @@ -322,8 +355,14 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join.sort_values(by=event_timestamp_column) df_to_join = df_to_join.persist() + # For null join table view drop duplicates for last join_keys + if tmp_join_keys: + join_keys = last_join_keys + df_to_join = df_to_join.drop_duplicates( - join_keys, keep="last", ignore_index=True, + join_keys + [entity_df_event_timestamp_col], + keep="last", + ignore_index=True, ) df_to_join = df_to_join.persist() @@ -331,6 +370,16 @@ def evaluate_historical_retrieval(): [event_timestamp_column], axis=1 ).persist() + if created_timestamp_column: + entity_df_with_features = entity_df_with_features.drop( + [created_timestamp_column], axis=1 + ).persist() + + if tmp_join_keys: + entity_df_with_features = entity_df_with_features.drop( + ["__tmp"], axis=1 + ).persist() + # Ensure that we delete dataframes to free up memory del df_to_join From 245b6ea7df919069170e51aa4576f4f768d7954c Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 13 Feb 2022 15:06:58 +0100 Subject: [PATCH 32/39] add dask - fix integration tests Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index df7f3ba5cae..e483e0d1f2e 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -183,6 +183,7 @@ def evaluate_historical_retrieval(): ) join_keys = [] + all_join_keys = [] # Load feature view data from sources and join them incrementally for feature_view, features in feature_views_to_features.items(): @@ -193,10 +194,8 @@ def evaluate_historical_retrieval(): feature_view.batch_source.created_timestamp_column ) - if join_keys: - last_join_keys = join_keys - # Build a list of entity columns to join on (from the right table) + all_join_keys = list(set(all_join_keys + join_keys)) join_keys = [] for entity_name in feature_view.entities: @@ -295,7 +294,10 @@ def evaluate_historical_retrieval(): how="left", ) - df_to_join = df_to_join.persist() + if tmp_join_keys: + df_to_join = df_to_join.drop(tmp_join_keys, axis=1).persist() + else: + df_to_join = df_to_join.persist() df_to_join_types = df_to_join.dtypes event_timestamp_column_type = df_to_join_types[event_timestamp_column] @@ -355,12 +357,8 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join.sort_values(by=event_timestamp_column) df_to_join = df_to_join.persist() - # For null join table view drop duplicates for last join_keys - if tmp_join_keys: - join_keys = last_join_keys - df_to_join = df_to_join.drop_duplicates( - join_keys + [entity_df_event_timestamp_col], + all_join_keys + [entity_df_event_timestamp_col], keep="last", ignore_index=True, ) @@ -375,11 +373,6 @@ def evaluate_historical_retrieval(): [created_timestamp_column], axis=1 ).persist() - if tmp_join_keys: - entity_df_with_features = entity_df_with_features.drop( - ["__tmp"], axis=1 - ).persist() - # Ensure that we delete dataframes to free up memory del df_to_join From 050e1641198cb5abcafd2248622e215dbcada313 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 13 Feb 2022 15:09:28 +0100 Subject: [PATCH 33/39] add dask - fix integration tests Signed-off-by: qooba --- sdk/python/setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 3cb30a24215..8951f23c509 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -244,7 +244,6 @@ def run(self): "redis": REDIS_REQUIRED, "snowflake": SNOWFLAKE_REQUIRED, "ge": GE_REQUIRED, - "dask": DASK_REQUIRED, }, include_package_data=True, license="Apache", From f51f592cb27e5d15c3b81418d5c8dbec4fb5ee0d Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 13 Feb 2022 18:09:17 +0100 Subject: [PATCH 34/39] add dask - fix integration tests Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index e483e0d1f2e..29dedb672b2 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -225,6 +225,17 @@ def evaluate_historical_retrieval(): feature_view.batch_source.path, storage_options=storage_options, ) + # Rename columns by the field mapping dictionary if it exists + if feature_view.batch_source.field_mapping: + df_to_join = _run_dask_field_mapping( + df_to_join, feature_view.batch_source.field_mapping + ) + # Rename entity columns by the join_key_map dictionary if it exists + if feature_view.projection.join_key_map: + df_to_join = _run_dask_field_mapping( + df_to_join, feature_view.projection.join_key_map + ) + # Build a list of all the features we should select from this source feature_names = [] columns_map = {} @@ -246,17 +257,6 @@ def evaluate_historical_retrieval(): df_to_join = df_to_join.rename(columns=columns_map) df_to_join = df_to_join.persist() - # Rename columns by the field mapping dictionary if it exists - if feature_view.batch_source.field_mapping: - df_to_join = _run_dask_field_mapping( - df_to_join, feature_view.batch_source.field_mapping - ) - # Rename entity columns by the join_key_map dictionary if it exists - if feature_view.projection.join_key_map: - df_to_join = _run_dask_field_mapping( - df_to_join, feature_view.projection.join_key_map - ) - # Select only the columns we need to join from the feature dataframe df_to_join = df_to_join[right_entity_key_columns + feature_names] df_to_join = df_to_join.persist() From 9f613e61b28e8af80cad6dba40537576539ab973 Mon Sep 17 00:00:00 2001 From: qooba Date: Sun, 13 Feb 2022 19:03:39 +0100 Subject: [PATCH 35/39] add dask - fix integration tests Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 71 ++++++++++--------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 29dedb672b2..e9481c6f098 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -254,50 +254,23 @@ def evaluate_historical_retrieval(): columns_map[feature] = formatted_feature_name # Ensure that the source dataframe feature column includes the feature view name as a prefix - df_to_join = df_to_join.rename(columns=columns_map) - df_to_join = df_to_join.persist() + df_to_join = _run_dask_field_mapping(df_to_join, columns_map) # Select only the columns we need to join from the feature dataframe df_to_join = df_to_join[right_entity_key_columns + feature_names] df_to_join = df_to_join.persist() # Make sure to not have duplicated columns - df_to_join = df_to_join.rename( - columns={event_timestamp_column: f"__{event_timestamp_column}",} - ) - event_timestamp_column = f"__{event_timestamp_column}" - - if created_timestamp_column: - df_to_join = df_to_join.rename( - columns={ - created_timestamp_column: f"__{created_timestamp_column}", - } + if entity_df_event_timestamp_col == event_timestamp_column: + df_to_join = _run_dask_field_mapping( + df_to_join, + {event_timestamp_column: f"__{event_timestamp_column}"}, ) - created_timestamp_column = f"__{created_timestamp_column}" + event_timestamp_column = f"__{event_timestamp_column}" df_to_join = df_to_join.persist() - # tmp join keys needed for cross join with null join table view - tmp_join_keys = [] - if not join_keys: - entity_df_with_features["__tmp"] = 1 - df_to_join["__tmp"] = 1 - tmp_join_keys = ["__tmp"] - - # Get only data with requested entities - df_to_join = dd.merge( - entity_df_with_features, - df_to_join, - left_on=join_keys or tmp_join_keys, - right_on=join_keys or tmp_join_keys, - suffixes=("", "__"), - how="left", - ) - - if tmp_join_keys: - df_to_join = df_to_join.drop(tmp_join_keys, axis=1).persist() - else: - df_to_join = df_to_join.persist() + df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) df_to_join_types = df_to_join.dtypes event_timestamp_column_type = df_to_join_types[event_timestamp_column] @@ -542,3 +515,33 @@ def _get_entity_df_event_timestamp_range( entity_df_event_timestamp.min().to_pydatetime(), entity_df_event_timestamp.max().to_pydatetime(), ) + + +def _merge( + entity_df_with_features: dd.DataFrame, + df_to_join: dd.DataFrame, + join_keys: List[str], +) -> dd.DataFrame: + # tmp join keys needed for cross join with null join table view + tmp_join_keys = [] + if not join_keys: + entity_df_with_features["__tmp"] = 1 + df_to_join["__tmp"] = 1 + tmp_join_keys = ["__tmp"] + + # Get only data with requested entities + df_to_join = dd.merge( + entity_df_with_features, + df_to_join, + left_on=join_keys or tmp_join_keys, + right_on=join_keys or tmp_join_keys, + suffixes=("", "__"), + how="left", + ) + + if tmp_join_keys: + df_to_join = df_to_join.drop(tmp_join_keys, axis=1).persist() + else: + df_to_join = df_to_join.persist() + + return df_to_join From 6bdeec303dabcdf81a6201d03e1bc36d2f8b72a8 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 14 Feb 2022 00:23:38 +0100 Subject: [PATCH 36/39] add dask - fix integration tests Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 294 +++++++++++------- 1 file changed, 180 insertions(+), 114 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index e9481c6f098..d8280bfb682 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -195,7 +195,6 @@ def evaluate_historical_retrieval(): ) # Build a list of entity columns to join on (from the right table) - all_join_keys = list(set(all_join_keys + join_keys)) join_keys = [] for entity_name in feature_view.entities: @@ -211,6 +210,8 @@ def evaluate_historical_retrieval(): ] + join_keys right_entity_key_columns = [c for c in right_entity_key_columns if c] + all_join_keys = list(set(all_join_keys + join_keys)) + storage_options = ( { "client_kwargs": { @@ -225,126 +226,40 @@ def evaluate_historical_retrieval(): feature_view.batch_source.path, storage_options=storage_options, ) - # Rename columns by the field mapping dictionary if it exists - if feature_view.batch_source.field_mapping: - df_to_join = _run_dask_field_mapping( - df_to_join, feature_view.batch_source.field_mapping - ) - # Rename entity columns by the join_key_map dictionary if it exists - if feature_view.projection.join_key_map: - df_to_join = _run_dask_field_mapping( - df_to_join, feature_view.projection.join_key_map - ) - - # Build a list of all the features we should select from this source - feature_names = [] - columns_map = {} - for feature in features: - # Modify the separator for feature refs in column names to double underscore. We are using - # double underscore as separator for consistency with other databases like BigQuery, - # where there are very few characters available for use as separators - if full_feature_names: - formatted_feature_name = ( - f"{feature_view.projection.name_to_use()}__{feature}" - ) - else: - formatted_feature_name = feature - # Add the feature name to the list of columns - feature_names.append(formatted_feature_name) - columns_map[feature] = formatted_feature_name - - # Ensure that the source dataframe feature column includes the feature view name as a prefix - df_to_join = _run_dask_field_mapping(df_to_join, columns_map) - - # Select only the columns we need to join from the feature dataframe - df_to_join = df_to_join[right_entity_key_columns + feature_names] - df_to_join = df_to_join.persist() - - # Make sure to not have duplicated columns - if entity_df_event_timestamp_col == event_timestamp_column: - df_to_join = _run_dask_field_mapping( - df_to_join, - {event_timestamp_column: f"__{event_timestamp_column}"}, - ) - event_timestamp_column = f"__{event_timestamp_column}" - - df_to_join = df_to_join.persist() + df_to_join, event_timestamp_column = _field_mapping( + df_to_join, + feature_view, + features, + right_entity_key_columns, + entity_df_event_timestamp_col, + event_timestamp_column, + full_feature_names, + ) df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) - df_to_join_types = df_to_join.dtypes - event_timestamp_column_type = df_to_join_types[event_timestamp_column] - - if created_timestamp_column: - created_timestamp_column_type = df_to_join_types[ - created_timestamp_column - ] - - if ( - not hasattr(event_timestamp_column_type, "tz") - or event_timestamp_column_type.tz != pytz.UTC - ): - # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC - df_to_join[event_timestamp_column] = df_to_join[ - event_timestamp_column - ].apply( - lambda x: x - if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), - ) - - if created_timestamp_column and ( - not hasattr(created_timestamp_column_type, "tz") - or created_timestamp_column_type.tz != pytz.UTC - ): - df_to_join[created_timestamp_column] = df_to_join[ - created_timestamp_column - ].apply( - lambda x: x - if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), - ) + df_to_join = _normalize_timestamp( + df_to_join, event_timestamp_column, created_timestamp_column + ) - # Filter rows by defined timestamp tolerance - if feature_view.ttl != 0: - df_to_join = df_to_join[ - ( - df_to_join[event_timestamp_column] - >= df_to_join[entity_df_event_timestamp_col] - - feature_view.ttl - ) - & ( - df_to_join[event_timestamp_column] - <= df_to_join[entity_df_event_timestamp_col] - ) - ] - - df_to_join = df_to_join.persist() - - if created_timestamp_column: - df_to_join = df_to_join.sort_values(by=created_timestamp_column) - df_to_join = df_to_join.persist() - - df_to_join = df_to_join.sort_values(by=event_timestamp_column) - df_to_join = df_to_join.persist() - - df_to_join = df_to_join.drop_duplicates( - all_join_keys + [entity_df_event_timestamp_col], - keep="last", - ignore_index=True, + df_to_join = _filter_ttl( + df_to_join, + feature_view, + entity_df_event_timestamp_col, + event_timestamp_column, ) - df_to_join = df_to_join.persist() - entity_df_with_features = df_to_join.drop( - [event_timestamp_column], axis=1 - ).persist() + df_to_join = _drop_duplicates( + df_to_join, + all_join_keys, + event_timestamp_column, + created_timestamp_column, + entity_df_event_timestamp_col, + ) - if created_timestamp_column: - entity_df_with_features = entity_df_with_features.drop( - [created_timestamp_column], axis=1 - ).persist() + entity_df_with_features = _drop_columns( + df_to_join, event_timestamp_column, created_timestamp_column + ) # Ensure that we delete dataframes to free up memory del df_to_join @@ -517,6 +432,60 @@ def _get_entity_df_event_timestamp_range( ) +def _field_mapping( + df_to_join: dd.DataFrame, + feature_view: FeatureView, + features: List[str], + right_entity_key_columns: List[str], + entity_df_event_timestamp_col: str, + event_timestamp_column: str, + full_feature_names: bool, +) -> dd.DataFrame: + # Rename columns by the field mapping dictionary if it exists + if feature_view.batch_source.field_mapping: + df_to_join = _run_dask_field_mapping( + df_to_join, feature_view.batch_source.field_mapping + ) + # Rename entity columns by the join_key_map dictionary if it exists + if feature_view.projection.join_key_map: + df_to_join = _run_dask_field_mapping( + df_to_join, feature_view.projection.join_key_map + ) + + # Build a list of all the features we should select from this source + feature_names = [] + columns_map = {} + for feature in features: + # Modify the separator for feature refs in column names to double underscore. We are using + # double underscore as separator for consistency with other databases like BigQuery, + # where there are very few characters available for use as separators + if full_feature_names: + formatted_feature_name = ( + f"{feature_view.projection.name_to_use()}__{feature}" + ) + else: + formatted_feature_name = feature + # Add the feature name to the list of columns + feature_names.append(formatted_feature_name) + columns_map[feature] = formatted_feature_name + + # Ensure that the source dataframe feature column includes the feature view name as a prefix + df_to_join = _run_dask_field_mapping(df_to_join, columns_map) + + # Select only the columns we need to join from the feature dataframe + df_to_join = df_to_join[right_entity_key_columns + feature_names] + df_to_join = df_to_join.persist() + + # Make sure to not have duplicated columns + if entity_df_event_timestamp_col == event_timestamp_column: + df_to_join = _run_dask_field_mapping( + df_to_join, {event_timestamp_column: f"__{event_timestamp_column}"}, + ) + event_timestamp_column = f"__{event_timestamp_column}" + + return df_to_join.persist(), event_timestamp_column + + def _merge( entity_df_with_features: dd.DataFrame, df_to_join: dd.DataFrame, @@ -545,3 +514,100 @@ def _merge( df_to_join = df_to_join.persist() return df_to_join + + +def _normalize_timestamp( + df_to_join: dd.DataFrame, + event_timestamp_column: str, + created_timestamp_column: str, +) -> dd.DataFrame: + df_to_join_types = df_to_join.dtypes + event_timestamp_column_type = df_to_join_types[event_timestamp_column] + + if created_timestamp_column: + created_timestamp_column_type = df_to_join_types[created_timestamp_column] + + if ( + not hasattr(event_timestamp_column_type, "tz") + or event_timestamp_column_type.tz != pytz.UTC + ): + # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC + df_to_join[event_timestamp_column] = df_to_join[event_timestamp_column].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), + ) + + if created_timestamp_column and ( + not hasattr(created_timestamp_column_type, "tz") + or created_timestamp_column_type.tz != pytz.UTC + ): + df_to_join[created_timestamp_column] = df_to_join[ + created_timestamp_column + ].apply( + lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), + meta=(event_timestamp_column, "datetime64[ns, UTC]"), + ) + + return df_to_join.persist() + + +def _filter_ttl( + df_to_join: dd.DataFrame, + feature_view: FeatureView, + entity_df_event_timestamp_col: str, + event_timestamp_column: str, +) -> dd.DataFrame: + # Filter rows by defined timestamp tolerance + if feature_view.ttl and feature_view.ttl != 0: + df_to_join = df_to_join[ + ( + df_to_join[event_timestamp_column] + >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl + ) + & ( + df_to_join[event_timestamp_column] + <= df_to_join[entity_df_event_timestamp_col] + ) + ] + + df_to_join = df_to_join.persist() + + return df_to_join + + +def _drop_duplicates( + df_to_join: dd.DataFrame, + all_join_keys: List[str], + event_timestamp_column: str, + created_timestamp_column: str, + entity_df_event_timestamp_col: str, +) -> dd.DataFrame: + if created_timestamp_column: + df_to_join = df_to_join.sort_values(by=created_timestamp_column) + df_to_join = df_to_join.persist() + + df_to_join = df_to_join.sort_values(by=event_timestamp_column) + df_to_join = df_to_join.persist() + + df_to_join = df_to_join.drop_duplicates( + all_join_keys + [entity_df_event_timestamp_col], keep="last", ignore_index=True, + ) + + return df_to_join.persist() + + +def _drop_columns( + df_to_join: dd.DataFrame, + event_timestamp_column: str, + created_timestamp_column: str, +) -> dd.DataFrame: + entity_df_with_features = df_to_join.drop( + [event_timestamp_column], axis=1 + ).persist() + + if created_timestamp_column: + entity_df_with_features = entity_df_with_features.drop( + [created_timestamp_column], axis=1 + ).persist() + + return entity_df_with_features From 242adb3c08a0f1f14207bb332803ac7196c86137 Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 14 Feb 2022 01:36:19 +0100 Subject: [PATCH 37/39] add dask - code refactoring Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 75 +++++-------------- 1 file changed, 18 insertions(+), 57 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index d8280bfb682..ec1f5afe673 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -212,19 +212,7 @@ def evaluate_historical_retrieval(): all_join_keys = list(set(all_join_keys + join_keys)) - storage_options = ( - { - "client_kwargs": { - "endpoint_url": feature_view.batch_source.file_options.s3_endpoint_override - } - } - if feature_view.batch_source.file_options.s3_endpoint_override - else None - ) - - df_to_join = dd.read_parquet( - feature_view.batch_source.path, storage_options=storage_options, - ) + df_to_join = _read_datasource(feature_view.batch_source) df_to_join, event_timestamp_column = _field_mapping( df_to_join, @@ -297,53 +285,12 @@ def pull_latest_from_table_or_query( # Create lazy function that is only called from the RetrievalJob object def evaluate_offline_job(): + source_df = _read_datasource(data_source) - storage_options = ( - { - "client_kwargs": { - "endpoint_url": data_source.file_options.s3_endpoint_override - } - } - if data_source.file_options.s3_endpoint_override - else None + source_df = _normalize_timestamp( + source_df, event_timestamp_column, created_timestamp_column ) - source_df = dd.read_parquet( - data_source.path, storage_options=storage_options - ) - - source_df_types = source_df.dtypes - event_timestamp_column_type = source_df_types[event_timestamp_column] - - if created_timestamp_column: - created_timestamp_column_type = source_df_types[ - created_timestamp_column - ] - - if ( - not hasattr(event_timestamp_column_type, "tz") - or event_timestamp_column_type.tz != pytz.UTC - ): - source_df[event_timestamp_column] = source_df[ - event_timestamp_column - ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), - ) - - if created_timestamp_column and ( - not hasattr(created_timestamp_column_type, "tz") - or created_timestamp_column_type.tz != pytz.UTC - ): - source_df[created_timestamp_column] = source_df[ - created_timestamp_column - ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), - meta=(event_timestamp_column, "datetime64[ns, UTC]"), - ) - - source_df = source_df.persist() - source_columns = set(source_df.columns) if not set(join_key_columns).issubset(source_columns): raise FeastJoinKeysDuringMaterialization( @@ -432,6 +379,20 @@ def _get_entity_df_event_timestamp_range( ) +def _read_datasource(data_source) -> dd.DataFrame: + storage_options = ( + { + "client_kwargs": { + "endpoint_url": data_source.file_options.s3_endpoint_override + } + } + if data_source.file_options.s3_endpoint_override + else None + ) + + return dd.read_parquet(data_source.path, storage_options=storage_options,) + + def _field_mapping( df_to_join: dd.DataFrame, feature_view: FeatureView, From e9bcd7ab3bff187c2daa5fa3cb45573cee99152f Mon Sep 17 00:00:00 2001 From: qooba Date: Mon, 14 Feb 2022 02:26:42 +0100 Subject: [PATCH 38/39] add dask - code refactoring Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index ec1f5afe673..e0e502b8a86 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -544,10 +544,12 @@ def _drop_duplicates( entity_df_event_timestamp_col: str, ) -> dd.DataFrame: if created_timestamp_column: - df_to_join = df_to_join.sort_values(by=created_timestamp_column) + df_to_join = df_to_join.sort_values( + by=created_timestamp_column, na_position="first" + ) df_to_join = df_to_join.persist() - df_to_join = df_to_join.sort_values(by=event_timestamp_column) + df_to_join = df_to_join.sort_values(by=event_timestamp_column, na_position="first") df_to_join = df_to_join.persist() df_to_join = df_to_join.drop_duplicates( From 7799a67e7ab3faa729d85c5aeeb3dc5529c84e5c Mon Sep 17 00:00:00 2001 From: qooba Date: Wed, 16 Feb 2022 02:30:39 +0100 Subject: [PATCH 39/39] feast dask - fix timedelta Signed-off-by: qooba --- sdk/python/feast/infra/offline_stores/file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index e0e502b8a86..c71f0c3ff74 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -519,7 +519,7 @@ def _filter_ttl( event_timestamp_column: str, ) -> dd.DataFrame: # Filter rows by defined timestamp tolerance - if feature_view.ttl and feature_view.ttl != 0: + if feature_view.ttl and feature_view.ttl.total_seconds() != 0: df_to_join = df_to_join[ ( df_to_join[event_timestamp_column]