From a802cfef662381816296a3ef784ea40e1cd75c95 Mon Sep 17 00:00:00 2001 From: ckarwicki <71740096+ckarwicki@users.noreply.github.com> Date: Thu, 1 Sep 2022 17:55:17 -0400 Subject: [PATCH 1/3] Fix materialization when running on Spark cluster. When running materialization with the Spark offline store configured to use a cluster (`spark.master` pointing to an actual Spark master node), `self.to_spark_df().write.parquet(temp_dir, mode="overwrite")` creates the parquet file on a worker node, but `return pq.read_table(temp_dir)` is executed on the driver node, which cannot read from the worker. The proposed fix makes materialization work when run on a Spark cluster. Signed-off-by: ckarwicki <104110169+ckarwicki-deloitte@users.noreply.github.com> Signed-off-by: ckarwicki <71740096+ckarwicki@users.noreply.github.com> --- .../offline_stores/contrib/spark_offline_store/spark.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 58519014b44..5ea3626f8eb 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -325,11 +325,8 @@ def _to_df_internal(self) -> pd.DataFrame: def _to_arrow_internal(self) -> pyarrow.Table: """Return dataset as pyarrow Table synchronously""" - - # write to temp parquet and then load it as pyarrow table from disk - with tempfile.TemporaryDirectory() as temp_dir: - self.to_spark_df().write.parquet(temp_dir, mode="overwrite") - return pq.read_table(temp_dir) + + return pyarrow.Table.from_pandas(self._to_df_internal()) def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): """ From 21fa5bcbac195887fbc89517703fa735750af206 Mon Sep 17 00:00:00 2001 From: ckarwicki <71740096+ckarwicki@users.noreply.github.com> Date: Thu, 1 Sep 2022 18:49:17 -0400 Subject: [PATCH 2/3] 
Fix linter. Signed-off-by: ckarwicki Signed-off-by: ckarwicki <104110169+ckarwicki-deloitte@users.noreply.github.com> Signed-off-by: ckarwicki <71740096+ckarwicki@users.noreply.github.com> --- .../infra/offline_stores/contrib/spark_offline_store/spark.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 5ea3626f8eb..fa16e862451 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -324,8 +324,7 @@ def _to_df_internal(self) -> pd.DataFrame: return self.to_spark_df().toPandas() def _to_arrow_internal(self) -> pyarrow.Table: - """Return dataset as pyarrow Table synchronously""" - + """Return dataset as pyarrow Table synchronously""" return pyarrow.Table.from_pandas(self._to_df_internal()) def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): From b9d36c106ae2846e44a7c2caf438196d9c8a9e1e Mon Sep 17 00:00:00 2001 From: ckarwicki <71740096+ckarwicki@users.noreply.github.com> Date: Thu, 1 Sep 2022 20:05:11 -0400 Subject: [PATCH 3/3] Fix linter. 
Signed-off-by: ckarwicki Signed-off-by: ckarwicki <104110169+ckarwicki-deloitte@users.noreply.github.com> Signed-off-by: ckarwicki <71740096+ckarwicki@users.noreply.github.com> --- .../infra/offline_stores/contrib/spark_offline_store/spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index fa16e862451..e46425a7a78 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -324,7 +324,7 @@ def _to_df_internal(self) -> pd.DataFrame: return self.to_spark_df().toPandas() def _to_arrow_internal(self) -> pyarrow.Table: - """Return dataset as pyarrow Table synchronously""" + """Return dataset as pyarrow Table synchronously""" return pyarrow.Table.from_pandas(self._to_df_internal()) def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):