From 47541007ed4fc902cbf58b23b7a3bb28aa213a47 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Thu, 11 Aug 2022 14:17:08 +0000 Subject: [PATCH 1/2] Fix unit tests related to empty list types Signed-off-by: niklasvm --- .../offline_stores/contrib/spark_offline_store/spark.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 45f860a8fbf..1fff4fd3c2f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -1,3 +1,4 @@ +import tempfile import warnings from datetime import datetime from typing import Dict, List, Optional, Tuple, Union @@ -10,6 +11,7 @@ from pydantic import StrictStr from pyspark import SparkConf from pyspark.sql import SparkSession +import pyarrow.parquet as pq from pytz import utc from feast import FeatureView, OnDemandFeatureView @@ -267,8 +269,11 @@ def _to_df_internal(self) -> pd.DataFrame: def _to_arrow_internal(self) -> pyarrow.Table: """Return dataset as pyarrow Table synchronously""" - df = self.to_df() - return pyarrow.Table.from_pandas(df) # noqa + + # write to temp parquet and then load it as pyarrow table from disk + with tempfile.TemporaryDirectory() as temp_dir: + self.to_spark_df().write.parquet(temp_dir,mode="overwrite") + return pq.read_table(temp_dir) def persist(self, storage: SavedDatasetStorage): """ From 2ad009347dc82c9cfeb21456ec3b9b392b8ce187 Mon Sep 17 00:00:00 2001 From: niklasvm Date: Thu, 11 Aug 2022 14:18:43 +0000 Subject: [PATCH 2/2] formatting Signed-off-by: niklasvm --- .../infra/offline_stores/contrib/spark_offline_store/spark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 1fff4fd3c2f..21a00a6c5a2 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -7,11 +7,11 @@ import pandas import pandas as pd import pyarrow +import pyarrow.parquet as pq import pyspark from pydantic import StrictStr from pyspark import SparkConf from pyspark.sql import SparkSession -import pyarrow.parquet as pq from pytz import utc from feast import FeatureView, OnDemandFeatureView @@ -272,7 +272,7 @@ def _to_arrow_internal(self) -> pyarrow.Table: # write to temp parquet and then load it as pyarrow table from disk with tempfile.TemporaryDirectory() as temp_dir: - self.to_spark_df().write.parquet(temp_dir,mode="overwrite") + self.to_spark_df().write.parquet(temp_dir, mode="overwrite") return pq.read_table(temp_dir) def persist(self, storage: SavedDatasetStorage):