From eb151dea78d2c055d991a3bfa89d9914bdd7dea1 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Tue, 1 Apr 2025 23:42:43 +0000
Subject: [PATCH] feat: use row_ids to deduplicate when uploading via
 streaming api

---
 bigframes/session/loader.py | 40 ++++++++++---------------------------
 1 file changed, 10 insertions(+), 30 deletions(-)

diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 1296e9d1b3..9b654881db 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -41,7 +41,6 @@
 import bigframes.core as core
 import bigframes.core.blocks as blocks
 import bigframes.core.compile
-import bigframes.core.expression as expression
 import bigframes.core.guid
 import bigframes.core.ordering
 import bigframes.core.pruning
@@ -51,7 +50,6 @@
 import bigframes.exceptions
 import bigframes.formatting_helpers as formatting_helpers
 import bigframes.operations
-import bigframes.operations.aggregations as agg_ops
 import bigframes.session._io.bigquery as bf_io_bigquery
 import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
 import bigframes.session._io.pandas as bf_io_pandas
@@ -225,39 +223,21 @@ def read_pandas_streaming(
         for errors in self._bqclient.insert_rows_from_dataframe(
             destination_table,
             pandas_dataframe_copy,
+            # See: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_insert_rows_json
+            row_ids=pandas_dataframe_copy[ordering_col],
         ):
             if errors:
                 raise ValueError(
                     f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
                 )
-        array_value = (
-            core.ArrayValue.from_table(
-                table=destination_table,
-                schema=schemata.ArraySchema.from_bq_table(
-                    destination_table, df_and_labels.col_type_overrides
-                ),
-                session=self._session,
-                # Don't set the offsets column because we want to group by it.
-            )
-            # There may be duplicate rows because of hidden retries, so use a query to
-            # deduplicate based on the ordering ID, which is guaranteed to be unique.
-            # We know that rows with same ordering ID are duplicates,
-            # so ANY_VALUE() is deterministic.
-            .aggregate(
-                by_column_ids=[ordering_col],
-                aggregations=[
-                    (
-                        expression.UnaryAggregation(
-                            agg_ops.AnyValueOp(),
-                            expression.deref(field.name),
-                        ),
-                        field.name,
-                    )
-                    for field in destination_table.schema
-                    if field.name != ordering_col
-                ],
-            ).drop_columns([ordering_col])
-        )
+        array_value = core.ArrayValue.from_table(
+            table=destination_table,
+            schema=schemata.ArraySchema.from_bq_table(
+                destination_table, df_and_labels.col_type_overrides
+            ),
+            session=self._session,
+            offsets_col=ordering_col,
+        ).drop_columns([ordering_col])
         block = blocks.Block(
            array_value,
            index_columns=new_idx_ids,
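
Note (not part of the patch): this change leans on BigQuery's best-effort
streaming deduplication. When row_ids is supplied, each value is used as the
row's insertId, so hidden client retries that resend the same row within the
deduplication window collapse into one row server-side, which is why the
ANY_VALUE() cleanup query above could be removed. A minimal sketch of the
underlying google-cloud-bigquery call, with a hypothetical table and
hypothetical column names:

    from google.cloud import bigquery

    client = bigquery.Client()

    rows = [
        {"ordering_id": 0, "value": "a"},
        {"ordering_id": 1, "value": "b"},
    ]

    # Rows sharing an insertId within the dedup window are treated as
    # retries of the same insert, so resending this exact request is
    # deduplicated on a best-effort basis.
    errors = client.insert_rows_json(
        "my-project.my_dataset.my_table",  # hypothetical table
        rows,
        row_ids=[str(row["ordering_id"]) for row in rows],
    )
    if errors:
        raise ValueError(f"Streaming insert failed: {errors}")

Because the deduplication is only best-effort, the patch still keeps the
unique ordering column and passes it as offsets_col rather than assuming the
table is perfectly duplicate-free by construction.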