Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 10 additions & 30 deletions bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import bigframes.core as core
import bigframes.core.blocks as blocks
import bigframes.core.compile
import bigframes.core.expression as expression
import bigframes.core.guid
import bigframes.core.ordering
import bigframes.core.pruning
Expand All @@ -51,7 +50,6 @@
import bigframes.exceptions
import bigframes.formatting_helpers as formatting_helpers
import bigframes.operations
import bigframes.operations.aggregations as agg_ops
import bigframes.session._io.bigquery as bf_io_bigquery
import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
import bigframes.session._io.pandas as bf_io_pandas
Expand Down Expand Up @@ -225,39 +223,21 @@ def read_pandas_streaming(
for errors in self._bqclient.insert_rows_from_dataframe(
destination_table,
pandas_dataframe_copy,
# See: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_insert_rows_json
row_ids=pandas_dataframe_copy[ordering_col],
):
if errors:
raise ValueError(
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
)
array_value = (
core.ArrayValue.from_table(
table=destination_table,
schema=schemata.ArraySchema.from_bq_table(
destination_table, df_and_labels.col_type_overrides
),
session=self._session,
# Don't set the offsets column because we want to group by it.
)
# There may be duplicate rows because of hidden retries, so use a query to
# deduplicate based on the ordering ID, which is guaranteed to be unique.
# We know that rows with the same ordering ID are duplicates,
# so ANY_VALUE() is deterministic.
.aggregate(
by_column_ids=[ordering_col],
aggregations=[
(
expression.UnaryAggregation(
agg_ops.AnyValueOp(),
expression.deref(field.name),
),
field.name,
)
for field in destination_table.schema
if field.name != ordering_col
],
).drop_columns([ordering_col])
)
array_value = core.ArrayValue.from_table(
table=destination_table,
schema=schemata.ArraySchema.from_bq_table(
destination_table, df_and_labels.col_type_overrides
),
session=self._session,
offsets_col=ordering_col,
).drop_columns([ordering_col])
block = blocks.Block(
array_value,
index_columns=new_idx_ids,
Expand Down
Loading