From d001d85273ebd6e8cccfa1db9078da12e57b9554 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 9 May 2025 22:40:49 +0000 Subject: [PATCH 1/7] feat: Add deferred data uploading --- bigframes/core/array_value.py | 11 +- bigframes/core/nodes.py | 20 +++- bigframes/session/__init__.py | 19 ++-- bigframes/session/bq_caching_executor.py | 76 ++++++++++++- bigframes/session/loader.py | 117 ++++++++++++-------- tests/system/small/test_large_local_data.py | 50 +++++++++ third_party/bigframes_vendored/constants.py | 1 + 7 files changed, 233 insertions(+), 61 deletions(-) create mode 100644 tests/system/small/test_large_local_data.py diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py index 60f5315554..20773fd1b4 100644 --- a/bigframes/core/array_value.py +++ b/bigframes/core/array_value.py @@ -133,8 +133,17 @@ def from_table( ordering=ordering, n_rows=n_rows, ) + return cls.from_bq_data_source(source_def, scan_list, session) + + @classmethod + def from_bq_data_source( + cls, + source: nodes.BigqueryDataSource, + scan_list: nodes.ScanList, + session: Session, + ): node = nodes.ReadTableNode( - source=source_def, + source=source, scan_list=scan_list, table_session=session, ) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 0fbfe7bd37..3e4bdb57c4 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -578,6 +578,9 @@ class ScanItem(typing.NamedTuple): def with_id(self, id: identifiers.ColumnId) -> ScanItem: return ScanItem(id, self.dtype, self.source_id) + def with_source_id(self, source_id: str) -> ScanItem: + return ScanItem(self.id, self.dtype, source_id) + @dataclasses.dataclass(frozen=True) class ScanList: @@ -614,6 +617,21 @@ def project( result = ScanList((self.items[:1])) return result + def remap_source_ids( + self, + mapping: Mapping[str, str], + ) -> ScanList: + items = tuple( + item.with_source_id(mapping.get(item.source_id, item.source_id)) + for item in self.items + ) + return ScanList(items) + + def append( + self, source_id: str, dtype: bigframes.dtypes.Dtype, id: identifiers.ColumnId + ) -> ScanList: + return ScanList((*self.items, ScanItem(id, dtype, source_id))) + @dataclasses.dataclass(frozen=True, eq=False) class ReadLocalNode(LeafNode): @@ -621,9 +639,9 @@ class ReadLocalNode(LeafNode): local_data_source: local_data.ManagedArrowTable # Mapping of local ids to bfet id. scan_list: ScanList + session: bigframes.session.Session # Offsets are generated only if this is non-null offsets_col: Optional[identifiers.ColumnId] = None - session: typing.Optional[bigframes.session.Session] = None @property def fields(self) -> Sequence[Field]: diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 7260553c14..47aef29326 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -246,13 +246,6 @@ def __init__( self._temp_storage_manager = ( self._session_resource_manager or self._anon_dataset_manager ) - self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor( - bqclient=self._clients_provider.bqclient, - bqstoragereadclient=self._clients_provider.bqstoragereadclient, - storage_manager=self._temp_storage_manager, - strictly_ordered=self._strictly_ordered, - metrics=self._metrics, - ) self._loader = bigframes.session.loader.GbqDataLoader( session=self, bqclient=self._clients_provider.bqclient, @@ -263,6 +256,14 @@ def __init__( force_total_order=self._strictly_ordered, metrics=self._metrics, ) + self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor( + bqclient=self._clients_provider.bqclient, + bqstoragereadclient=self._clients_provider.bqstoragereadclient, + loader=self._loader, + storage_manager=self._temp_storage_manager, + strictly_ordered=self._strictly_ordered, + metrics=self._metrics, + ) def __del__(self): """Automatic cleanup of internal resources.""" @@ -929,6 +930,10 @@ def _read_pandas( return self._loader.read_pandas( pandas_dataframe, method="write", api_name=api_name ) + elif write_engine == "_deferred": + import bigframes.dataframe as dataframe + + return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe, self)) else: raise ValueError(f"Got unexpected write_engine '{write_engine}'") diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 72f2dfa4b5..2e295b08c7 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -17,6 +17,7 @@ import dataclasses import math import os +import threading from typing import cast, Literal, Mapping, Optional, Sequence, Tuple, Union import warnings import weakref @@ -28,7 +29,7 @@ import google.cloud.bigquery_storage_v1 import bigframes.core -from bigframes.core import compile, rewrite +from bigframes.core import compile, local_data, rewrite import bigframes.core.guid import bigframes.core.nodes as nodes import bigframes.core.ordering as order @@ -36,7 +37,7 @@ import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.features -from bigframes.session import executor, local_scan_executor, read_api_execution +from bigframes.session import executor, loader, local_scan_executor, read_api_execution import bigframes.session._io.bigquery as bq_io import bigframes.session.metrics import bigframes.session.planner @@ -65,12 +66,19 @@ def _get_default_output_spec() -> OutputSpec: ) +SourceIdMapping = Mapping[str, str] + + class ExecutionCache: def __init__(self): # current assumption is only 1 cache of a given node # in future, might have multiple caches, with different layout, localities self._cached_executions: weakref.WeakKeyDictionary[ - nodes.BigFrameNode, nodes.BigFrameNode + nodes.BigFrameNode, nodes.CachedTableNode + ] = weakref.WeakKeyDictionary() + self._uploaded_local_data: weakref.WeakKeyDictionary[ + local_data.ManagedArrowTable, + tuple[nodes.BigqueryDataSource, SourceIdMapping], ] = weakref.WeakKeyDictionary() @property @@ -103,6 +111,17 @@ def cache_results_table( assert original_root.schema == cached_replacement.schema self._cached_executions[original_root] = cached_replacement + def cache_remote_replacement( + self, + local_data: local_data.ManagedArrowTable, + bq_data: nodes.BigqueryDataSource, + ): + mapping = { + local_data.schema.items[i].column: bq_data.table.physical_schema[i].name + for i in range(len(local_data.schema)) + } + self._uploaded_local_data[local_data] = (bq_data, mapping) + class BigQueryCachingExecutor(executor.Executor): """Computes BigFrames values using BigQuery Engine. @@ -118,6 +137,7 @@ def __init__( bqclient: bigquery.Client, storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager, bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient, + loader: loader.GbqDataLoader, *, strictly_ordered: bool = True, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, @@ -127,6 +147,7 @@ def __init__( self.strictly_ordered: bool = strictly_ordered self.cache: ExecutionCache = ExecutionCache() self.metrics = metrics + self.loader = loader self.bqstoragereadclient = bqstoragereadclient # Simple left-to-right precedence for now self._semi_executors = ( @@ -136,6 +157,7 @@ def __init__( ), local_scan_executor.LocalScanExecutor(), ) + self._upload_lock = threading.Lock() def to_sql( self, @@ -147,6 +169,7 @@ def to_sql( if offset_column: array_value, _ = array_value.promote_offsets() node = self.logical_plan(array_value.node) if enable_cache else array_value.node + node = self._substitute_large_local_sources(node) compiled = compile.compile_sql(compile.CompileRequest(node, sort_rows=ordered)) return compiled.sql @@ -378,6 +401,7 @@ def _cache_with_cluster_cols( ): """Executes the query and uses the resulting table to rewrite future executions.""" plan = self.logical_plan(array_value.node) + plan = self._substitute_large_local_sources(plan) compiled = compile.compile_sql( compile.CompileRequest( plan, sort_rows=False, materialize_all_order_keys=True @@ -398,7 +422,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): w_offsets, offset_column = array_value.promote_offsets() compiled = compile.compile_sql( compile.CompileRequest( - self.logical_plan(w_offsets.node), + self.logical_plan(self._substitute_large_local_sources(w_offsets.node)), sort_rows=False, ) ) @@ -509,6 +533,48 @@ def _validate_result_schema( f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}" ) + def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): + """ + Replace large local sources with the uploaded version of those datasources. + """ + # Step 1: Upload all previously un-uploaded data + for leaf in original_root.unique_nodes(): + if isinstance(leaf, nodes.ReadLocalNode): + if leaf.local_data_source.metadata.total_bytes > 5000: + self._upload_local_data(leaf.local_data_source) + + # Step 2: Replace local scans with remote scans + def map_local_scans(node: nodes.BigFrameNode): + if not isinstance(node, nodes.ReadLocalNode): + return node + if node.local_data_source not in self.cache._uploaded_local_data: + return node + bq_source, source_mapping = self.cache._uploaded_local_data[ + node.local_data_source + ] + scan_list = node.scan_list.remap_source_ids(source_mapping) + if node.offsets_col is not None: + scan_list = scan_list.append( + bq_source.table.physical_schema[-1].name, + bigframes.dtypes.INT_DTYPE, + node.offsets_col, + ) + return nodes.ReadTableNode(bq_source, scan_list, node.session) + + return original_root.bottom_up(map_local_scans) + + def _upload_local_data(self, local_table: local_data.ManagedArrowTable): + if local_table in self.cache._uploaded_local_data: + return + # Lock prevents concurrent repeated work, but slows things down. + # Might be better as a queue and a worker thread + with self._upload_lock: + if local_table not in self.cache._uploaded_local_data: + uploaded = self.loader.load_data( + local_table, bigframes.core.guid.generate_guid() + ) + self.cache.cache_remote_replacement(local_table, uploaded) + def _execute_plan( self, plan: nodes.BigFrameNode, @@ -539,6 +605,8 @@ def _execute_plan( # Use explicit destination to avoid 10GB limit of temporary table if destination_table is not None: job_config.destination = destination_table + + plan = self._substitute_large_local_sources(plan) compiled = compile.compile_sql( compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek) ) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index b630dedb7b..fccc4b35de 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -44,7 +44,7 @@ import pandas import pyarrow as pa -from bigframes.core import guid, local_data, utils +from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils import bigframes.core as core import bigframes.core.blocks as blocks import bigframes.core.schema as schemata @@ -184,36 +184,59 @@ def read_pandas( [*idx_cols, *val_cols], axis="columns" ) managed_data = local_data.ManagedArrowTable.from_pandas(prepared_df) + block = blocks.Block( + self.read_managed_data(managed_data, method=method, api_name=api_name), + index_columns=idx_cols, + column_labels=pandas_dataframe.columns, + index_labels=pandas_dataframe.index.names, + ) + return dataframe.DataFrame(block) + def read_managed_data( + self, + data: local_data.ManagedArrowTable, + method: Literal["load", "stream", "write"], + api_name: str, + ) -> core.ArrayValue: + offsets_col = guid.generate_guid("upload_offsets_") if method == "load": - array_value = self.load_data(managed_data, api_name=api_name) + gbq_source = self.load_data( + data, offsets_col=offsets_col, api_name=api_name + ) elif method == "stream": - array_value = self.stream_data(managed_data) + gbq_source = self.stream_data(data, offsets_col=offsets_col) elif method == "write": - array_value = self.write_data(managed_data) + gbq_source = self.write_data(data, offsets_col=offsets_col) else: raise ValueError(f"Unsupported read method {method}") - block = blocks.Block( - array_value, - index_columns=idx_cols, - column_labels=pandas_dataframe.columns, - index_labels=pandas_dataframe.index.names, + return core.ArrayValue.from_bq_data_source( + source=gbq_source, + scan_list=nodes.ScanList( + tuple( + nodes.ScanItem( + identifiers.ColumnId(item.column), item.dtype, item.column + ) + for item in data.schema.items + ) + ), + session=self._session, ) - return dataframe.DataFrame(block) def load_data( - self, data: local_data.ManagedArrowTable, api_name: Optional[str] = None - ) -> core.ArrayValue: + self, + data: local_data.ManagedArrowTable, + offsets_col: str, + api_name: Optional[str] = None, + ) -> nodes.BigqueryDataSource: """Load managed data into bigquery""" - ordering_col = guid.generate_guid("load_offsets_") # JSON support incomplete for item in data.schema.items: _validate_dtype_can_load(item.column, item.dtype) schema_w_offsets = data.schema.append( - schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE) + schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE) ) bq_schema = schema_w_offsets.to_bigquery(_LOAD_JOB_TYPE_OVERRIDES) @@ -231,13 +254,13 @@ def load_data( job_config.labels = {"bigframes-api": api_name} load_table_destination = self._storage_manager.create_temp_table( - bq_schema, [ordering_col] + bq_schema, [offsets_col] ) buffer = io.BytesIO() data.to_parquet( buffer, - offsets_col=ordering_col, + offsets_col=offsets_col, geo_format="wkt", duration_type="duration", json_type="string", @@ -249,23 +272,24 @@ def load_data( self._start_generic_job(load_job) # must get table metadata after load job for accurate metadata destination_table = self._bqclient.get_table(load_table_destination) - return core.ArrayValue.from_table( - table=destination_table, - schema=schema_w_offsets, - session=self._session, - offsets_col=ordering_col, - n_rows=data.data.num_rows, - ).drop_columns([ordering_col]) + return nodes.BigqueryDataSource( + nodes.GbqTable.from_table(destination_table), + ordering=ordering.TotalOrdering.from_offset_col(offsets_col), + n_rows=destination_table.num_rows, + ) - def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue: + def stream_data( + self, + data: local_data.ManagedArrowTable, + offsets_col: str, + ) -> nodes.BigqueryDataSource: """Load managed data into bigquery""" - ordering_col = guid.generate_guid("stream_offsets_") schema_w_offsets = data.schema.append( - schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE) + schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE) ) bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES) load_table_destination = self._storage_manager.create_temp_table( - bq_schema, [ordering_col] + bq_schema, [offsets_col] ) rows = data.itertuples( @@ -284,24 +308,23 @@ def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue: f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}" ) destination_table = self._bqclient.get_table(load_table_destination) - return core.ArrayValue.from_table( - table=destination_table, - schema=schema_w_offsets, - session=self._session, - offsets_col=ordering_col, - n_rows=data.data.num_rows, - ).drop_columns([ordering_col]) + return nodes.BigqueryDataSource( + nodes.GbqTable.from_table(destination_table), + ordering=ordering.TotalOrdering.from_offset_col(offsets_col), + n_rows=destination_table.num_rows, + ) - def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue: + def write_data( + self, + data: local_data.ManagedArrowTable, + offsets_col: str, + ) -> nodes.BigqueryDataSource: """Load managed data into bigquery""" - ordering_col = guid.generate_guid("stream_offsets_") schema_w_offsets = data.schema.append( - schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE) + schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE) ) bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES) - bq_table_ref = self._storage_manager.create_temp_table( - bq_schema, [ordering_col] - ) + bq_table_ref = self._storage_manager.create_temp_table(bq_schema, [offsets_col]) requested_stream = bq_storage_types.stream.WriteStream() requested_stream.type_ = bq_storage_types.stream.WriteStream.Type.COMMITTED # type: ignore @@ -313,7 +336,7 @@ def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue: def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]: schema, batches = data.to_arrow( - offsets_col=ordering_col, duration_type="int" + offsets_col=offsets_col, duration_type="int" ) offset = 0 for batch in batches: @@ -339,13 +362,11 @@ def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]: assert response.row_count == data.data.num_rows destination_table = self._bqclient.get_table(bq_table_ref) - return core.ArrayValue.from_table( - table=destination_table, - schema=schema_w_offsets, - session=self._session, - offsets_col=ordering_col, - n_rows=data.data.num_rows, - ).drop_columns([ordering_col]) + return nodes.BigqueryDataSource( + nodes.GbqTable.from_table(destination_table), + ordering=ordering.TotalOrdering.from_offset_col(offsets_col), + n_rows=destination_table.num_rows, + ) def _start_generic_job(self, job: formatting_helpers.GenericJob): if bigframes.options.display.progress_bar is not None: diff --git a/tests/system/small/test_large_local_data.py b/tests/system/small/test_large_local_data.py new file mode 100644 index 0000000000..91794df8b4 --- /dev/null +++ b/tests/system/small/test_large_local_data.py @@ -0,0 +1,50 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import pandas as pd + +import bigframes +from tests.system.utils import assert_pandas_df_equal + +large_dataframe = pd.DataFrame(np.random.rand(10000, 10), dtype="Float64") +large_dataframe.index = large_dataframe.index.astype("Int64") + + +def test_read_pandas_defer_noop(session: bigframes.Session): + bf_df = session.read_pandas(large_dataframe, write_engine="_deferred") + + assert_pandas_df_equal(large_dataframe, bf_df.to_pandas()) + + +def test_read_pandas_defer_cumsum(session: bigframes.Session): + bf_df = session.read_pandas(large_dataframe, write_engine="_deferred") + bf_df = bf_df.cumsum() + + assert_pandas_df_equal(large_dataframe.cumsum(), bf_df.to_pandas()) + + +def test_read_pandas_defer_cache_cumsum_cumsum(session: bigframes.Session): + bf_df = session.read_pandas(large_dataframe, write_engine="_deferred") + bf_df = bf_df.cumsum().cache().cumsum() + + assert_pandas_df_equal(large_dataframe.cumsum().cumsum(), bf_df.to_pandas()) + + +def test_read_pandas_defer_peek(session: bigframes.Session): + bf_df = session.read_pandas(large_dataframe, write_engine="_deferred") + bf_result = bf_df.peek(15) + + assert len(bf_result) == 15 + assert_pandas_df_equal(large_dataframe.loc[bf_df.index], bf_result) diff --git a/third_party/bigframes_vendored/constants.py b/third_party/bigframes_vendored/constants.py index af87694cd5..6d55817a27 100644 --- a/third_party/bigframes_vendored/constants.py +++ b/third_party/bigframes_vendored/constants.py @@ -52,5 +52,6 @@ "bigquery_load", "bigquery_streaming", "bigquery_write", + "_deferred", ] VALID_WRITE_ENGINES = typing.get_args(WriteEngineType) From fc55710caba44c57abcc07d12460f6924a88ffa4 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 12 May 2025 21:17:30 +0000 Subject: [PATCH 2/7] fix test issues --- bigframes/session/loader.py | 6 +++--- tests/system/small/test_large_local_data.py | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index fccc4b35de..5f17b67a93 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -275,7 +275,7 @@ def load_data( return nodes.BigqueryDataSource( nodes.GbqTable.from_table(destination_table), ordering=ordering.TotalOrdering.from_offset_col(offsets_col), - n_rows=destination_table.num_rows, + n_rows=data.metadata.row_count, ) def stream_data( @@ -311,7 +311,7 @@ def stream_data( return nodes.BigqueryDataSource( nodes.GbqTable.from_table(destination_table), ordering=ordering.TotalOrdering.from_offset_col(offsets_col), - n_rows=destination_table.num_rows, + n_rows=data.metadata.row_count, ) def write_data( @@ -365,7 +365,7 @@ def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]: return nodes.BigqueryDataSource( nodes.GbqTable.from_table(destination_table), ordering=ordering.TotalOrdering.from_offset_col(offsets_col), - n_rows=destination_table.num_rows, + n_rows=data.metadata.row_count, ) def _start_generic_job(self, job: formatting_helpers.GenericJob): diff --git a/tests/system/small/test_large_local_data.py b/tests/system/small/test_large_local_data.py index 91794df8b4..9cdd646cb5 100644 --- a/tests/system/small/test_large_local_data.py +++ b/tests/system/small/test_large_local_data.py @@ -14,6 +14,7 @@ import numpy as np import pandas as pd +import pytest import bigframes from tests.system.utils import assert_pandas_df_equal @@ -23,12 +24,14 @@ def test_read_pandas_defer_noop(session: bigframes.Session): + pytest.importorskip("pandas", minversion="2.0.0") bf_df = session.read_pandas(large_dataframe, write_engine="_deferred") assert_pandas_df_equal(large_dataframe, bf_df.to_pandas()) def test_read_pandas_defer_cumsum(session: bigframes.Session): + pytest.importorskip("pandas", minversion="2.0.0") bf_df = session.read_pandas(large_dataframe, write_engine="_deferred") bf_df = bf_df.cumsum() @@ -36,6 +39,7 @@ def test_read_pandas_defer_cumsum(session: bigframes.Session): def test_read_pandas_defer_cache_cumsum_cumsum(session: bigframes.Session): + pytest.importorskip("pandas", minversion="2.0.0") bf_df = session.read_pandas(large_dataframe, write_engine="_deferred") bf_df = bf_df.cumsum().cache().cumsum() @@ -43,6 +47,7 @@ def test_read_pandas_defer_cache_cumsum_cumsum(session: bigframes.Session): def test_read_pandas_defer_peek(session: bigframes.Session): + pytest.importorskip("pandas", minversion="2.0.0") bf_df = session.read_pandas(large_dataframe, write_engine="_deferred") bf_result = bf_df.peek(15) From 2a3d24e18d8ae86a19b133a28f0e51aca1ee05fe Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 14 May 2025 00:33:34 +0000 Subject: [PATCH 3/7] fix peek test --- tests/system/small/test_large_local_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/small/test_large_local_data.py b/tests/system/small/test_large_local_data.py index 9cdd646cb5..eddec37132 100644 --- a/tests/system/small/test_large_local_data.py +++ b/tests/system/small/test_large_local_data.py @@ -52,4 +52,4 @@ def test_read_pandas_defer_peek(session: bigframes.Session): bf_result = bf_df.peek(15) assert len(bf_result) == 15 - assert_pandas_df_equal(large_dataframe.loc[bf_df.index], bf_result) + assert_pandas_df_equal(large_dataframe.loc[bf_result.index], bf_result) From 4b5ec6ab28a1fca5e06e253904effbfc78aa61e3 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 16 May 2025 18:00:04 +0000 Subject: [PATCH 4/7] cleanup and comments --- bigframes/constants.py | 4 ++++ bigframes/session/__init__.py | 7 ++++--- bigframes/session/bq_caching_executor.py | 5 +++++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/bigframes/constants.py b/bigframes/constants.py index 89f27afd78..b6e0b8b221 100644 --- a/bigframes/constants.py +++ b/bigframes/constants.py @@ -128,4 +128,8 @@ # BigQuery default is 10000, leave 100 for overhead MAX_COLUMNS = 9900 +# BigQuery has 1 MB query size limit. Don't want to take up more than a few % of that inlining a table. +# Also must assume that text encoding as literals is much less efficient than in-memory representation. +MAX_INLINE_BYTES = 5000 + SUGGEST_PEEK_PREVIEW = "Use .peek(n) to preview n arbitrary rows." diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index d5697438a1..9030bb4ff0 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -60,6 +60,7 @@ from bigframes import version import bigframes._config.bigquery_options as bigquery_options import bigframes.clients +import bigframes.constants from bigframes.core import blocks, log_adapter import bigframes.core.pyformat @@ -939,15 +940,15 @@ def _read_pandas( if write_engine == "default": write_engine = ( "bigquery_load" - if mem_usage > MAX_INLINE_DF_BYTES + if mem_usage > bigframes.constants.MAX_INLINE_BYTES else "bigquery_inline" ) if write_engine == "bigquery_inline": - if mem_usage > MAX_INLINE_DF_BYTES: + if mem_usage > bigframes.constants.MAX_INLINE_BYTES: raise ValueError( f"DataFrame size ({mem_usage} bytes) exceeds the maximum allowed " - f"for inline data ({MAX_INLINE_DF_BYTES} bytes)." + f"for inline data ({bigframes.constants.MAX_INLINE_BYTES} bytes)." ) return self._read_pandas_inline(pandas_dataframe) elif write_engine == "bigquery_load": diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 3b237e6459..41749a69bc 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -116,6 +116,8 @@ def cache_remote_replacement( local_data: local_data.ManagedArrowTable, bq_data: nodes.BigqueryDataSource, ): + # bq table has one extra column for offsets, those are implicit for local data + assert len(local_data.schema.items) + 1 == len(bq_data.table.physical_schema) mapping = { local_data.schema.items[i].column: bq_data.table.physical_schema[i].name for i in range(len(local_data.schema)) @@ -546,7 +548,10 @@ def map_local_scans(node: nodes.BigFrameNode): node.local_data_source ] scan_list = node.scan_list.remap_source_ids(source_mapping) + # offsets_col isn't part of ReadTableNode, so emulate by adding to end of scan_list if node.offsets_col is not None: + # Offsets are always implicitly the final column of uploaded data + # See: Loader.load_data scan_list = scan_list.append( bq_source.table.physical_schema[-1].name, bigframes.dtypes.INT_DTYPE, From 87b666e57ba761afc92c87a28134b9b3fd75883c Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 19 May 2025 22:43:09 +0000 Subject: [PATCH 5/7] fix test patch const --- tests/unit/session/test_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 91b6679702..dc8ee2c0d9 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -484,7 +484,7 @@ def today(cls): mocks.create_bigquery_session() -@mock.patch("bigframes.session.MAX_INLINE_DF_BYTES", 1) +@mock.patch("bigframes.constants.MAX_INLINE_BYTES", 1) def test_read_pandas_inline_exceeds_limit_raises_error(): session = mocks.create_bigquery_session() pd_df = pd.DataFrame([[1, 2, 3], [4, 5, 6]]) From 00a2cc1bcd4adb3d9b5f2965795860a1e5282c47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Tue, 20 May 2025 12:53:26 -0500 Subject: [PATCH 6/7] Apply suggestions from code review --- bigframes/session/bq_caching_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index b012b47e1d..a01010e0f5 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -28,6 +28,7 @@ import google.cloud.bigquery.table as bq_table import google.cloud.bigquery_storage_v1 +import bigframes.constants import bigframes.core from bigframes.core import compile, local_data, rewrite import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir @@ -565,7 +566,7 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): # Step 1: Upload all previously un-uploaded data for leaf in original_root.unique_nodes(): if isinstance(leaf, nodes.ReadLocalNode): - if leaf.local_data_source.metadata.total_bytes > 5000: + if leaf.local_data_source.metadata.total_bytes > bigframes.constants.MAX_INLINE_BYTES: self._upload_local_data(leaf.local_data_source) # Step 2: Replace local scans with remote scans From 286641093e27f6e2ecdd987a7b8877cea2f66c45 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 20 May 2025 17:56:10 +0000 Subject: [PATCH 7/7] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/session/bq_caching_executor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index a01010e0f5..33d3314a1e 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -566,7 +566,10 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): # Step 1: Upload all previously un-uploaded data for leaf in original_root.unique_nodes(): if isinstance(leaf, nodes.ReadLocalNode): - if leaf.local_data_source.metadata.total_bytes > bigframes.constants.MAX_INLINE_BYTES: + if ( + leaf.local_data_source.metadata.total_bytes + > bigframes.constants.MAX_INLINE_BYTES + ): self._upload_local_data(leaf.local_data_source) # Step 2: Replace local scans with remote scans