@@ -256,8 +256,8 @@ def to_bigquery(
         job_config = bigquery.QueryJobConfig(destination=path)
 
         if not job_config.dry_run and self.on_demand_feature_views:
-            job = _write_pyarrow_table_to_bq(
-                self.client, self.to_arrow(), job_config.destination
+            job = self.client.load_table_from_dataframe(
+                self.to_df(), job_config.destination
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")
@@ -366,7 +366,7 @@ def _upload_entity_df_and_get_entity_schema(
     elif isinstance(entity_df, pd.DataFrame):
         # Drop the index so that we dont have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
-        job = _write_df_to_bq(client, entity_df, table_name)
+        job = client.load_table_from_dataframe(entity_df, table_name)
         block_until_done(client, job)
         entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
     else:
@@ -400,44 +400,6 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
     return client
 
 
-def _write_df_to_bq(
-    client: bigquery.Client, df: pd.DataFrame, table_name: str
-) -> bigquery.LoadJob:
-    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
-    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-    # https://github.com/googleapis/python-bigquery/issues/19
-    writer = pyarrow.BufferOutputStream()
-    pyarrow.parquet.write_table(
-        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
-    )
-    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
-
-
-def _write_pyarrow_table_to_bq(
-    client: bigquery.Client, table: pyarrow.Table, table_name: str
-) -> bigquery.LoadJob:
-    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
-    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-    # https://github.com/googleapis/python-bigquery/issues/19
-    writer = pyarrow.BufferOutputStream()
-    pyarrow.parquet.write_table(table, writer, use_compliant_nested_type=True)
-    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
-
-
-def _write_pyarrow_buffer_to_bq(
-    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
-) -> bigquery.LoadJob:
-    reader = pyarrow.BufferReader(buf)
-
-    parquet_options = bigquery.format_options.ParquetOptions()
-    parquet_options.enable_list_inference = True
-    job_config = bigquery.LoadJobConfig()
-    job_config.source_format = bigquery.SourceFormat.PARQUET
-    job_config.parquet_options = parquet_options
-
-    return client.load_table_from_file(reader, table_name, job_config=job_config,)
-
-
 # TODO: Optimizations
 # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
 # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
0 commit comments