Skip to content

Commit db6f607

Browse files
committed
Upload all data to BQ in ARRAY safe manner
Signed-off-by: Judah Rand <[email protected]>
1 parent 81c8abc commit db6f607

4 files changed

Lines changed: 55 additions & 49 deletions

File tree

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -223,23 +223,8 @@ def to_bigquery(
223223
job_config = bigquery.QueryJobConfig(destination=path)
224224

225225
if not job_config.dry_run and self.on_demand_feature_views is not None:
226-
# It is complicated to get BQ to understand that we want an ARRAY<value_type>
227-
# https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
228-
# https://github.com/googleapis/python-bigquery/issues/19
229-
writer = pyarrow.BufferOutputStream()
230-
pyarrow.parquet.write_table(
231-
self.to_arrow(), writer, use_compliant_nested_type=True
232-
)
233-
reader = pyarrow.BufferReader(writer.getvalue())
234-
235-
parquet_options = bigquery.format_options.ParquetOptions()
236-
parquet_options.enable_list_inference = True
237-
job_config = bigquery.LoadJobConfig()
238-
job_config.source_format = bigquery.SourceFormat.PARQUET
239-
job_config.parquet_options = parquet_options
240-
241-
job = self.client.load_table_from_file(
242-
reader, job_config.destination, job_config=job_config,
226+
job = _write_pyarrow_table_to_bq(
227+
self.client, self.to_arrow(), job_config.destination
243228
)
244229
job.result()
245230
print(f"Done writing to '{job_config.destination}'.")
@@ -344,23 +329,7 @@ def _upload_entity_df_and_get_entity_schema(
344329
elif isinstance(entity_df, pd.DataFrame):
345330
# Drop the index so that we dont have unnecessary columns
346331
entity_df.reset_index(drop=True, inplace=True)
347-
348-
# Upload the dataframe into BigQuery, creating a temporary table
349-
# It is complicated to get BQ to understand that we want an ARRAY<value_type>
350-
# https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
351-
# https://github.com/googleapis/python-bigquery/issues/19
352-
writer = pyarrow.BufferOutputStream()
353-
pyarrow.parquet.write_table(
354-
pyarrow.Table.from_pandas(entity_df), writer, use_compliant_nested_type=True
355-
)
356-
reader = pyarrow.BufferReader(writer.getvalue())
357-
358-
parquet_options = bigquery.format_options.ParquetOptions()
359-
parquet_options.enable_list_inference = True
360-
job_config = bigquery.LoadJobConfig()
361-
job_config.source_format = bigquery.SourceFormat.PARQUET
362-
job_config.parquet_options = parquet_options
363-
job = client.load_table_from_file(reader, table_name, job_config=job_config)
332+
job = _write_df_to_bq(client, entity_df, table_name)
364333
block_until_done(client, job)
365334

366335
entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
@@ -395,6 +364,44 @@ def _get_bigquery_client(project: Optional[str] = None):
395364
return client
396365

397366

367+
def _write_df_to_bq(
    client: bigquery.Client, df: pd.DataFrame, table_name: str
) -> bigquery.LoadJob:
    """Start a BigQuery load job that uploads a pandas DataFrame.

    The frame is converted to a ``pyarrow.Table`` and handed to
    ``_write_pyarrow_table_to_bq`` so that list-valued columns arrive in
    BigQuery as ``ARRAY<value_type>`` (the parquet/ARRAY handling lives in
    one place instead of being duplicated here).

    Args:
        client: Client used to submit the load job.
        df: Source DataFrame. As with ``pyarrow.Table.from_pandas``, the
            index is carried over per pyarrow's default behaviour.
        table_name: Fully qualified destination table id
            (``project.dataset.table``).

    Returns:
        The started ``bigquery.LoadJob``; the caller is responsible for
        waiting on it (``job.result()`` / ``block_until_done``).
    """
    return _write_pyarrow_table_to_bq(
        client, pyarrow.Table.from_pandas(df), table_name
    )
378+
379+
380+
def _write_pyarrow_table_to_bq(
    client: bigquery.Client, table: pyarrow.Table, table_name: str
) -> bigquery.LoadJob:
    """Serialize *table* to an in-memory parquet buffer and load it into BQ.

    It is complicated to get BQ to understand that we want an
    ARRAY<value_type>; writing compliant nested types here (paired with
    list inference on the load side) is what makes that work. See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
    https://github.com/googleapis/python-bigquery/issues/19

    Returns the started ``bigquery.LoadJob``.
    """
    sink = pyarrow.BufferOutputStream()
    pyarrow.parquet.write_table(table, sink, use_compliant_nested_type=True)
    return _write_pyarrow_buffer_to_bq(client, sink.getvalue(), table_name)
389+
390+
391+
def _write_pyarrow_buffer_to_bq(
    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
) -> bigquery.LoadJob:
    """Start a load job that uploads a parquet-encoded buffer to *table_name*.

    ``enable_list_inference`` tells BigQuery to read parquet LIST logical
    types as ``ARRAY`` columns rather than repeated STRUCT wrappers.

    Returns the started ``bigquery.LoadJob``.
    """
    parquet_opts = bigquery.format_options.ParquetOptions()
    parquet_opts.enable_list_inference = True

    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        parquet_options=parquet_opts,
    )

    return client.load_table_from_file(
        pyarrow.BufferReader(buf), table_name, job_config=load_config
    )
403+
404+
398405
# TODO: Optimizations
399406
# * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
400407
# * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe

sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66

77
from feast import BigQuerySource
88
from feast.data_source import DataSource
9-
from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
9+
from feast.infra.offline_stores.bigquery import (
10+
BigQueryOfflineStoreConfig,
11+
_write_df_to_bq,
12+
)
1013
from tests.integration.feature_repos.universal.data_source_creator import (
1114
DataSourceCreator,
1215
)
@@ -61,15 +64,12 @@ def create_data_source(
6164

6265
self.create_dataset()
6366

64-
job_config = bigquery.LoadJobConfig()
6567
if self.gcp_project not in destination_name:
6668
destination_name = (
6769
f"{self.gcp_project}.{self.project_name}.{destination_name}"
6870
)
6971

70-
job = self.client.load_table_from_dataframe(
71-
df, destination_name, job_config=job_config
72-
)
72+
job = _write_df_to_bq(self.client, df, destination_name)
7373
job.result()
7474

7575
self.tables.append(destination_name)

sdk/python/tests/integration/offline_store/test_historical_retrieval.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from feast.feature import Feature
2020
from feast.feature_store import FeatureStore, _validate_feature_refs
2121
from feast.feature_view import FeatureView
22-
from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
22+
from feast.infra.offline_stores.bigquery import (
23+
BigQueryOfflineStoreConfig,
24+
_write_df_to_bq,
25+
)
2326
from feast.infra.offline_stores.offline_utils import (
2427
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
2528
)
@@ -62,9 +65,8 @@ def stage_driver_hourly_stats_parquet_source(directory, df):
6265

6366
def stage_driver_hourly_stats_bigquery_source(df, table_id):
6467
client = bigquery.Client()
65-
job_config = bigquery.LoadJobConfig()
6668
df.reset_index(drop=True, inplace=True)
67-
job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
69+
job = _write_df_to_bq(client, df, table_id)
6870
job.result()
6971

7072

@@ -99,9 +101,8 @@ def feature_service(name: str, views) -> FeatureService:
99101

100102
def stage_customer_daily_profile_bigquery_source(df, table_id):
101103
client = bigquery.Client()
102-
job_config = bigquery.LoadJobConfig()
103104
df.reset_index(drop=True, inplace=True)
104-
job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
105+
job = _write_df_to_bq(client, df, table_id)
105106
job.result()
106107

107108

@@ -231,9 +232,8 @@ def get_expected_training_df(
231232

232233
def stage_orders_bigquery(df, table_id):
233234
client = bigquery.Client()
234-
job_config = bigquery.LoadJobConfig()
235235
df.reset_index(drop=True, inplace=True)
236-
job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
236+
job = _write_df_to_bq(client, df, table_id)
237237
job.result()
238238

239239

sdk/python/tests/utils/data_source_utils.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from feast import BigQuerySource, FileSource
99
from feast.data_format import ParquetFormat
10+
from feast.infra.offline_stores.bigquery import _write_df_to_bq
1011

1112

1213
@contextlib.contextmanager
@@ -38,9 +39,7 @@ def simple_bq_source_using_table_ref_arg(
3839
client.update_dataset(dataset, ["default_table_expiration_ms"])
3940
table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}"
4041

41-
job = client.load_table_from_dataframe(
42-
df, table_ref, job_config=bigquery.LoadJobConfig()
43-
)
42+
job = _write_df_to_bq(client, df, table_ref)
4443
job.result()
4544

4645
return BigQuerySource(

0 commit comments

Comments (0)