@@ -223,23 +223,8 @@ def to_bigquery(
         job_config = bigquery.QueryJobConfig(destination=path)

         if not job_config.dry_run and self.on_demand_feature_views is not None:
-            # It is complicated to get BQ to understand that we want an ARRAY<value_type>
-            # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-            # https://github.com/googleapis/python-bigquery/issues/19
-            writer = pyarrow.BufferOutputStream()
-            pyarrow.parquet.write_table(
-                self.to_arrow(), writer, use_compliant_nested_type=True
-            )
-            reader = pyarrow.BufferReader(writer.getvalue())
-
-            parquet_options = bigquery.format_options.ParquetOptions()
-            parquet_options.enable_list_inference = True
-            job_config = bigquery.LoadJobConfig()
-            job_config.source_format = bigquery.SourceFormat.PARQUET
-            job_config.parquet_options = parquet_options
-
-            job = self.client.load_table_from_file(
-                reader, job_config.destination, job_config=job_config,
+            job = _write_pyarrow_table_to_bq(
+                self.client, self.to_arrow(), job_config.destination
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")
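
Why both `use_compliant_nested_type` and `enable_list_inference` are needed: by default pyarrow serializes list columns with a Parquet layout that BigQuery loads as a nested STRUCT rather than an ARRAY<value_type> (the issue linked in the comments). A minimal sketch of the serialization half of the workaround, assuming pyarrow >= 4.0 (the `ids` column is illustrative, not from this patch):

    import pyarrow
    import pyarrow.parquet

    table = pyarrow.table({"ids": [[1, 2], [3]]})  # a list<int64> column
    buf = pyarrow.BufferOutputStream()
    # Compliant nested types use the Parquet-spec LIST layout, which
    # BigQuery's list inference can map back to ARRAY<INT64>.
    pyarrow.parquet.write_table(table, buf, use_compliant_nested_type=True)
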
@@ -344,23 +329,7 @@ def _upload_entity_df_and_get_entity_schema(
     elif isinstance(entity_df, pd.DataFrame):
         # Drop the index so that we don't have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
-
-        # Upload the dataframe into BigQuery, creating a temporary table
-        # It is complicated to get BQ to understand that we want an ARRAY<value_type>
-        # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-        # https://github.com/googleapis/python-bigquery/issues/19
-        writer = pyarrow.BufferOutputStream()
-        pyarrow.parquet.write_table(
-            pyarrow.Table.from_pandas(entity_df), writer, use_compliant_nested_type=True
-        )
-        reader = pyarrow.BufferReader(writer.getvalue())
-
-        parquet_options = bigquery.format_options.ParquetOptions()
-        parquet_options.enable_list_inference = True
-        job_config = bigquery.LoadJobConfig()
-        job_config.source_format = bigquery.SourceFormat.PARQUET
-        job_config.parquet_options = parquet_options
-        job = client.load_table_from_file(reader, table_name, job_config=job_config)
+        job = _write_df_to_bq(client, entity_df, table_name)
         block_until_done(client, job)

         entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
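
With the upload logic extracted, the call site reduces to one line. A hypothetical invocation of the new helper (the table id and dataframe are placeholders, and `bigquery.Client()` assumes ambient credentials):

    import pandas as pd
    from google.cloud import bigquery

    client = bigquery.Client()
    entity_df = pd.DataFrame({"driver_id": [1001, 1002], "trip_ids": [[1], [2, 3]]})
    job = _write_df_to_bq(client, entity_df, "my_project.my_dataset.entity_df")
    block_until_done(client, job)  # job.result() would also block on completion
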
@@ -395,6 +364,44 @@ def _get_bigquery_client(project: Optional[str] = None):
     return client


+def _write_df_to_bq(
+    client: bigquery.Client, df: pd.DataFrame, table_name: str
+) -> bigquery.LoadJob:
+    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
+    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
+    # https://github.com/googleapis/python-bigquery/issues/19
+    writer = pyarrow.BufferOutputStream()
+    pyarrow.parquet.write_table(
+        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
+    )
+    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name)
+
+
+def _write_pyarrow_table_to_bq(
+    client: bigquery.Client, table: pyarrow.Table, table_name: str
+) -> bigquery.LoadJob:
+    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
+    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
+    # https://github.com/googleapis/python-bigquery/issues/19
+    writer = pyarrow.BufferOutputStream()
+    pyarrow.parquet.write_table(table, writer, use_compliant_nested_type=True)
+    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name)
+
+
+def _write_pyarrow_buffer_to_bq(
+    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
+) -> bigquery.LoadJob:
+    reader = pyarrow.BufferReader(buf)
+
+    parquet_options = bigquery.format_options.ParquetOptions()
+    parquet_options.enable_list_inference = True
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.PARQUET
+    job_config.parquet_options = parquet_options
+
+    return client.load_table_from_file(reader, table_name, job_config=job_config)
+
+
 # TODO: Optimizations
 # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
 # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe