From 26e0b3e5783f812205803dff56a955a8cf016abc Mon Sep 17 00:00:00 2001 From: niklasvm Date: Mon, 5 Sep 2022 10:57:43 +0200 Subject: [PATCH 01/18] implement spark materialization engine Signed-off-by: niklasvm --- .../spark/spark_materialization_engine.py | 282 ++++++++++++++++++ sdk/python/feast/repo_config.py | 1 + .../contrib/spark/test_spark.py | 98 ++++++ 3 files changed, 381 insertions(+) create mode 100644 sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py create mode 100644 sdk/python/tests/integration/materialization/contrib/spark/test_spark.py diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py new file mode 100644 index 00000000000..e2fe6e4a40b --- /dev/null +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -0,0 +1,282 @@ +import tempfile +import uuid +from dataclasses import dataclass +from datetime import datetime +from typing import Callable, List, Literal, Optional, Sequence, Union + +import dill +import pyarrow +from pyspark.sql import DataFrame +from tqdm import tqdm + +from feast.batch_feature_view import BatchFeatureView +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.infra.materialization.batch_materialization_engine import ( + BatchMaterializationEngine, + MaterializationJob, + MaterializationJobStatus, + MaterializationTask, +) +from feast.infra.offline_stores.contrib.spark_offline_store.spark import ( + SparkOfflineStore, + SparkRetrievalJob, +) +from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.passthrough_provider import PassthroughProvider +from feast.infra.registry.base_registry import BaseRegistry +from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.stream_feature_view import StreamFeatureView +from feast.utils import ( + _convert_arrow_to_proto, + _get_column_names, + _run_pyarrow_field_mapping, +) + + +class SparkMaterializationEngineConfig(FeastConfigBaseModel): + """Batch Materialization Engine config for spark engine""" + + type: Literal["spark"] = "spark" + """ Type selector""" + batch_size: int + + +@dataclass +class SparkMaterializationJob(MaterializationJob): + def __init__( + self, + job_id: str, + status: MaterializationJobStatus, + error: Optional[BaseException] = None, + ) -> None: + super().__init__() + self._job_id: str = job_id + self._status: MaterializationJobStatus = status + self._error: Optional[BaseException] = error + + def status(self) -> MaterializationJobStatus: + return self._status + + def error(self) -> Optional[BaseException]: + return self._error + + def should_be_retried(self) -> bool: + return False + + def job_id(self) -> str: + return self._job_id + + def url(self) -> Optional[str]: + return None + + +class SparkMaterializationEngine(BatchMaterializationEngine): + def update( + self, + project: str, + views_to_delete: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + views_to_keep: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + ): + # Nothing to set up. 
+ pass + + def teardown_infra( + self, + project: str, + fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]], + entities: Sequence[Entity], + ): + # Nothing to tear down. + pass + + def __init__( + self, + *, + repo_config: RepoConfig, + offline_store: SparkOfflineStore, + online_store: OnlineStore, + **kwargs, + ): + if not isinstance(offline_store, SparkOfflineStore): + raise TypeError( + "SparkMaterializationEngine is only compatible with the SparkOfflineStore" + ) + super().__init__( + repo_config=repo_config, + offline_store=offline_store, + online_store=online_store, + **kwargs, + ) + + def materialize( + self, registry, tasks: List[MaterializationTask] + ) -> List[MaterializationJob]: + return [ + self._materialize_one( + registry, + task.feature_view, + task.start_time, + task.end_time, + task.project, + task.tqdm_builder, + ) + for task in tasks + ] + + def _materialize_one( + self, + registry: BaseRegistry, + feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView], + start_date: datetime, + end_date: datetime, + project: str, + tqdm_builder: Callable[[int], tqdm], + ): + entities = [] + for entity_name in feature_view.entities: + entities.append(registry.get_entity(entity_name, project)) + + ( + join_key_columns, + feature_name_columns, + timestamp_field, + created_timestamp_column, + ) = _get_column_names(feature_view, entities) + + job_id = f"{feature_view.name}-{start_date}-{end_date}" + + try: + offline_job: SparkRetrievalJob = ( + self.offline_store.pull_latest_from_table_or_query( + config=self.repo_config, + data_source=feature_view.batch_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + ) + + # serialize feature view using proto + feature_view_proto = feature_view.to_proto().SerializeToString() + + # serialize repo_config to disk. 
Will be used to instantiate the online store + repo_config_file = tempfile.NamedTemporaryFile(delete=False).name + with open(repo_config_file, "wb") as f: + dill.dump(self.repo_config, f) + + # split data into batches + spark_df = offline_job.to_spark_df() + batch_size = self.repo_config.batch_engine.batch_size + batched_spark_df, batch_column_alias = add_batch_column( + spark_df, + join_key_columns=join_key_columns, + timestamp_field=timestamp_field, + batch_size=batch_size, + ) + + schema = [ + f"{x} {y}" + for x, y in batched_spark_df.dtypes + [("success_flag", "string")] + ] + schema_ddl = ", ".join(schema) + result = batched_spark_df.groupBy(batch_column_alias).applyInPandas( + lambda x: _process_by_pandas_batch( + x, + feature_view_proto=feature_view_proto, + repo_config_file=repo_config_file, + ), + schema=schema_ddl, + ) + result.collect() + + return SparkMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.SUCCEEDED + ) + except BaseException as e: + return SparkMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.ERROR, error=e + ) + + +def add_batch_column( + spark_df: DataFrame, join_key_columns, timestamp_field, batch_size +): + """ + Generates a batch column for a data frame + """ + spark_session = spark_df.sparkSession + + # generate a unique name for the view + view_name = f"{uuid.uuid4()}".replace("-", "") + + row_number_index_alias = f"{view_name}_row_index" + batch_column_alias = f"{view_name}_batch" + original_columns_snippet = ", ".join(spark_df.columns) + + # generate batch + spark_df.createOrReplaceTempView(view_name) + batched_spark_df = spark_session.sql( + f""" + with add_index as ( + select + {original_columns_snippet}, + monotonically_increasing_id() as {row_number_index_alias} + from {view_name} + ) + select + {original_columns_snippet}, + floor({(row_number_index_alias)}/{batch_size}) as {batch_column_alias} + from add_index + """ + ) + + return batched_spark_df, batch_column_alias + + +def _process_by_pandas_batch(pdf, feature_view_proto, repo_config_file): + + # unserialize + proto = FeatureViewProto() + proto.ParseFromString(feature_view_proto) + feature_view = FeatureView.from_proto(proto) + + # load + with open(repo_config_file, "rb") as f: + repo_config = dill.load(f) + + provider = PassthroughProvider(repo_config) + online_store = provider.online_store + + table = pyarrow.Table.from_pandas(pdf) + + if feature_view.batch_source.field_mapping is not None: + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) + + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + + rows_to_write = _convert_arrow_to_proto(table, feature_view, join_key_to_value_type) + online_store.online_write_batch( + repo_config, + feature_view, + rows_to_write, + lambda x: None, + ) + pdf["success_flag"] = "SUCCESS" + + return pdf diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 47a5ae321d9..28bb660f576 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -39,6 +39,7 @@ "snowflake.engine": "feast.infra.materialization.snowflake_engine.SnowflakeMaterializationEngine", "lambda": "feast.infra.materialization.aws_lambda.lambda_engine.LambdaMaterializationEngine", "bytewax": "feast.infra.materialization.contrib.bytewax.bytewax_materialization_engine.BytewaxMaterializationEngine", + "spark": 
"feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationEngine", } ONLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py new file mode 100644 index 00000000000..3873d6d4e78 --- /dev/null +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -0,0 +1,98 @@ +from datetime import timedelta + +import pytest + +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.field import Field +from feast.infra.offline_stores.contrib.spark_offline_store.tests.data_source import ( + SparkDataSourceCreator, +) +from feast.types import Float32 +from tests.data.data_creator import create_basic_driver_dataset +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.repo_configuration import ( + construct_test_environment, +) +from tests.utils.e2e_test_validation import validate_offline_online_store_consistency + + +@pytest.mark.integration +def test_spark_materialization_consistency(): + spark_config = IntegrationTestRepoConfig( + provider="local", + online_store={ + "type": "redis", + # "path": "data/online_store.db" + }, + offline_store_creator=SparkDataSourceCreator, + batch_engine={"type": "spark", "batch_size": 10}, + ) + spark_environment = construct_test_environment( + spark_config, None, entity_key_serialization_version=1 + ) + + df = create_basic_driver_dataset() + + # # generate a large data set + # now = datetime.utcnow().replace(microsecond=0, second=0, minute=0) + + # ts = pd.Timestamp(now).round("ms") + + # size = 10000 + # driver_id = np.array(list(range(size))) + # value = np.array([round(np.random.uniform(size=1)[0],3) for x in list(range(size))]) + # ts_1 = [ts - timedelta(hours=4) for x in range(size)] + # created_ts = np.repeat(ts,repeats=size) + + # df = pd.DataFrame({ + # "driver_id": driver_id, + # "value": value, + # "ts_1": ts_1, + # "created_ts": created_ts + + # }) + # print(df) + + ds = spark_environment.data_source_creator.create_data_source( + df, + spark_environment.feature_store.project, + field_mapping={"ts_1": "ts"}, + ) + + fs = spark_environment.feature_store + driver = Entity( + name="driver_id", + join_keys=["driver_id"], + ) + + driver_stats_fv = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(weeks=52), + schema=[Field(name="value", dtype=Float32)], + source=ds, + ) + + try: + + fs.apply([driver, driver_stats_fv]) + + print(df) + + # materialization is run in two steps and + # we use timestamp from generated dataframe as a split point + split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1) + + print(f"Split datetime: {split_dt}") + + validate_offline_online_store_consistency(fs, driver_stats_fv, split_dt) + finally: + fs.teardown() + + +if __name__ == "__main__": + test_spark_materialization_consistency() + print() From 807e7cac5d21d58a5c5ddf1b2b51874abd758aad Mon Sep 17 00:00:00 2001 From: niklasvm Date: Mon, 5 Sep 2022 11:22:50 +0200 Subject: [PATCH 02/18] remove redundant code Signed-off-by: niklasvm --- .../integration/materialization/contrib/spark/test_spark.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index 3873d6d4e78..b491409c579 100644 --- 
a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -91,8 +91,3 @@ def test_spark_materialization_consistency(): validate_offline_online_store_consistency(fs, driver_stats_fv, split_dt) finally: fs.teardown() - - -if __name__ == "__main__": - test_spark_materialization_consistency() - print() From f8e70ea784cec8c4af7fd8f9aaa15cce24ff77a9 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Mon, 5 Sep 2022 11:35:20 +0200 Subject: [PATCH 03/18] make function private Signed-off-by: niklasvm --- .../contrib/spark/spark_materialization_engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index e2fe6e4a40b..fdb01d8da3c 100644 --- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -178,7 +178,7 @@ def _materialize_one( # split data into batches spark_df = offline_job.to_spark_df() batch_size = self.repo_config.batch_engine.batch_size - batched_spark_df, batch_column_alias = add_batch_column( + batched_spark_df, batch_column_alias = _add_batch_column( spark_df, join_key_columns=join_key_columns, timestamp_field=timestamp_field, @@ -209,7 +209,7 @@ def _materialize_one( ) -def add_batch_column( +def _add_batch_column( spark_df: DataFrame, join_key_columns, timestamp_field, batch_size ): """ From b42352e7b71850b801d187879328834387dd591a Mon Sep 17 00:00:00 2001 From: niklasvm Date: Mon, 5 Sep 2022 14:55:47 +0200 Subject: [PATCH 04/18] refactor serializing into a class Signed-off-by: niklasvm --- .../spark/spark_materialization_engine.py | 71 ++++++++++++------- 1 file changed, 44 insertions(+), 27 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index fdb01d8da3c..6e7115f9db3 100644 --- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -167,21 +167,15 @@ def _materialize_one( ) ) - # serialize feature view using proto - feature_view_proto = feature_view.to_proto().SerializeToString() - - # serialize repo_config to disk. 
Will be used to instantiate the online store - repo_config_file = tempfile.NamedTemporaryFile(delete=False).name - with open(repo_config_file, "wb") as f: - dill.dump(self.repo_config, f) + spark_serialized_artifacts = _SparkSerializedArtifacts.serialize( + feature_view=feature_view, repo_config=self.repo_config + ) # split data into batches spark_df = offline_job.to_spark_df() batch_size = self.repo_config.batch_engine.batch_size batched_spark_df, batch_column_alias = _add_batch_column( spark_df, - join_key_columns=join_key_columns, - timestamp_field=timestamp_field, batch_size=batch_size, ) @@ -191,11 +185,7 @@ def _materialize_one( ] schema_ddl = ", ".join(schema) result = batched_spark_df.groupBy(batch_column_alias).applyInPandas( - lambda x: _process_by_pandas_batch( - x, - feature_view_proto=feature_view_proto, - repo_config_file=repo_config_file, - ), + lambda x: _process_by_pandas_batch(x, spark_serialized_artifacts), schema=schema_ddl, ) result.collect() @@ -209,9 +199,7 @@ def _materialize_one( ) -def _add_batch_column( - spark_df: DataFrame, join_key_columns, timestamp_field, batch_size -): +def _add_batch_column(spark_df: DataFrame, batch_size): """ Generates a batch column for a data frame """ @@ -244,19 +232,48 @@ def _add_batch_column( return batched_spark_df, batch_column_alias -def _process_by_pandas_batch(pdf, feature_view_proto, repo_config_file): +@dataclass +class _SparkSerializedArtifacts: + """Class to assist with serializing unpicklable artifacts to the spark workers""" - # unserialize - proto = FeatureViewProto() - proto.ParseFromString(feature_view_proto) - feature_view = FeatureView.from_proto(proto) + feature_view_proto: str + repo_config_file: str - # load - with open(repo_config_file, "rb") as f: - repo_config = dill.load(f) + @classmethod + def serialize(cls, feature_view, repo_config): - provider = PassthroughProvider(repo_config) - online_store = provider.online_store + # serialize to proto + feature_view_proto = feature_view.to_proto().SerializeToString() + + # serialize repo_config to disk. 
Will be used to instantiate the online store + repo_config_file = tempfile.NamedTemporaryFile(delete=False).name + with open(repo_config_file, "wb") as f: + dill.dump(repo_config, f) + + return _SparkSerializedArtifacts( + feature_view_proto=feature_view_proto, repo_config_file=repo_config_file + ) + + def unserialize(self): + # unserialize + proto = FeatureViewProto() + proto.ParseFromString(self.feature_view_proto) + feature_view = FeatureView.from_proto(proto) + + # load + with open(self.repo_config_file, "rb") as f: + repo_config = dill.load(f) + + provider = PassthroughProvider(repo_config) + online_store = provider.online_store + return feature_view, online_store, repo_config + + +def _process_by_pandas_batch( + pdf, spark_serialized_artifacts: _SparkSerializedArtifacts +): + """Load pandas df to online store""" + feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize() table = pyarrow.Table.from_pandas(pdf) From f609cfb3a54f244eb80cdc020193cc87f27a732f Mon Sep 17 00:00:00 2001 From: niklasvm Date: Mon, 5 Sep 2022 21:43:54 +0200 Subject: [PATCH 05/18] switch to using `foreachPartition` Signed-off-by: niklasvm --- .../spark/spark_materialization_engine.py | 77 +++++-------------- 1 file changed, 18 insertions(+), 59 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index 6e7115f9db3..9ebc9a2b43c 100644 --- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -1,12 +1,11 @@ import tempfile -import uuid from dataclasses import dataclass from datetime import datetime from typing import Callable, List, Literal, Optional, Sequence, Union import dill +import pandas as pd import pyarrow -from pyspark.sql import DataFrame from tqdm import tqdm from feast.batch_feature_view import BatchFeatureView @@ -171,25 +170,11 @@ def _materialize_one( feature_view=feature_view, repo_config=self.repo_config ) - # split data into batches spark_df = offline_job.to_spark_df() - batch_size = self.repo_config.batch_engine.batch_size - batched_spark_df, batch_column_alias = _add_batch_column( - spark_df, - batch_size=batch_size, + spark_df.foreachPartition( + lambda x: _process_by_partition(x, spark_serialized_artifacts) ) - schema = [ - f"{x} {y}" - for x, y in batched_spark_df.dtypes + [("success_flag", "string")] - ] - schema_ddl = ", ".join(schema) - result = batched_spark_df.groupBy(batch_column_alias).applyInPandas( - lambda x: _process_by_pandas_batch(x, spark_serialized_artifacts), - schema=schema_ddl, - ) - result.collect() - return SparkMaterializationJob( job_id=job_id, status=MaterializationJobStatus.SUCCEEDED ) @@ -199,39 +184,6 @@ def _materialize_one( ) -def _add_batch_column(spark_df: DataFrame, batch_size): - """ - Generates a batch column for a data frame - """ - spark_session = spark_df.sparkSession - - # generate a unique name for the view - view_name = f"{uuid.uuid4()}".replace("-", "") - - row_number_index_alias = f"{view_name}_row_index" - batch_column_alias = f"{view_name}_batch" - original_columns_snippet = ", ".join(spark_df.columns) - - # generate batch - spark_df.createOrReplaceTempView(view_name) - batched_spark_df = spark_session.sql( - f""" - with add_index as ( - select - {original_columns_snippet}, - monotonically_increasing_id() as {row_number_index_alias} - from {view_name} - ) - 
select - {original_columns_snippet}, - floor({(row_number_index_alias)}/{batch_size}) as {batch_column_alias} - from add_index - """ - ) - - return batched_spark_df, batch_column_alias - - @dataclass class _SparkSerializedArtifacts: """Class to assist with serializing unpicklable artifacts to the spark workers""" @@ -269,13 +221,23 @@ def unserialize(self): return feature_view, online_store, repo_config -def _process_by_pandas_batch( - pdf, spark_serialized_artifacts: _SparkSerializedArtifacts -): +def _process_by_partition(rows, spark_serialized_artifacts: _SparkSerializedArtifacts): """Load pandas df to online store""" - feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize() - table = pyarrow.Table.from_pandas(pdf) + # convert to pyarrow table + dicts = [] + for row in rows: + dicts.append(row.asDict()) + + df = pd.DataFrame.from_records(dicts) + if df.shape[0] == 0: + print("Skipping") + return + + table = pyarrow.Table.from_pandas(df) + + # unserialize artifacts + feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize() if feature_view.batch_source.field_mapping is not None: table = _run_pyarrow_field_mapping( @@ -294,6 +256,3 @@ def _process_by_pandas_batch( rows_to_write, lambda x: None, ) - pdf["success_flag"] = "SUCCESS" - - return pdf From 79ea412ad1c457a16a4b6f68db74b13f14f8165f Mon Sep 17 00:00:00 2001 From: niklasvm Date: Mon, 5 Sep 2022 21:52:01 +0200 Subject: [PATCH 06/18] remove batch_size parameter Signed-off-by: niklasvm --- .../contrib/spark/spark_materialization_engine.py | 1 - .../integration/materialization/contrib/spark/test_spark.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index 9ebc9a2b43c..6796b54a36e 100644 --- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -39,7 +39,6 @@ class SparkMaterializationEngineConfig(FeastConfigBaseModel): type: Literal["spark"] = "spark" """ Type selector""" - batch_size: int @dataclass diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index b491409c579..dcb88af89e3 100644 --- a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -28,7 +28,7 @@ def test_spark_materialization_consistency(): # "path": "data/online_store.db" }, offline_store_creator=SparkDataSourceCreator, - batch_engine={"type": "spark", "batch_size": 10}, + batch_engine={"type": "spark"}, ) spark_environment = construct_test_environment( spark_config, None, entity_key_serialization_version=1 From 937a0e380a3e950c70c0d252c9819e645afc5d04 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Wed, 7 Sep 2022 22:04:51 +0200 Subject: [PATCH 07/18] add partitions parameter Signed-off-by: niklasvm --- .../contrib/spark/spark_materialization_engine.py | 8 ++++++++ .../materialization/contrib/spark/test_spark.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index 6796b54a36e..dae8bda3f7e 100644 --- 
a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -40,6 +40,9 @@ class SparkMaterializationEngineConfig(FeastConfigBaseModel): type: Literal["spark"] = "spark" """ Type selector""" + partitions: int = None + """Number of partitions to use when writing data to online store""" + @dataclass class SparkMaterializationJob(MaterializationJob): @@ -170,6 +173,11 @@ def _materialize_one( ) spark_df = offline_job.to_spark_df() + if self.repo_config.batch_engine.partitions: + spark_df = spark_df.repartition( + self.repo_config.batch_engine.partitions + ) + spark_df.foreachPartition( lambda x: _process_by_partition(x, spark_serialized_artifacts) ) diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index dcb88af89e3..486916dda19 100644 --- a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -28,7 +28,7 @@ def test_spark_materialization_consistency(): # "path": "data/online_store.db" }, offline_store_creator=SparkDataSourceCreator, - batch_engine={"type": "spark"}, + batch_engine={"type": "spark", "partitions": 100}, ) spark_environment = construct_test_environment( spark_config, None, entity_key_serialization_version=1 From 610614acbd321f3f3cc6e4d6c9b0fbda09ce5430 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Wed, 7 Sep 2022 22:07:16 +0200 Subject: [PATCH 08/18] linting Signed-off-by: niklasvm --- .../contrib/spark/spark_materialization_engine.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index dae8bda3f7e..7fc348777e4 100644 --- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -40,8 +40,8 @@ class SparkMaterializationEngineConfig(FeastConfigBaseModel): type: Literal["spark"] = "spark" """ Type selector""" - partitions: int = None - """Number of partitions to use when writing data to online store""" + partitions: int = 0 + """Number of partitions to use when writing data to online store. 
If 0, no repartitioning is done""" @dataclass @@ -173,7 +173,7 @@ def _materialize_one( ) spark_df = offline_job.to_spark_df() - if self.repo_config.batch_engine.partitions: + if self.repo_config.batch_engine.partitions != 0: spark_df = spark_df.repartition( self.repo_config.batch_engine.partitions ) From 84cc858b02188575eb27bf372c87dab1450033eb Mon Sep 17 00:00:00 2001 From: niklasvm Date: Wed, 7 Sep 2022 22:14:50 +0200 Subject: [PATCH 09/18] rename spark to spark.offline and spark.engine Signed-off-by: niklasvm --- .../contrib/spark/spark_materialization_engine.py | 2 +- .../offline_stores/contrib/spark_offline_store/spark.py | 6 +++--- .../integration/materialization/contrib/spark/test_spark.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index 7fc348777e4..66eb97bca78 100644 --- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -37,7 +37,7 @@ class SparkMaterializationEngineConfig(FeastConfigBaseModel): """Batch Materialization Engine config for spark engine""" - type: Literal["spark"] = "spark" + type: Literal["spark.engine"] = "spark.engine" """ Type selector""" partitions: int = 0 diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 58519014b44..569da5a0ce1 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -39,7 +39,7 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel): - type: StrictStr = "spark" + type: StrictStr = "spark.offline" """ Offline store type selector""" spark_conf: Optional[Dict[str, str]] = None @@ -49,7 +49,7 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel): class SparkOfflineStore(OfflineStore): @staticmethod - @log_exceptions_and_usage(offline_store="spark") + @log_exceptions_and_usage(offline_store="spark.offline") def pull_latest_from_table_or_query( config: RepoConfig, data_source: DataSource, @@ -247,7 +247,7 @@ def offline_write_batch( ) @staticmethod - @log_exceptions_and_usage(offline_store="spark") + @log_exceptions_and_usage(offline_store="spark.offline") def pull_all_from_table_or_query( config: RepoConfig, data_source: DataSource, diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index 486916dda19..a62b5a11abb 100644 --- a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -28,7 +28,7 @@ def test_spark_materialization_consistency(): # "path": "data/online_store.db" }, offline_store_creator=SparkDataSourceCreator, - batch_engine={"type": "spark", "partitions": 100}, + batch_engine={"type": "spark.engine", "partitions": 100}, ) spark_environment = construct_test_environment( spark_config, None, entity_key_serialization_version=1 From 8cc4928d764d53e7c57e5d3689200eefadd6b7f9 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Wed, 7 Sep 2022 22:15:57 +0200 Subject: [PATCH 10/18] fix to test Signed-off-by: niklasvm --- .../contrib/spark_offline_store/tests/data_source.py | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py index ab1acbef73e..dc8c9a80aee 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py @@ -56,7 +56,7 @@ def teardown(self): def create_offline_store_config(self): self.spark_offline_store_config = SparkOfflineStoreConfig() - self.spark_offline_store_config.type = "spark" + self.spark_offline_store_config.type = "spark.offline" self.spark_offline_store_config.spark_conf = self.spark_conf return self.spark_offline_store_config From 6a9663dbfe01a27d25ff9db5d78b3bd96a7a04f4 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Wed, 7 Sep 2022 22:19:55 +0200 Subject: [PATCH 11/18] forgot to stage Signed-off-by: niklasvm --- sdk/python/feast/repo_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 28bb660f576..1427d40bc3f 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -39,7 +39,7 @@ "snowflake.engine": "feast.infra.materialization.snowflake_engine.SnowflakeMaterializationEngine", "lambda": "feast.infra.materialization.aws_lambda.lambda_engine.LambdaMaterializationEngine", "bytewax": "feast.infra.materialization.contrib.bytewax.bytewax_materialization_engine.BytewaxMaterializationEngine", - "spark": "feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationEngine", + "spark.engine": "feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationEngine", } ONLINE_STORE_CLASS_FOR_TYPE = { @@ -58,7 +58,7 @@ "bigquery": "feast.infra.offline_stores.bigquery.BigQueryOfflineStore", "redshift": "feast.infra.offline_stores.redshift.RedshiftOfflineStore", "snowflake.offline": "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore", - "spark": "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore", + "spark.offline": "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore", "trino": "feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStore", "postgres": "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.PostgreSQLOfflineStore", "athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore", From 715bb7254235e007de06206c51c57d79664c5948 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Thu, 8 Sep 2022 07:39:22 +0200 Subject: [PATCH 12/18] revert spark.offline to spark to ensure backward compatibility Signed-off-by: niklasvm --- .../offline_stores/contrib/spark_offline_store/spark.py | 6 +++--- .../contrib/spark_offline_store/tests/data_source.py | 2 +- sdk/python/feast/repo_config.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 569da5a0ce1..58519014b44 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -39,7 +39,7 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel): - type: StrictStr = "spark.offline" + type: StrictStr = "spark" """ Offline 
store type selector""" spark_conf: Optional[Dict[str, str]] = None @@ -49,7 +49,7 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel): class SparkOfflineStore(OfflineStore): @staticmethod - @log_exceptions_and_usage(offline_store="spark.offline") + @log_exceptions_and_usage(offline_store="spark") def pull_latest_from_table_or_query( config: RepoConfig, data_source: DataSource, @@ -247,7 +247,7 @@ def offline_write_batch( ) @staticmethod - @log_exceptions_and_usage(offline_store="spark.offline") + @log_exceptions_and_usage(offline_store="spark") def pull_all_from_table_or_query( config: RepoConfig, data_source: DataSource, diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py index dc8c9a80aee..ab1acbef73e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py @@ -56,7 +56,7 @@ def teardown(self): def create_offline_store_config(self): self.spark_offline_store_config = SparkOfflineStoreConfig() - self.spark_offline_store_config.type = "spark.offline" + self.spark_offline_store_config.type = "spark" self.spark_offline_store_config.spark_conf = self.spark_conf return self.spark_offline_store_config diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 1427d40bc3f..d5f68a8db61 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -58,7 +58,7 @@ "bigquery": "feast.infra.offline_stores.bigquery.BigQueryOfflineStore", "redshift": "feast.infra.offline_stores.redshift.RedshiftOfflineStore", "snowflake.offline": "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore", - "spark.offline": "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore", + "spark": "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore", "trino": "feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStore", "postgres": "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.PostgreSQLOfflineStore", "athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore", From 32b31119de5d9b351d45235497a3f5cf49be6fbb Mon Sep 17 00:00:00 2001 From: niklasvm Date: Thu, 8 Sep 2022 08:14:28 +0200 Subject: [PATCH 13/18] fix import Signed-off-by: niklasvm --- .../infra/offline_stores/contrib/spark_offline_store/spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 0a4ec05c23d..f47779a9220 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -15,7 +15,7 @@ from pyspark import SparkConf from pyspark.sql import SparkSession from pytz import utc -from sdk.python.feast.infra.utils import aws_utils +from feast.infra.utils import aws_utils from feast import FeatureView, OnDemandFeatureView from feast.data_source import DataSource From 0c13af9c1da79b65fca0f9ca2da47ce96d9dd9bc Mon Sep 17 00:00:00 2001 From: niklasvm Date: Thu, 8 Sep 2022 08:19:29 +0200 Subject: [PATCH 14/18] remove code from testing a large data set Signed-off-by: niklasvm --- .../contrib/spark/test_spark.py | 20 ------------------- 1 file 
changed, 20 deletions(-) diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index a62b5a11abb..5dc3b8dfb02 100644 --- a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -36,26 +36,6 @@ def test_spark_materialization_consistency(): df = create_basic_driver_dataset() - # # generate a large data set - # now = datetime.utcnow().replace(microsecond=0, second=0, minute=0) - - # ts = pd.Timestamp(now).round("ms") - - # size = 10000 - # driver_id = np.array(list(range(size))) - # value = np.array([round(np.random.uniform(size=1)[0],3) for x in list(range(size))]) - # ts_1 = [ts - timedelta(hours=4) for x in range(size)] - # created_ts = np.repeat(ts,repeats=size) - - # df = pd.DataFrame({ - # "driver_id": driver_id, - # "value": value, - # "ts_1": ts_1, - # "created_ts": created_ts - - # }) - # print(df) - ds = spark_environment.data_source_creator.create_data_source( df, spark_environment.feature_store.project, From 542705f015b5201422a65eb0e7a75a9b89e6dd51 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Thu, 8 Sep 2022 08:37:39 +0200 Subject: [PATCH 15/18] linting Signed-off-by: niklasvm --- .../infra/offline_stores/contrib/spark_offline_store/spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index f47779a9220..035f2de6ad0 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -15,7 +15,6 @@ from pyspark import SparkConf from pyspark.sql import SparkSession from pytz import utc -from feast.infra.utils import aws_utils from feast import FeatureView, OnDemandFeatureView from feast.data_source import DataSource @@ -32,6 +31,7 @@ RetrievalMetadata, ) from feast.infra.registry.registry import Registry +from feast.infra.utils import aws_utils from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.type_map import spark_schema_to_np_dtypes From 262af105eab897d19f088dd662d5b205b9ac3dff Mon Sep 17 00:00:00 2001 From: niklasvm Date: Tue, 13 Sep 2022 06:56:53 +0200 Subject: [PATCH 16/18] test without repartition Signed-off-by: niklasvm --- .../integration/materialization/contrib/spark/test_spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index 5dc3b8dfb02..3c897ffb8e6 100644 --- a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -28,7 +28,7 @@ def test_spark_materialization_consistency(): # "path": "data/online_store.db" }, offline_store_creator=SparkDataSourceCreator, - batch_engine={"type": "spark.engine", "partitions": 100}, + batch_engine={"type": "spark.engine", "partitions": 0}, ) spark_environment = construct_test_environment( spark_config, None, entity_key_serialization_version=1 From 8e59da2fc1c0aca1b368a71239fd3c62ea948c69 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Tue, 13 Sep 2022 07:06:55 +0200 Subject: [PATCH 17/18] test alternate connection string 
Signed-off-by: niklasvm --- .../integration/materialization/contrib/spark/test_spark.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index 3c897ffb8e6..c0337703328 100644 --- a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -25,6 +25,7 @@ def test_spark_materialization_consistency(): provider="local", online_store={ "type": "redis", + "connection_string": "127.0.0.1:6379" # "path": "data/online_store.db" }, offline_store_creator=SparkDataSourceCreator, From f828d2b95bc5fd0dcbd813db47857d59f960cfce Mon Sep 17 00:00:00 2001 From: niklasvm Date: Tue, 13 Sep 2022 08:02:03 +0200 Subject: [PATCH 18/18] use redis online creator Signed-off-by: niklasvm --- .../materialization/contrib/spark/test_spark.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index c0337703328..c7028a09ef4 100644 --- a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -16,6 +16,9 @@ from tests.integration.feature_repos.repo_configuration import ( construct_test_environment, ) +from tests.integration.feature_repos.universal.online_store.redis import ( + RedisOnlineStoreCreator, +) from tests.utils.e2e_test_validation import validate_offline_online_store_consistency @@ -23,13 +26,9 @@ def test_spark_materialization_consistency(): spark_config = IntegrationTestRepoConfig( provider="local", - online_store={ - "type": "redis", - "connection_string": "127.0.0.1:6379" - # "path": "data/online_store.db" - }, + online_store_creator=RedisOnlineStoreCreator, offline_store_creator=SparkDataSourceCreator, - batch_engine={"type": "spark.engine", "partitions": 0}, + batch_engine={"type": "spark.engine", "partitions": 10}, ) spark_environment = construct_test_environment( spark_config, None, entity_key_serialization_version=1 @@ -72,3 +71,7 @@ def test_spark_materialization_consistency(): validate_offline_online_store_consistency(fs, driver_stats_fv, split_dt) finally: fs.teardown() + + +if __name__ == "__main__": + test_spark_materialization_consistency()
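
Taken together, patches 01 and 05 settle on foreachPartition as the write path: Spark streams each partition's rows to a worker-side function, which converts them to Arrow and hands them to the online store, so no data is ever collected on the driver. Below is a minimal, self-contained sketch of that pattern, assuming a local-mode SparkSession; write_partition and the sample driver_id/value schema are illustrative stand-ins, not Feast APIs.

import pandas as pd
import pyarrow
from pyspark.sql import SparkSession

def write_partition(rows):
    # `rows` is an iterator of pyspark.sql.Row objects for one partition.
    dicts = [row.asDict() for row in rows]
    if not dicts:
        # empty partitions are skipped, mirroring _process_by_partition
        return
    table = pyarrow.Table.from_pandas(pd.DataFrame.from_records(dicts))
    # a real implementation would call online_store.online_write_batch here
    print(f"partition with {table.num_rows} rows")

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame(
    [(1001, 0.5), (1002, 0.7)], schema="driver_id int, value float"
)
df.repartition(2).foreachPartition(write_partition)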
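
Patches 07 through 09 define the configuration surface: the engine registers under the spark.engine type key (while the offline store keeps its original spark key for backward compatibility, per patch 12), and partitions optionally repartitions the source DataFrame before writing, with 0 meaning no repartition. A rough end-user sketch follows, assuming a repo whose feature_store.yaml already declares the Spark offline store and the batch_engine block shown in the comment.

from datetime import datetime, timedelta

from feast import FeatureStore

# feature_store.yaml is assumed to contain:
#   batch_engine:
#     type: spark.engine
#     partitions: 10
store = FeatureStore(repo_path=".")
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)

Because unserialize constructs a fresh PassthroughProvider (and therefore a fresh online-store client) inside each partition task, the partitions setting effectively trades write parallelism against connection pressure on the online store.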