From 4d3200c87ac3d2c9e3c2133d4a385d33a5e53e09 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 22 Apr 2024 11:14:56 +0000 Subject: [PATCH 01/29] feat: Support `axis=1` in `df.apply` for scalar outputs --- bigframes/core/blocks.py | 50 ++++++ bigframes/dataframe.py | 16 +- bigframes/functions/remote_function.py | 144 +++++++++++++----- bigframes/pandas/__init__.py | 2 +- bigframes/session/__init__.py | 2 +- tests/system/small/test_remote_function.py | 17 +++ .../bigframes_vendored/pandas/core/frame.py | 2 +- user_data.csv | 3 + 8 files changed, 192 insertions(+), 44 deletions(-) create mode 100644 user_data.csv diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 5b411e5416..461a45938f 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -26,6 +26,7 @@ import itertools import os import random +import textwrap import typing from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple import warnings @@ -1967,6 +1968,55 @@ def _is_monotonic( self._stats_cache[column_name].update({op_name: result}) return result + def _get_rows_as_json_values(self) -> Block: + compiled = self.expr._compile_unordered() + sql = compiled.to_sql() + column_names_csv = ", ".join([f'"{col}"' for col in compiled.column_ids]) + column_types_csv = ", ".join( + [f'"{self.expr.get_column_type(col)}"' for col in compiled.column_ids] + ) + column_references_csv = ", ".join( + [f"CAST({col} AS STRING)" for col in compiled.column_ids] + ) + row_json_column_name = "row_json" + select_columns = list(self.index_columns) + [row_json_column_name] + select_columns_csv = ", ".join(select_columns) + json_sql = f"""\ +With T0 AS ( +{textwrap.indent(sql, " ")} +), +T1 AS ( + SELECT *, + JSON_OBJECT( + "names", [{column_names_csv}], + "types", [{column_types_csv}], + "values", [{column_references_csv}] + ) AS {row_json_column_name} FROM T0 +) +SELECT {select_columns_csv} FROM T1 +""" + ibis_table = self.session.ibis_client.sql(json_sql) + 
order_for_ibis_table = ordering.ExpressionOrdering( + ordering_value_columns=tuple( + [ordering.ascending_over(column_id) for column_id in self.index_columns] + ), + total_ordering_columns=frozenset(self.index_columns), + ) + expr = core.ArrayValue.from_ibis( + self.session, + ibis_table, + [ibis_table[col] for col in select_columns], + hidden_ordering_columns=[], + ordering=order_for_ibis_table, + ) + block = Block( + expr, + index_columns=self.index_columns, + column_labels=[row_json_column_name], + index_labels=self._index_labels, + ) + return block + class BlockIndexProperties: """Accessor for the index-related block properties.""" diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 2deef95277..c700a5c3d3 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3268,7 +3268,21 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None)) ) - def apply(self, func, *, args: typing.Tuple = (), **kwargs): + def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): + if utils.get_axis_number(axis) == 1: + if not hasattr(func, "bigframes_remote_function"): + raise ValueError("For axis=1 a remote function must be used.") + block = self._get_block() + rows_as_json_series = bigframes.series.Series( + block._get_rows_as_json_values()._force_reproject() + ) + result_series = rows_as_json_series._apply_unary_op( + ops.RemoteFunctionOp(func=func, apply_on_null=True) + ) + result_series.name = None + return result_series + + # Per-column apply results = {name: func(col, *args, **kwargs) for name, col in self.items()} if all( [ diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 178c911591..29f0fbdf5e 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -24,9 +24,12 @@ import sys import tempfile import textwrap -from typing import List, NamedTuple, Optional, Sequence, 
TYPE_CHECKING + +# TODO(shobs): import typing module and use its classes through namespapace.* +from typing import List, Literal, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union import ibis +import pandas import requests if TYPE_CHECKING: @@ -245,7 +248,7 @@ def generate_udf_code(self, def_, dir): return udf_code_file_name, udf_bytecode_file_name - def generate_cloud_function_main_code(self, def_, dir): + def generate_cloud_function_main_code(self, def_, dir, is_row_processor=False): """Get main.py code for the cloud function for the given user defined function.""" # Pickle the udf with all its dependencies @@ -268,38 +271,78 @@ def generate_cloud_function_main_code(self, def_, dir): # ... # } # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#input_format - code_template = textwrap.dedent( - """\ - import cloudpickle - import functions_framework - from flask import jsonify - import json - - # original udf code is in {udf_code_file} - # serialized udf code is in {udf_bytecode_file} - with open("{udf_bytecode_file}", "rb") as f: - udf = cloudpickle.load(f) - - def {handler_func_name}(request): - try: - request_json = request.get_json(silent=True) - calls = request_json["calls"] - replies = [] - for call in calls: - reply = udf(*call) - replies.append(reply) - return_json = json.dumps({{"replies" : replies}}) - return return_json - except Exception as e: - return jsonify( {{ "errorMessage": str(e) }} ), 400 - """ - ) - - code = code_template.format( - udf_code_file=udf_code_file, - udf_bytecode_file=udf_bytecode_file, - handler_func_name=handler_func_name, - ) + code = """\ +import cloudpickle +import functions_framework +from flask import jsonify +import json +""" + if is_row_processor: + code += """\ +import pandas as pd + +def get_pd_series(row): + row_json = json.loads(row) + col_names = row_json["names"] + col_types = row_json["types"] + col_values = row_json["values"] + + value_converters = { + "boolean": lambda val: val == 
"true", + "Int64": int, + "Float64": float, + "string": str, + } + + def convert_value(value, value_type): + if value is None: + return None + value_converter = value_converters.get(value_type) + if value_converter is None: + raise ValueError(f"Don't know how to handle type '{value_type}'") + return value_converter(value) + + row_values = [pd.Series([convert_value(a, col_types[i])], dtype=col_types[i])[0] for i, a in enumerate(col_values)] + row_series = pd.Series(row_values, index=col_names) + return row_series +""" + code += f"""\ + +# original udf code is in {udf_code_file} +# serialized udf code is in {udf_bytecode_file} +with open("{udf_bytecode_file}", "rb") as f: + udf = cloudpickle.load(f) + +def {handler_func_name}(request): + try: + request_json = request.get_json(silent=True) + calls = request_json["calls"] + replies = [] + for call in calls: +""" + if is_row_processor: + code += """\ + reply = udf(get_pd_series(call[0])) + if pd.isna(reply): + # Pandas N/A values are not json serializable, so use a python + # equivalent instead + reply = None + elif hasattr(reply, "item"): + # Numpy types are not json serializable, so use its Python + # value instead + reply = reply.item() +""" + else: + code += """\ + reply = udf(*call) +""" + code += """\ + replies.append(reply) + return_json = json.dumps({"replies" : replies}) + return return_json + except Exception as e: + return jsonify( { "errorMessage": str(e) } ), 400 +""" main_py = os.path.join(dir, "main.py") with open(main_py, "w") as f: @@ -308,11 +351,17 @@ def {handler_func_name}(request): return handler_func_name - def generate_cloud_function_code(self, def_, dir, package_requirements=None): + def generate_cloud_function_code( + self, def_, dir, package_requirements=None, is_row_processor=False + ): """Generate the cloud function code for a given user defined function.""" # requirements.txt requirements = ["cloudpickle >= 2.1.0"] + if is_row_processor: + # bigframes remote function will send an entire row 
of data as json, + # which would be converted to a pandas series and processed + requirements.append(f"pandas=={pandas.__version__}") if package_requirements: requirements.extend(package_requirements) requirements = sorted(requirements) @@ -321,16 +370,20 @@ def generate_cloud_function_code(self, def_, dir, package_requirements=None): f.write("\n".join(requirements)) # main.py - entry_point = self.generate_cloud_function_main_code(def_, dir) + entry_point = self.generate_cloud_function_main_code( + def_, dir, is_row_processor + ) return entry_point - def create_cloud_function(self, def_, cf_name, package_requirements=None): + def create_cloud_function( + self, def_, cf_name, package_requirements=None, is_row_processor=False + ): """Create a cloud function from the given user defined function.""" # Build and deploy folder structure containing cloud function with tempfile.TemporaryDirectory() as dir: entry_point = self.generate_cloud_function_code( - def_, dir, package_requirements + def_, dir, package_requirements, is_row_processor ) archive_path = shutil.make_archive(dir, "zip", dir) @@ -438,6 +491,7 @@ def provision_bq_remote_function( reuse, name, package_requirements, + is_row_processor, ): """Provision a BigQuery remote function.""" # If reuse of any existing function with the same name (indicated by the @@ -459,7 +513,7 @@ def provision_bq_remote_function( # Create the cloud function if it does not exist if not cf_endpoint: cf_endpoint = self.create_cloud_function( - def_, cloud_function_name, package_requirements + def_, cloud_function_name, package_requirements, is_row_processor ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") @@ -590,7 +644,7 @@ def get_routine_reference( # which has moved as @js to the ibis package # https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/udf/__init__.py def remote_function( - input_types: Sequence[type], + input_types: Union[Literal["row"], type, Sequence[type]], output_type: 
type, session: Optional[Session] = None, bigquery_client: Optional[bigquery.Client] = None, @@ -724,6 +778,15 @@ def remote_function( For more details see https://cloud.google.com/functions/docs/securing/cmek#before_you_begin. """ + + is_row_processor = False + if input_types == "row": + input_types = [str] + is_row_processor = True + elif isinstance(input_types, type): + input_types = [input_types] + + # Some defaults may be used from the session if not provided otherwise import bigframes.pandas as bpd session = session or bpd.get_global_session() @@ -846,6 +909,7 @@ def wrapper(f): reuse, name, packages, + is_row_processor, ) # TODO: Move ibis logic to compiler step diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 91c3eb603b..706db70d61 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -633,7 +633,7 @@ def read_parquet( def remote_function( - input_types: List[type], + input_types: Union[Literal["row"], type, Sequence[type]], output_type: type, dataset: Optional[str] = None, bigquery_connection: Optional[str] = None, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b6d56006be..1fdf2ed570 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1531,7 +1531,7 @@ def _ibis_to_temp_table( def remote_function( self, - input_types: List[type], + input_types: Union[Literal["row"], type, Sequence[type]], output_type: type, dataset: Optional[str] = None, bigquery_connection: Optional[str] = None, diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 106638cef3..9dc593c26f 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -685,3 +685,20 @@ def test_read_gbq_function_enforces_explicit_types(bigquery_client, dataset_id): rf.read_gbq_function( str(neither_type_specified.reference), bigquery_client=bigquery_client ) + + +@pytest.mark.flaky(retries=2, 
delay=120) +# https://github.com/googleapis/python-bigquery-dataframes/issues/592 +def test_df_apply_axis_1(session, scalars_dfs): + columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] + scalars_df, scalars_pandas_df = scalars_dfs + + def add_ints(row): + return row["int64_col"] + row["int64_too"] + + add_ints_remote = session.remote_function("row", int)(add_ints) + + bf_result = scalars_df[columns].apply(add_ints_remote, axis=1).to_pandas() + pd_result = scalars_pandas_df[columns].apply(add_ints, axis=1) + + pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 6707dc1403..7fe098fb72 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4118,7 +4118,7 @@ def merge( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def apply(self, func, *, args=(), **kwargs): + def apply(self, func, *, axis=0, args=(), **kwargs): """Apply a function along an axis of the DataFrame. 
Objects passed to the function are Series objects whose index is diff --git a/user_data.csv b/user_data.csv new file mode 100644 index 0000000000..5484e2d72a --- /dev/null +++ b/user_data.csv @@ -0,0 +1,3 @@ +sensitive_data,safe_data +sensitive_value_1,safe_value_1 +sensitive_value_2,safe_value_2 From 22b7f3209c50ab4d70b209499a481bdf90a0c34a Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 22 Apr 2024 23:23:58 +0000 Subject: [PATCH 02/29] avoid mixing other changes in the input_types param --- bigframes/functions/remote_function.py | 10 +++++----- bigframes/pandas/__init__.py | 2 +- bigframes/session/__init__.py | 8 +++++--- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 55a7f7ebc2..103da22d2f 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -667,7 +667,7 @@ def get_routine_reference( # which has moved as @js to the ibis package # https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/udf/__init__.py def remote_function( - input_types: Union[Literal["row"], type, Sequence[type]], + input_types: Union[Literal["row"], Sequence[type]], output_type: type, session: Optional[Session] = None, bigquery_client: Optional[bigquery.Client] = None, @@ -729,8 +729,10 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types list(type): - List of input data types in the user defined function. + input_types (list(type) or "row"): + For scalar user defined function it should be a list of input. + For row processing user defined function, literal "row" should + be specified. output_type type: Data type of the output in the user defined function. 
session (bigframes.Session, Optional): @@ -816,8 +818,6 @@ def remote_function( if input_types == "row": input_types = [str] is_row_processor = True - elif isinstance(input_types, type): - input_types = [input_types] # Some defaults may be used from the session if not provided otherwise import bigframes.pandas as bpd diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index f333c423f2..83c78ea513 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -633,7 +633,7 @@ def read_parquet( def remote_function( - input_types: Union[Literal["row"], type, Sequence[type]], + input_types: Union[Literal["row"], Sequence[type]], output_type: type, dataset: Optional[str] = None, bigquery_connection: Optional[str] = None, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 3ce3d5778e..79fe417423 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1534,7 +1534,7 @@ def _ibis_to_temp_table( def remote_function( self, - input_types: Union[Literal["row"], type, Sequence[type]], + input_types: Union[Literal["row"], Sequence[type]], output_type: type, dataset: Optional[str] = None, bigquery_connection: Optional[str] = None, @@ -1587,8 +1587,10 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types (list(type)): - List of input data types in the user defined function. + input_types (list(type) or "row"): + For scalar user defined function it should be a list of input. + For row processing user defined function, literal "row" should + be specified. output_type (type): Data type of the output in the user defined function. 
dataset (str, Optional): From 50491707d10fddbbd895b367ad1c95edb18023ae Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 23 Apr 2024 00:16:09 +0000 Subject: [PATCH 03/29] use guid instead of hard coded column name --- bigframes/core/blocks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index cb2145b9d3..1357204410 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1971,7 +1971,7 @@ def _get_rows_as_json_values(self) -> Block: column_references_csv = ", ".join( [f"CAST({col} AS STRING)" for col in compiled.column_ids] ) - row_json_column_name = "row_json" + row_json_column_name = guid.generate_guid() select_columns = list(self.index_columns) + [row_json_column_name] select_columns_csv = ", ".join(select_columns) json_sql = f"""\ From 4f28f5681d9394df34ba82585ccaafb8fe8b2e8f Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 23 Apr 2024 08:28:19 +0000 Subject: [PATCH 04/29] check_exact=False to avoid failing system_prerelease --- tests/system/small/test_remote_function.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 9dc593c26f..2632c5a5d3 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -701,4 +701,12 @@ def add_ints(row): bf_result = scalars_df[columns].apply(add_ints_remote, axis=1).to_pandas() pd_result = scalars_pandas_df[columns].apply(add_ints, axis=1) - pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this + # mismatch by using check_dtype=False. + # + # bf_result.to_numpy() produces an array of numpy.float64's, while + # pd_result.to_numpy() produces an array of ints, ignore this mismatch by + # using check_exact=False. 
+ pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_exact=False + ) From 99e3e7be686dcce320513155bc12a9b94a3a47af Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 24 Apr 2024 18:02:11 +0000 Subject: [PATCH 05/29] handle index in remote function, add large system tests --- bigframes/core/blocks.py | 30 +++++---- bigframes/functions/remote_function.py | 24 ++++++- tests/system/large/test_remote_function.py | 74 ++++++++++++++++++++++ tests/system/small/test_remote_function.py | 34 ++++++++-- user_data.csv | 3 - 5 files changed, 144 insertions(+), 21 deletions(-) delete mode 100644 user_data.csv diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 646a8e8925..2338c06e75 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2013,17 +2013,22 @@ def _is_monotonic( return result def _get_rows_as_json_values(self) -> Block: - compiled = self.expr._compile_unordered() - sql = compiled.to_sql() - column_names_csv = ", ".join([f'"{col}"' for col in compiled.column_ids]) - column_types_csv = ", ".join( - [f'"{self.expr.get_column_type(col)}"' for col in compiled.column_ids] - ) + sql, index_column_ids, _ = self.to_sql_query(include_index=True) + + # all column names, types and values + all_column_names = index_column_ids + [col for col in self.column_labels] + column_names_csv = ", ".join([f'"{col}"' for col in all_column_names]) column_references_csv = ", ".join( - [f"CAST({col} AS STRING)" for col in compiled.column_ids] + [f"CAST({col} AS STRING)" for col in all_column_names] ) + all_column_types = list(self.index.dtypes) + list(self.dtypes) + column_types_csv = ", ".join([f'"{col}"' for col in all_column_types]) + + # index column names + index_column_names_csv = ", ".join([f'"{col}"' for col in index_column_ids]) + row_json_column_name = guid.generate_guid() - select_columns = list(self.index_columns) + [row_json_column_name] + select_columns = index_column_ids + [row_json_column_name] 
select_columns_csv = ", ".join(select_columns) json_sql = f"""\ With T0 AS ( @@ -2034,7 +2039,8 @@ def _get_rows_as_json_values(self) -> Block: JSON_OBJECT( "names", [{column_names_csv}], "types", [{column_types_csv}], - "values", [{column_references_csv}] + "values", [{column_references_csv}], + "index", [{index_column_names_csv}] ) AS {row_json_column_name} FROM T0 ) SELECT {select_columns_csv} FROM T1 @@ -2042,9 +2048,9 @@ def _get_rows_as_json_values(self) -> Block: ibis_table = self.session.ibis_client.sql(json_sql) order_for_ibis_table = ordering.ExpressionOrdering( ordering_value_columns=tuple( - [ordering.ascending_over(column_id) for column_id in self.index_columns] + [ordering.ascending_over(column_id) for column_id in index_column_ids] ), - total_ordering_columns=frozenset(self.index_columns), + total_ordering_columns=frozenset(index_column_ids), ) expr = core.ArrayValue.from_ibis( self.session, @@ -2055,7 +2061,7 @@ def _get_rows_as_json_values(self) -> Block: ) block = Block( expr, - index_columns=self.index_columns, + index_columns=index_column_ids, column_labels=[row_json_column_name], index_labels=self._index_labels, ) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 103da22d2f..97942e99b6 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -303,6 +303,7 @@ def get_pd_series(row): col_names = row_json["names"] col_types = row_json["types"] col_values = row_json["values"] + index_names = row_json["index"] value_converters = { "boolean": lambda val: val == "true", @@ -319,8 +320,27 @@ def convert_value(value, value_type): raise ValueError(f"Don't know how to handle type '{value_type}'") return value_converter(value) - row_values = [pd.Series([convert_value(a, col_types[i])], dtype=col_types[i])[0] for i, a in enumerate(col_values)] - row_series = pd.Series(row_values, index=col_names) + index_positions = [col_names.index(col) for col in index_names] + + 
index_values = [ + pd.Series([convert_value(col_values[i], col_types[i])], dtype=col_types[i])[0] + for i in index_positions + ] + + col_names = [ + col_names[i] + for i, a in enumerate(col_names) + if i not in index_positions + ] + + row_values = [ + pd.Series([convert_value(a, col_types[i])], dtype=col_types[i])[0] + for i, a in enumerate(col_values) + if i not in index_positions + ] + + row_index = index_values[0] if len(index_values) == 1 else tuple(index_values) + row_series = pd.Series(row_values, index=col_names, name=row_index) return row_series """ code += f"""\ diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index ec9acc292e..f16acbc4aa 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1336,3 +1336,77 @@ def square(x): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, square_remote ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1(session, scalars_dfs): + try: + columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] + scalars_df, scalars_pandas_df = scalars_dfs + + def serialize_row(row): + row_dict = { + "name": row.name, + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + + return str(row_dict) + + serialize_row_remote = session.remote_function("row", str)(serialize_row) + + bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas() + pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) + + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. 
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, serialize_row_remote + ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_multiindex(session): + pd_df = pandas.DataFrame( + {"x": [1, 2, 3], "y": [1.5, 3.75, 5], "z": ["pq", "rs", "tu"]}, + index=pandas.MultiIndex.from_tuples([("a", 100), ("a", 200), ("b", 300)]), + ) + bf_df = session.read_pandas(pd_df) + + try: + + def serialize_row(row): + row_dict = { + "name": row.name, + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + + return str(row_dict) + + serialize_row_remote = session.remote_function("row", str)(serialize_row) + + bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() + pd_result = pd_df.apply(serialize_row, axis=1) + + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. + # + # bf_result.index[0].dtype is 'string[pyarrow]' while + # pd_result.index[0].dtype is 'object', ignore this mismatch by using + # check_index_type=False. 
+ pandas.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, serialize_row_remote + ) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 2632c5a5d3..09960a0dd4 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -688,7 +688,6 @@ def test_read_gbq_function_enforces_explicit_types(bigquery_client, dataset_id): @pytest.mark.flaky(retries=2, delay=120) -# https://github.com/googleapis/python-bigquery-dataframes/issues/592 def test_df_apply_axis_1(session, scalars_dfs): columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] scalars_df, scalars_pandas_df = scalars_dfs @@ -704,9 +703,36 @@ def add_ints(row): # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this # mismatch by using check_dtype=False. # - # bf_result.to_numpy() produces an array of numpy.float64's, while - # pd_result.to_numpy() produces an array of ints, ignore this mismatch by - # using check_exact=False. + # bf_result.to_numpy() produces an array of numpy.float64's + # (in system_prerelease tests), while pd_result.to_numpy() produces an + # array of ints, ignore this mismatch by using check_exact=False. 
pd.testing.assert_series_equal( pd_result, bf_result, check_dtype=False, check_exact=False ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_multiindex(session): + pd_df = pd.DataFrame( + {"x": [1, 2, 3], "y": [1.5, 3.75, 5], "z": ["pq", "rs", "tu"]}, + index=pd.MultiIndex.from_tuples([("a", 100), ("a", 200), ("b", 300)]), + ) + bf_df = session.read_pandas(pd_df) + + def add_numbers(row): + return row["x"] + row["y"] + + add_numbers_remote = session.remote_function("row", float)(add_numbers) + + bf_result = bf_df.apply(add_numbers_remote, axis=1).to_pandas() + pd_result = pd_df.apply(add_numbers, axis=1) + + # bf_result.dtype is 'Float64' while pd_result.dtype is 'float64', ignore this + # mismatch by using check_dtype=False. + # + # bf_result.index[0].dtype is 'string[pyarrow]' while + # pd_result.index[0].dtype is 'object', ignore this mismatch by using + # check_index_type=False. + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) diff --git a/user_data.csv b/user_data.csv deleted file mode 100644 index 5484e2d72a..0000000000 --- a/user_data.csv +++ /dev/null @@ -1,3 +0,0 @@ -sensitive_data,safe_data -sensitive_value_1,safe_value_1 -sensitive_value_2,safe_value_2 From 7153db8e843c61a6ecbca0e30bd36cd5eb9f8132 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 25 Apr 2024 02:25:53 +0000 Subject: [PATCH 06/29] make the test case more robust --- tests/system/large/test_remote_function.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index f16acbc4aa..6d1da9857d 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1345,7 +1345,8 @@ def test_df_apply_axis_1(session, scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs def serialize_row(row): - row_dict = { + + custom = { "name": row.name, "index": [idx for 
idx in row.index], "values": [ @@ -1353,7 +1354,16 @@ def serialize_row(row): ], } - return str(row_dict) + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "table": row.to_json(orient="table"), + "custom": custom, + } + ) serialize_row_remote = session.remote_function("row", str)(serialize_row) From 5fb814894995e05a55e95a2d2ff94c5c694c7efd Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 29 Apr 2024 17:23:34 +0000 Subject: [PATCH 07/29] handle non-string column names, add unsupported dtype tests --- bigframes/core/blocks.py | 20 ++-- bigframes/functions/remote_function.py | 10 +- tests/system/large/test_remote_function.py | 105 +++++++++++++++++++-- 3 files changed, 119 insertions(+), 16 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index ab5af77a36..40bf2af810 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2056,18 +2056,26 @@ def _is_monotonic( def _get_rows_as_json_values(self) -> Block: sql, index_column_ids, _ = self.to_sql_query(include_index=True) - # all column names, types and values + # all column names all_column_names = index_column_ids + [col for col in self.column_labels] - column_names_csv = ", ".join([f'"{col}"' for col in all_column_names]) + column_names_csv = ", ".join([repr(repr(col)) for col in all_column_names]) + + # index column names + index_column_names_csv = ", ".join( + [repr(repr(col)) for col in index_column_ids] + ) + + # column names produced by the block sql, which may be different from + # the column labels due to internal normalization + sql_output_column_names = self.session.ibis_client.sql(sql).columns column_references_csv = ", ".join( - [f"CAST({col} AS STRING)" for col in all_column_names] + [f"CAST(`{col}` AS STRING)" for col in sql_output_column_names] ) + + # all column types all_column_types = list(self.index.dtypes) + list(self.dtypes) 
column_types_csv = ", ".join([f'"{col}"' for col in all_column_types]) - # index column names - index_column_names_csv = ", ".join([f'"{col}"' for col in index_column_ids]) - row_json_column_name = guid.generate_guid() select_columns = index_column_ids + [row_json_column_name] select_columns_csv = ", ".join(select_columns) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 8411684cb3..e0e87f177a 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -51,7 +51,6 @@ from bigframes import clients import bigframes.constants as constants import bigframes.dtypes -import bigframes.pandas logger = logging.getLogger(__name__) @@ -306,6 +305,11 @@ def get_pd_series(row): col_values = row_json["values"] index_names = row_json["index"] + # index and column names are not necessarily strings + # they are serialized as repr(repr(name)) at source + col_names = [eval(name) for name in col_names] + index_names = [eval(name) for name in index_names] + value_converters = { "boolean": lambda val: val == "true", "Int64": int, @@ -314,11 +318,11 @@ def get_pd_series(row): } def convert_value(value, value_type): - if value is None: - return None value_converter = value_converters.get(value_type) if value_converter is None: raise ValueError(f"Don't know how to handle type '{value_type}'") + if value is None: + return None return value_converter(value) index_positions = [col_names.index(col) for col in index_names] diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 80e17ce465..23063f001d 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1389,12 +1389,11 @@ def square(x): @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1(session, scalars_dfs): + columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] + scalars_df, scalars_pandas_df = scalars_dfs try: 
- columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] - scalars_df, scalars_pandas_df = scalars_dfs def serialize_row(row): - custom = { "name": row.name, "index": [idx for idx in row.index], @@ -1414,7 +1413,9 @@ def serialize_row(row): } ) - serialize_row_remote = session.remote_function("row", str)(serialize_row) + serialize_row_remote = session.remote_function("row", str, reuse=False)( + serialize_row + ) bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas() pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) @@ -1429,6 +1430,58 @@ def serialize_row(row): ) +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_non_string_column_names(session): + pd_df = pandas.DataFrame( + {"one": [1, 2, 3], 2: [1.5, 3.75, 5], (3, 4): ["pq", "rs", "tu"]} + ) + bf_df = session.read_pandas(pd_df) + + try: + + def serialize_row(row): + custom = { + "name": row.name, + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "table": row.to_json(orient="table"), + "custom": custom, + } + ) + + serialize_row_remote = session.remote_function("row", str, reuse=False)( + serialize_row + ) + + bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() + pd_result = pd_df.apply(serialize_row, axis=1) + + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. + # + # bf_result.index[0].dtype is 'string[pyarrow]' while + # pd_result.index[0].dtype is 'object', ignore this mismatch by using + # check_index_type=False. 
+ pandas.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, serialize_row_remote + ) + + @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1_multiindex(session): pd_df = pandas.DataFrame( @@ -1440,7 +1493,7 @@ def test_df_apply_axis_1_multiindex(session): try: def serialize_row(row): - row_dict = { + custom = { "name": row.name, "index": [idx for idx in row.index], "values": [ @@ -1448,9 +1501,19 @@ def serialize_row(row): ], } - return str(row_dict) + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "custom": custom, + } + ) - serialize_row_remote = session.remote_function("row", str)(serialize_row) + serialize_row_remote = session.remote_function("row", str, reuse=False)( + serialize_row + ) bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() pd_result = pd_df.apply(serialize_row, axis=1) @@ -1469,3 +1532,31 @@ def serialize_row(row): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, serialize_row_remote ) + + +@pytest.mark.parametrize( + ("column"), + [ + pytest.param("date_col"), + pytest.param("datetime_col"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_unsupported_dtype(session, scalars_dfs, column): + scalars_df, _ = scalars_dfs + + try: + + @session.remote_function("row", str, reuse=False) + def echo(row): + return row[column] + + with pytest.raises( + BadRequest, match="400.*errorMessage.*Don't know how to handle type" + ): + scalars_df[[column]].apply(echo, axis=1).to_pandas() + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, echo + ) From 
edbac1bfd19ef565df7bb16b35d075bb02e52a0e Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 29 Apr 2024 19:00:49 +0000 Subject: [PATCH 08/29] fix import --- bigframes/functions/remote_function.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index e0e87f177a..ccac95a2f2 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -873,7 +873,9 @@ def remote_function( is_row_processor = True # Some defaults may be used from the session if not provided otherwise - session = session or bigframes.pandas.get_global_session() + import bigframes.pandas as bpd + + session = session or bpd.get_global_session() # A BigQuery client is required to perform BQ operations if not bigquery_client: From d3c07e9602600081b3fbd28b4c06b5c5b6370b53 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 29 Apr 2024 20:46:00 +0000 Subject: [PATCH 09/29] use `_cached` in df.apply to catch any rf execution errors early --- bigframes/dataframe.py | 6 +++++- tests/system/large/test_remote_function.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 6806aa22e7..9e4c2a2ff6 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3313,7 +3313,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ops.RemoteFunctionOp(func=func, apply_on_null=True) ) result_series.name = None - return result_series + + # return Series with materialized result so that any error in the remote + # function is caught early + materialized_series = result_series._cached() + return materialized_series # Per-column apply results = {name: func(col, *args, **kwargs) for name, col in self.items()} diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 9432a9546e..e05fe46fef 100644 --- 
a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1583,7 +1583,7 @@ def echo(row): with pytest.raises( BadRequest, match="400.*errorMessage.*Don't know how to handle type" ): - scalars_df[[column]].apply(echo, axis=1).to_pandas() + scalars_df[[column]].apply(echo, axis=1) finally: # clean up the gcp assets created for the remote function cleanup_remote_function_assets( From 7122b8a8e725dcc83387d43ea79194f17c829deb Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 29 Apr 2024 23:49:56 +0000 Subject: [PATCH 10/29] add test for row aggregates --- tests/system/large/test_remote_function.py | 36 ++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index e05fe46fef..103693f2b0 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1459,6 +1459,42 @@ def serialize_row(row): ) +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_aggregates(session, scalars_dfs): + columns = ["int64_col", "int64_too", "float64_col"] + scalars_df, scalars_pandas_df = scalars_dfs + + try: + + def analyze(row): + return str( + { + "count": row.count(), + "min": row.max(), + "max": row.max(), + "mean": row.mean(), + "std": row.std(), + "var": row.var(), + } + ) + + analyze_remote = session.remote_function("row", str)(analyze) + + bf_result = ( + scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas() + ) + pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1) + + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. 
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, analyze_remote + ) + + @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1_non_string_column_names(session): pd_df = pandas.DataFrame( From 9f9b61ed1f66397dbff0a6dbe50a2759bb88ee31 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 30 Apr 2024 06:19:06 +0000 Subject: [PATCH 11/29] add row dtype information, also test --- bigframes/core/blocks.py | 4 +++- bigframes/functions/remote_function.py | 3 ++- tests/system/large/test_remote_function.py | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 40bf2af810..fd004b2df4 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2075,6 +2075,7 @@ def _get_rows_as_json_values(self) -> Block: # all column types all_column_types = list(self.index.dtypes) + list(self.dtypes) column_types_csv = ", ".join([f'"{col}"' for col in all_column_types]) + row_dtype = f'"{bigframes.dtypes.lcd_type(*all_column_types)}"' row_json_column_name = guid.generate_guid() select_columns = index_column_ids + [row_json_column_name] @@ -2089,7 +2090,8 @@ def _get_rows_as_json_values(self) -> Block: "names", [{column_names_csv}], "types", [{column_types_csv}], "values", [{column_references_csv}], - "index", [{index_column_names_csv}] + "index", [{index_column_names_csv}], + "dtype", {row_dtype} ) AS {row_json_column_name} FROM T0 ) SELECT {select_columns_csv} FROM T1 diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 4f4d9777fa..3b23bbc44c 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -304,6 +304,7 @@ def get_pd_series(row): col_types = row_json["types"] col_values = row_json["values"] index_names = 
row_json["index"] + dtype = row_json["dtype"] # index and column names are not necessarily strings # they are serialized as repr(repr(name)) at source @@ -345,7 +346,7 @@ def convert_value(value, value_type): ] row_index = index_values[0] if len(index_values) == 1 else tuple(index_values) - row_series = pd.Series(row_values, index=col_names, name=row_index) + row_series = pd.Series(row_values, index=col_names, name=row_index, dtype=dtype) return row_series """ code += f"""\ diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 103693f2b0..2636df5f50 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1469,6 +1469,7 @@ def test_df_apply_axis_1_aggregates(session, scalars_dfs): def analyze(row): return str( { + "dtype": row.dtype, "count": row.count(), "min": row.max(), "max": row.max(), From 6fdd282512dd01b21a3c9f4a7f2bc7a1a597136c Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 1 May 2024 07:08:10 +0000 Subject: [PATCH 12/29] preserve the order of input in the output --- bigframes/core/blocks.py | 56 ++++++++++++++-------- bigframes/dataframe.py | 4 +- tests/system/small/test_remote_function.py | 26 ++++++++++ 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index fd004b2df4..7671191de6 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2054,31 +2054,45 @@ def _is_monotonic( return result def _get_rows_as_json_values(self) -> Block: - sql, index_column_ids, _ = self.to_sql_query(include_index=True) - - # all column names - all_column_names = index_column_ids + [col for col in self.column_labels] - column_names_csv = ", ".join([repr(repr(col)) for col in all_column_names]) + # We want to preserve any ordering currently present before turning to + # direct SQL manipulation. We will restore the ordering when we rebuild + # expression. 
+ # TODO(shobs): Replace direct SQL manipulation by structured expression + # manipulation + ordering_column_name = guid.generate_guid() + expr = self.session._cache_with_offsets(self.expr) + expr = expr.promote_offsets(ordering_column_name) + sql = self.session._to_sql(expr) + + # names of the columns to serialize for the row + column_names = list(self.index_columns) + [col for col in self.column_labels] + column_names_csv = ", ".join([repr(repr(col)) for col in column_names]) # index column names index_column_names_csv = ", ".join( - [repr(repr(col)) for col in index_column_ids] + [repr(repr(col)) for col in self.index_columns] ) - # column names produced by the block sql, which may be different from - # the column labels due to internal normalization - sql_output_column_names = self.session.ibis_client.sql(sql).columns + # column references to form the array of values for the row column_references_csv = ", ".join( - [f"CAST(`{col}` AS STRING)" for col in sql_output_column_names] + [f"CAST(`{col}` AS STRING)" for col in self.expr.column_ids] ) - # all column types - all_column_types = list(self.index.dtypes) + list(self.dtypes) - column_types_csv = ", ".join([f'"{col}"' for col in all_column_types]) - row_dtype = f'"{bigframes.dtypes.lcd_type(*all_column_types)}"' + # types of the columns to serialize for the row + column_types = list(self.index.dtypes) + list(self.dtypes) + column_types_csv = ", ".join([f'"{col}"' for col in column_types]) + + # row dtype to use for deserializing the row as pandas series + pandas_row_dtype = bigframes.dtypes.lcd_type(*column_types) + if pandas_row_dtype is None: + pandas_row_dtype = "object" + pandas_row_dtype = f'"{pandas_row_dtype}"' + # create a json column representing row through SQL manipulation row_json_column_name = guid.generate_guid() - select_columns = index_column_ids + [row_json_column_name] + select_columns = ( + [ordering_column_name] + list(self.index_columns) + [row_json_column_name] + ) select_columns_csv = ", 
".join(select_columns) json_sql = f"""\ With T0 AS ( @@ -2091,7 +2105,7 @@ def _get_rows_as_json_values(self) -> Block: "types", [{column_types_csv}], "values", [{column_references_csv}], "index", [{index_column_names_csv}], - "dtype", {row_dtype} + "dtype", {pandas_row_dtype} ) AS {row_json_column_name} FROM T0 ) SELECT {select_columns_csv} FROM T1 @@ -2099,20 +2113,20 @@ def _get_rows_as_json_values(self) -> Block: ibis_table = self.session.ibis_client.sql(json_sql) order_for_ibis_table = ordering.ExpressionOrdering( ordering_value_columns=tuple( - [ordering.ascending_over(column_id) for column_id in index_column_ids] + [ordering.ascending_over(ordering_column_name)] ), - total_ordering_columns=frozenset(index_column_ids), + total_ordering_columns=frozenset([ordering_column_name]), ) expr = core.ArrayValue.from_ibis( self.session, ibis_table, - [ibis_table[col] for col in select_columns], - hidden_ordering_columns=[], + [ibis_table[col] for col in select_columns if col != ordering_column_name], + hidden_ordering_columns=[ibis_table[ordering_column_name]], ordering=order_for_ibis_table, ) block = Block( expr, - index_columns=index_column_ids, + index_columns=self.index_columns, column_labels=[row_json_column_name], index_labels=self._index_labels, ) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 9e4c2a2ff6..fb94fee633 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3307,7 +3307,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): raise ValueError("For axis=1 a remote function must be used.") block = self._get_block() rows_as_json_series = bigframes.series.Series( - block._get_rows_as_json_values()._force_reproject() + block._get_rows_as_json_values() ) result_series = rows_as_json_series._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=True) @@ -3316,7 +3316,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): # return Series with materialized result so that any error 
in the remote # function is caught early - materialized_series = result_series._cached() + materialized_series = result_series.cache() return materialized_series # Per-column apply diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 09960a0dd4..4a38988015 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -711,6 +711,32 @@ def add_ints(row): ) +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_ordering(session, scalars_dfs): + columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] + ordering_columns = ["bool_col", "int64_col"] + scalars_df, scalars_pandas_df = scalars_dfs + + def add_ints(row): + return row["int64_col"] + row["int64_too"] + + add_ints_remote = session.remote_function("row", int)(add_ints) + + bf_result = ( + scalars_df[columns] + .sort_values(ordering_columns) + .apply(add_ints_remote, axis=1) + .to_pandas() + ) + pd_result = ( + scalars_pandas_df[columns].sort_values(ordering_columns).apply(add_ints, axis=1) + ) + + # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this + # mismatch by using check_dtype=False. 
+ pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1_multiindex(session): pd_df = pd.DataFrame( From 2d137ca626b38182072611cdbb28aa04c087daff Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 1 May 2024 08:02:47 +0000 Subject: [PATCH 13/29] absorb to_numpy() disparity in prerelease tests --- tests/system/small/test_remote_function.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 4a38988015..e6ea82a850 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -734,7 +734,13 @@ def add_ints(row): # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this # mismatch by using check_dtype=False. - pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # + # bf_result.to_numpy() produces an array of numpy.float64's + # (in system_prerelease tests), while pd_result.to_numpy() produces an + # array of ints, ignore this mismatch by using check_exact=False. 
+ pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_exact=False + ) @pytest.mark.flaky(retries=2, delay=120) From 3e45f78121f4111f38e06139e6624431aea22d81 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 2 May 2024 01:21:12 +0000 Subject: [PATCH 14/29] add tests for column multiindex and non remote function --- tests/system/large/test_remote_function.py | 104 ++++++++++----------- tests/system/small/test_remote_function.py | 8 ++ 2 files changed, 56 insertions(+), 56 deletions(-) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 2636df5f50..e4470b0a4b 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1496,64 +1496,57 @@ def analyze(row): ) -@pytest.mark.flaky(retries=2, delay=120) -def test_df_apply_axis_1_non_string_column_names(session): - pd_df = pandas.DataFrame( - {"one": [1, 2, 3], 2: [1.5, 3.75, 5], (3, 4): ["pq", "rs", "tu"]} - ) - bf_df = session.read_pandas(pd_df) - - try: - - def serialize_row(row): - custom = { - "name": row.name, - "index": [idx for idx in row.index], - "values": [ - val.item() if hasattr(val, "item") else val for val in row.values - ], - } - - return str( +@pytest.mark.parametrize( + ("pd_df"), + [ + pytest.param( + pandas.DataFrame( { - "default": row.to_json(), - "split": row.to_json(orient="split"), - "records": row.to_json(orient="records"), - "index": row.to_json(orient="index"), - "table": row.to_json(orient="table"), - "custom": custom, + "one": [1, 2, 3], + 2: [1.5, 3.75, 5], + (3, 4): ["pq", "rs", "tu"], + (5.0, "six", 7): [8, 9, 10], } - ) - - serialize_row_remote = session.remote_function("row", str, reuse=False)( - serialize_row - ) - - bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() - pd_result = pd_df.apply(serialize_row, axis=1) - - # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' - # , ignore this mismatch by using 
check_dtype=False. - # - # bf_result.index[0].dtype is 'string[pyarrow]' while - # pd_result.index[0].dtype is 'object', ignore this mismatch by using - # check_index_type=False. - pandas.testing.assert_series_equal( - pd_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, serialize_row_remote - ) - - + ), + id="non-string-column-names", + ), + pytest.param( + pandas.DataFrame( + { + "x": [1, 2, 3], + "y": [1.5, 3.75, 5], + "z": ["pq", "rs", "tu"], + }, + index=pandas.MultiIndex.from_tuples( + [ + ("a", 100), + ("a", 200), + ("b", 300), + ] + ), + ), + id="multiindex", + ), + pytest.param( + pandas.DataFrame( + [ + [10, 1.5, "pq"], + [20, 3.75, "rs"], + [30, 8.0, "tu"], + ], + columns=pandas.MultiIndex.from_arrays( + [ + ["first", "last_two", "last_two"], + [1, 2, 3], + ] + ), + ), + id="column-multiindex", + ), + ], +) @pytest.mark.flaky(retries=2, delay=120) -def test_df_apply_axis_1_multiindex(session): - pd_df = pandas.DataFrame( - {"x": [1, 2, 3], "y": [1.5, 3.75, 5], "z": ["pq", "rs", "tu"]}, - index=pandas.MultiIndex.from_tuples([("a", 100), ("a", 200), ("b", 300)]), - ) +def test_df_apply_axis_1_complex(session, pd_df): bf_df = session.read_pandas(pd_df) try: @@ -1566,7 +1559,6 @@ def serialize_row(row): val.item() if hasattr(val, "item") else val for val in row.values ], } - return str( { "default": row.to_json(), diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index e6ea82a850..6f4e8baee9 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -768,3 +768,11 @@ def add_numbers(row): pd.testing.assert_series_equal( pd_result, bf_result, check_dtype=False, check_index_type=False ) + + +def test_df_apply_axis_1_unsupported_callable(scalars_df_index): + def add_ints(row): + return 
row["in64_col"] + row["in64_too"] + + with pytest.raises(ValueError, match="For axis=1 a remote function must be used."): + scalars_df_index.apply(add_ints, axis=1) From e31a09d9a2911204058d53f409c03c366b69bfee Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 2 May 2024 17:00:38 +0000 Subject: [PATCH 15/29] add preview note for row processing --- bigframes/session/__init__.py | 3 +++ .../bigframes_vendored/pandas/core/frame.py | 27 +++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b6e583cfe5..aa3a9a4840 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1426,6 +1426,9 @@ def remote_function( """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. + .. note:: + ``input_types="row"`` scenario is in preview. + .. note:: Please make sure following is setup before using this API: diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 039233318c..208e510191 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4204,8 +4204,12 @@ def apply(self, func, *, axis=0, args=(), **kwargs): """Apply a function along an axis of the DataFrame. Objects passed to the function are Series objects whose index is - the DataFrame's index (``axis=0``) the final return type - is inferred from the return type of the applied function. + the DataFrame's index (``axis=0``) or the DataFrame's columns (``axis=1``). + The final return type is inferred from the return type of the applied + function. + + .. note:: + ``axis=1`` scenario is in preview. 
**Examples:** @@ -4230,9 +4234,28 @@ def apply(self, func, *, axis=0, args=(), **kwargs): [2 rows x 2 columns] + You could apply a user defined function to every row of the DataFrame by + creating a remote function out of it, and using it with `axis=1`. + + >>> @bpd.remote_function("row", int, reuse=False) + ... def foo(row): + ... result = 1 + ... result += row["col1"] + ... result += row["col2"]*row["col2"] + ... return result + + >>> df.apply(foo, axis=1) + 0 11 + 1 19 + dtype: Int64 + Args: func (function): Function to apply to each column or row. + axis ({index (0), columns (1)}): + Axis along which the function is applied. Specify 0 or 'index' + to apply function to each column. Specify 1 or 'columns' to + apply function to each row. args (tuple): Positional arguments to pass to `func` in addition to the array/series. From b828860832cfd8e705e86d9e86d250a344f39481 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 2 May 2024 17:41:59 +0000 Subject: [PATCH 16/29] add warning for input_types="row" and axis=1 --- bigframes/dataframe.py | 7 +++++++ bigframes/exceptions.py | 4 ++++ bigframes/functions/remote_function.py | 6 ++++++ tests/system/small/test_remote_function.py | 13 +++++++++++-- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index fb94fee633..b47908a392 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -34,6 +34,7 @@ Tuple, Union, ) +import warnings import bigframes_vendored.pandas.core.frame as vendored_pandas_frame import bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing @@ -60,6 +61,7 @@ import bigframes.core.utils as utils import bigframes.core.window import bigframes.dtypes +import bigframes.exceptions import bigframes.formatting_helpers as formatter import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -3303,6 +3305,11 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: def apply(self, func, 
*, axis=0, args: typing.Tuple = (), **kwargs): if utils.get_axis_number(axis) == 1: + warnings.warn( + "axis=1 scenario is in preview.", + category=bigframes.exceptions.PreviewWarning, + ) + if not hasattr(func, "bigframes_remote_function"): raise ValueError("For axis=1 a remote function must be used.") block = self._get_block() diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 62122e79d2..efe89b7728 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -15,3 +15,7 @@ class UnknownLocationWarning(Warning): """The location is set to an unknown value.""" + + +class PreviewWarning(Warning): + """The feature is in preview.""" diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 3b23bbc44c..b447d43885 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -27,6 +27,7 @@ # TODO(shobs): import typing module and use its classes through namespapace.* from typing import List, Literal, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union +import warnings import ibis import pandas @@ -870,6 +871,11 @@ def remote_function( is_row_processor = False if input_types == "row": + warnings.warn( + 'input_types="row" scenario is in preview.', + stacklevel=1, + category=bigframes.exceptions.PreviewWarning, + ) input_types = [str] is_row_processor = True elif isinstance(input_types, type): diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 6f4e8baee9..c23373a9ff 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -18,6 +18,7 @@ import pytest import bigframes +import bigframes.exceptions from bigframes.functions import remote_function as rf from tests.system.utils import assert_pandas_df_equal @@ -695,9 +696,17 @@ def test_df_apply_axis_1(session, scalars_dfs): def add_ints(row): return row["int64_col"] + row["int64_too"] - add_ints_remote = 
session.remote_function("row", int)(add_ints) + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match='input_types="row" scenario is in preview.', + ): + add_ints_remote = session.remote_function("row", int)(add_ints) + + with pytest.warns( + bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." + ): + bf_result = scalars_df[columns].apply(add_ints_remote, axis=1).to_pandas() - bf_result = scalars_df[columns].apply(add_ints_remote, axis=1).to_pandas() pd_result = scalars_pandas_df[columns].apply(add_ints, axis=1) # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this From eb383f35dba0638f0cc1a2268ffc2011b8aa1380 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 2 May 2024 19:14:10 +0000 Subject: [PATCH 17/29] introduce early check on the supported dtypes --- bigframes/dataframe.py | 31 +++++++++++++++++++++- bigframes/functions/remote_function.py | 3 +++ tests/system/large/test_remote_function.py | 28 ------------------- tests/system/small/test_remote_function.py | 31 ++++++++++++++++++++++ 4 files changed, 64 insertions(+), 29 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index b47908a392..d8d198d6e1 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3310,18 +3310,47 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): category=bigframes.exceptions.PreviewWarning, ) + # Early check whether the dataframe dtypes are currently supported + # in the remote function + # NOTE: Keep in sync with the value converters used in the gcf code + # generated in generate_cloud_function_main_code in remote_function.py + remote_function_supported_dtypes = ( + bigframes.dtypes.INT_DTYPE, + bigframes.dtypes.FLOAT_DTYPE, + bigframes.dtypes.BOOL_DTYPE, + bigframes.dtypes.STRING_DTYPE, + ) + supported_dtypes_types = tuple( + type(dtype) for dtype in remote_function_supported_dtypes + ) + supported_dtypes_hints = tuple( + str(dtype) for dtype in 
remote_function_supported_dtypes + ) + + for dtype in self.dtypes: + if not isinstance(dtype, supported_dtypes_types): + raise TypeError( + f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1." + f" Supported dtypes are {supported_dtypes_hints}." + ) + + # Check if the function is a remote function if not hasattr(func, "bigframes_remote_function"): raise ValueError("For axis=1 a remote function must be used.") + + # Serialize the rows as json values block = self._get_block() rows_as_json_series = bigframes.series.Series( block._get_rows_as_json_values() ) + + # Apply the function result_series = rows_as_json_series._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=True) ) result_series.name = None - # return Series with materialized result so that any error in the remote + # Return Series with materialized result so that any error in the remote # function is caught early materialized_series = result_series.cache() return materialized_series diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index b447d43885..de30012b8c 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -876,6 +876,9 @@ def remote_function( stacklevel=1, category=bigframes.exceptions.PreviewWarning, ) + + # we will model the row as a json serialized string containing the data + # and the metadata representing the row input_types = [str] is_row_processor = True elif isinstance(input_types, type): diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index e4470b0a4b..fa2e8a739f 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1590,31 +1590,3 @@ def serialize_row(row): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, serialize_row_remote ) - - -@pytest.mark.parametrize( - ("column"), - [ - pytest.param("date_col"), - 
pytest.param("datetime_col"), - ], -) -@pytest.mark.flaky(retries=2, delay=120) -def test_df_apply_axis_1_unsupported_dtype(session, scalars_dfs, column): - scalars_df, _ = scalars_dfs - - try: - - @session.remote_function("row", str, reuse=False) - def echo(row): - return row[column] - - with pytest.raises( - BadRequest, match="400.*errorMessage.*Don't know how to handle type" - ): - scalars_df[[column]].apply(echo, axis=1) - finally: - # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, echo - ) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index c23373a9ff..ac0c174057 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re + import google.api_core.exceptions from google.cloud import bigquery import pandas as pd @@ -785,3 +787,32 @@ def add_ints(row): with pytest.raises(ValueError, match="For axis=1 a remote function must be used."): scalars_df_index.apply(add_ints, axis=1) + + +@pytest.mark.parametrize( + ("column"), + [ + pytest.param("bytes_col"), + pytest.param("date_col"), + pytest.param("datetime_col"), + pytest.param("geography_col"), + pytest.param("numeric_col"), + pytest.param("time_col"), + pytest.param("timestamp_col"), + ], +) +def test_df_apply_axis_1_unsupported_dtype(session, scalars_df_index, column): + # It doesn't matter if it is a remote function or not, the dtype check + # is done even before the function type check with axis=1 + def echo(row): + return row[column] + + dtype = scalars_df_index[column].dtype + + with pytest.raises( + TypeError, + match=re.escape( + f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are ('Int64', 'Float64', 'boolean', 'string')." 
+        ),
+    ):
+        scalars_df_index[[column]].apply(echo, axis=1)

From 7a3aa5f7eb7f370ccb0abe388e2f451ca24c3fd5 Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Thu, 2 May 2024 20:50:07 +0000
Subject: [PATCH 18/29] adjust test after early dtype handling

---
 tests/system/small/test_remote_function.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py
index ac0c174057..72f87dd0c4 100644
--- a/tests/system/small/test_remote_function.py
+++ b/tests/system/small/test_remote_function.py
@@ -786,7 +786,9 @@ def add_ints(row):
         return row["in64_col"] + row["in64_too"]
 
     with pytest.raises(ValueError, match="For axis=1 a remote function must be used."):
-        scalars_df_index.apply(add_ints, axis=1)
+        scalars_df_index[
+            ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
+        ].apply(add_ints, axis=1)
 
 
 @pytest.mark.parametrize(

From 7383faf0ce312675b15243f519ecbb625e025346 Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Sat, 4 May 2024 04:03:24 +0000
Subject: [PATCH 19/29] address review comments

---
 bigframes/core/blocks.py                   | 35 +++++++----------
 bigframes/core/sql.py                      | 29 ++++++++++++++
 bigframes/dataframe.py                     |  2 +-
 bigframes/functions/remote_function.py     | 44 ++++++++++++----------
 tests/system/large/test_remote_function.py |  5 ++-
 tests/system/small/test_remote_function.py |  2 +-
 6 files changed, 72 insertions(+), 45 deletions(-)
 create mode 100644 bigframes/core/sql.py

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 7671191de6..32c6aa5aff 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -45,8 +45,8 @@
 import bigframes.core.join_def as join_defs
 import bigframes.core.ordering as ordering
 import bigframes.core.schema as bf_schema
+import bigframes.core.sql as sql
 import bigframes.core.tree_properties as tree_properties
-import bigframes.core.utils
 import bigframes.core.utils as utils
 import bigframes.dtypes
 import 
bigframes.features @@ -1418,9 +1418,7 @@ def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: ) def add_prefix(self, prefix: str, axis: str | int | None = None) -> Block: - axis_number = bigframes.core.utils.get_axis_number( - "rows" if (axis is None) else axis - ) + axis_number = utils.get_axis_number("rows" if (axis is None) else axis) if axis_number == 0: expr = self._expr for index_col in self._index_columns: @@ -1441,9 +1439,7 @@ def add_prefix(self, prefix: str, axis: str | int | None = None) -> Block: return self.rename(columns=lambda label: f"{prefix}{label}") def add_suffix(self, suffix: str, axis: str | int | None = None) -> Block: - axis_number = bigframes.core.utils.get_axis_number( - "rows" if (axis is None) else axis - ) + axis_number = utils.get_axis_number("rows" if (axis is None) else axis) if axis_number == 0: expr = self._expr for index_col in self._index_columns: @@ -2062,31 +2058,29 @@ def _get_rows_as_json_values(self) -> Block: ordering_column_name = guid.generate_guid() expr = self.session._cache_with_offsets(self.expr) expr = expr.promote_offsets(ordering_column_name) - sql = self.session._to_sql(expr) + expr_sql = self.session._to_sql(expr) # names of the columns to serialize for the row column_names = list(self.index_columns) + [col for col in self.column_labels] column_names_csv = ", ".join([repr(repr(col)) for col in column_names]) - # index column names - index_column_names_csv = ", ".join( - [repr(repr(col)) for col in self.index_columns] - ) + # index columns count + index_columns_count = len(self.index_columns) # column references to form the array of values for the row column_references_csv = ", ".join( - [f"CAST(`{col}` AS STRING)" for col in self.expr.column_ids] + [sql.cast_as_string(col) for col in self.expr.column_ids] ) # types of the columns to serialize for the row column_types = list(self.index.dtypes) + list(self.dtypes) - column_types_csv = ", ".join([f'"{col}"' for col in column_types]) + 
column_types_csv = ", ".join([sql.quote(typ) for typ in column_types]) # row dtype to use for deserializing the row as pandas series pandas_row_dtype = bigframes.dtypes.lcd_type(*column_types) if pandas_row_dtype is None: pandas_row_dtype = "object" - pandas_row_dtype = f'"{pandas_row_dtype}"' + pandas_row_dtype = sql.quote(pandas_row_dtype) # create a json column representing row through SQL manipulation row_json_column_name = guid.generate_guid() @@ -2096,7 +2090,7 @@ def _get_rows_as_json_values(self) -> Block: select_columns_csv = ", ".join(select_columns) json_sql = f"""\ With T0 AS ( -{textwrap.indent(sql, " ")} +{textwrap.indent(expr_sql, " ")} ), T1 AS ( SELECT *, @@ -2104,18 +2098,15 @@ def _get_rows_as_json_values(self) -> Block: "names", [{column_names_csv}], "types", [{column_types_csv}], "values", [{column_references_csv}], - "index", [{index_column_names_csv}], + "indexlength", {index_columns_count}, "dtype", {pandas_row_dtype} ) AS {row_json_column_name} FROM T0 ) SELECT {select_columns_csv} FROM T1 """ ibis_table = self.session.ibis_client.sql(json_sql) - order_for_ibis_table = ordering.ExpressionOrdering( - ordering_value_columns=tuple( - [ordering.ascending_over(ordering_column_name)] - ), - total_ordering_columns=frozenset([ordering_column_name]), + order_for_ibis_table = ordering.ExpressionOrdering.from_offset_col( + ordering_column_name ) expr = core.ArrayValue.from_ibis( self.session, diff --git a/bigframes/core/sql.py b/bigframes/core/sql.py new file mode 100644 index 0000000000..3e165a67f5 --- /dev/null +++ b/bigframes/core/sql.py @@ -0,0 +1,29 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utility functions for SQL construction. +""" + + +def quote(value): + """Return a quoted string of the input.""" + + return f'"{value}"' + + +def cast_as_string(column_name): + """Return a string representing string casting of a column.""" + + return f"CAST(`{column_name}` AS STRING)" diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index d8d198d6e1..f666c28d5b 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3329,7 +3329,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): for dtype in self.dtypes: if not isinstance(dtype, supported_dtypes_types): - raise TypeError( + raise NotImplementedError( f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1." f" Supported dtypes are {supported_dtypes_hints}." 
) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index de30012b8c..ca3f494c34 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -297,6 +297,7 @@ def generate_cloud_function_main_code(self, def_, dir, is_row_processor=False): """ if is_row_processor: code += """\ +import ast import pandas as pd def get_pd_series(row): @@ -304,14 +305,25 @@ def get_pd_series(row): col_names = row_json["names"] col_types = row_json["types"] col_values = row_json["values"] - index_names = row_json["index"] + index_length = row_json["indexlength"] dtype = row_json["dtype"] - # index and column names are not necessarily strings - # they are serialized as repr(repr(name)) at source - col_names = [eval(name) for name in col_names] - index_names = [eval(name) for name in index_names] + # At this point we are assuming that col_names, col_types and col_values are + # arrays of the same length, representing column names, types and values for + # one row of data + # column names are not necessarily strings + # they are serialized as repr(name) at source + evaluated_col_names = [] + for col_name in col_names: + try: + col_name = ast.literal_eval(col_name) + except Exception as ex: + raise RuntimeError(f"Failed to evaluate column name from '{col_name}': {ex}") + evaluated_col_names.append(col_name) + col_names = evaluated_col_names + + # Supported converters for pandas to python types value_converters = { "boolean": lambda val: val == "true", "Int64": int, @@ -327,27 +339,21 @@ def convert_value(value, value_type): return None return value_converter(value) - index_positions = [col_names.index(col) for col in index_names] - index_values = [ pd.Series([convert_value(col_values[i], col_types[i])], dtype=col_types[i])[0] - for i in index_positions - ] - - col_names = [ - col_names[i] - for i, a in enumerate(col_names) - if i not in index_positions + for i in range(index_length) ] - row_values = [ - 
pd.Series([convert_value(a, col_types[i])], dtype=col_types[i])[0] - for i, a in enumerate(col_values) - if i not in index_positions + data_col_names = col_names[index_length:] + data_col_types = col_types[index_length:] + data_col_values = col_values[index_length:] + data_col_values = [ + pd.Series([convert_value(a, data_col_types[i])], dtype=data_col_types[i])[0] + for i, a in enumerate(data_col_values) ] row_index = index_values[0] if len(index_values) == 1 else tuple(index_values) - row_series = pd.Series(row_values, index=col_names, name=row_index, dtype=dtype) + row_series = pd.Series(data_col_values, index=data_col_names, name=row_index, dtype=dtype) return row_series """ code += f"""\ diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index fa2e8a739f..c0f2c03e3d 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1502,13 +1502,14 @@ def analyze(row): pytest.param( pandas.DataFrame( { - "one": [1, 2, 3], + "2": [1, 2, 3], 2: [1.5, 3.75, 5], (3, 4): ["pq", "rs", "tu"], (5.0, "six", 7): [8, 9, 10], + 'raise Exception("hacked!")': [11, 12, 13], } ), - id="non-string-column-names", + id="mixed-type-column-names", ), pytest.param( pandas.DataFrame( diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 72f87dd0c4..17485c5a40 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -812,7 +812,7 @@ def echo(row): dtype = scalars_df_index[column].dtype with pytest.raises( - TypeError, + NotImplementedError, match=re.escape( f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are ('Int64', 'Float64', 'boolean', 'string')." 
         ),

From 84d719ccacbacd92746436bcb1b8145d009b6a9d Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Sat, 4 May 2024 19:02:13 +0000
Subject: [PATCH 20/29] use NameError for column name parsing issue, address
 test coverage failure

---
 bigframes/functions/remote_function.py     |  2 +-
 tests/system/small/test_remote_function.py | 25 +++++++++++++++------
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py
index 296674ca17..ce2bb9abc2 100644
--- a/bigframes/functions/remote_function.py
+++ b/bigframes/functions/remote_function.py
@@ -319,7 +319,7 @@ def get_pd_series(row):
         try:
             col_name = ast.literal_eval(col_name)
         except Exception as ex:
-            raise RuntimeError(f"Failed to evaluate column name from '{col_name}': {ex}")
+            raise NameError(f"Failed to evaluate column name from '{col_name}': {ex}")
         evaluated_col_names.append(col_name)
     col_names = evaluated_col_names
 
diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py
index 17485c5a40..71564e299a 100644
--- a/tests/system/small/test_remote_function.py
+++ b/tests/system/small/test_remote_function.py
@@ -781,14 +781,18 @@ def add_numbers(row):
     )
 
 
-def test_df_apply_axis_1_unsupported_callable(scalars_df_index):
+def test_df_apply_axis_1_unsupported_callable(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
+
     def add_ints(row):
-        return row["in64_col"] + row["in64_too"]
+        return row["int64_col"] + row["int64_too"]
+
+    # pandas works
+    scalars_pandas_df.apply(add_ints, axis=1)
 
     with pytest.raises(ValueError, match="For axis=1 a remote function must be used."):
-        scalars_df_index[
-            ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
-        ].apply(add_ints, axis=1)
+        scalars_df[columns].apply(add_ints, axis=1)
 
 
 @pytest.mark.parametrize(
@@ -803,13 +807,18 @@ def add_ints(row):
pytest.param("timestamp_col"), ], ) -def test_df_apply_axis_1_unsupported_dtype(session, scalars_df_index, column): +def test_df_apply_axis_1_unsupported_dtype(scalars_dfs, column): + scalars_df, scalars_pandas_df = scalars_dfs + # It doesn't matter if it is a remote function or not, the dtype check # is done even before the function type check with axis=1 def echo(row): return row[column] - dtype = scalars_df_index[column].dtype + # pandas works + scalars_pandas_df[[column]].apply(echo, axis=1) + + dtype = scalars_df[column].dtype with pytest.raises( NotImplementedError, @@ -817,4 +826,4 @@ def echo(row): f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are ('Int64', 'Float64', 'boolean', 'string')." ), ): - scalars_df_index[[column]].apply(echo, axis=1) + scalars_df[[column]].apply(echo, axis=1) From 4e96b96c6b89a30f671bca7f9d5da18090330891 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 7 May 2024 09:31:52 +0000 Subject: [PATCH 21/29] address nan return handling in the gcf code --- bigframes/functions/remote_function.py | 7 ++++- tests/system/large/test_remote_function.py | 32 ++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index ce2bb9abc2..b5b98b860c 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -298,6 +298,7 @@ def generate_cloud_function_main_code(self, def_, dir, is_row_processor=False): if is_row_processor: code += """\ import ast +import math import pandas as pd def get_pd_series(row): @@ -370,10 +371,14 @@ def {handler_func_name}(request): replies = [] for call in calls: """ + # TODO(shobs): Make GCF return 'nan' and 'inf' as is after internal + # issue 339134338 is resolved if is_row_processor: code += """\ reply = udf(get_pd_series(call[0])) - if pd.isna(reply): + if isinstance(reply, float) and (math.isnan(reply) or 
math.isinf(reply)): + reply = None + elif pd.isna(reply): # Pandas N/A values are not json serializable, so use a python # equivalent instead reply = None diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index f235b6b0c5..db9d9184c0 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1631,3 +1631,35 @@ def serialize_row(row): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, serialize_row_remote ) + + +# @pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_na_nan_inf(session): + pd_df = pandas.DataFrame( + {"text": ["1", "2.5", "pandas na", "numpy nan", "nan", "inf"]} + ) + bf_df = session.read_pandas(pd_df) + + try: + + def float_parser(row): + import numpy as mynp + import pandas as mypd + + if row["text"] == "pandas na": + return mypd.NA + if row["text"] == "numpy nan": + return mynp.nan + return float(row["text"]) + + float_parser_remote = session.remote_function("row", float, reuse=False)( + float_parser + ) + + # We just want to test that the expression is valid and can execute in BQ + bf_df.apply(float_parser_remote, axis=1).to_pandas() + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, float_parser_remote + ) From 612055dbff06cbe6380eac5f8efffdd8981e58aa Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 7 May 2024 23:15:21 +0000 Subject: [PATCH 22/29] handle (nan, inf, -inf) --- bigframes/functions/remote_function.py | 11 +++++-- tests/system/large/test_remote_function.py | 36 ++++++++++++++++++---- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index b5b98b860c..8d4c58b533 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -371,13 +371,18 @@ def 
{handler_func_name}(request): replies = [] for call in calls: """ - # TODO(shobs): Make GCF return 'nan' and 'inf' as is after internal - # issue 339134338 is resolved + if is_row_processor: code += """\ reply = udf(get_pd_series(call[0])) if isinstance(reply, float) and (math.isnan(reply) or math.isinf(reply)): - reply = None + # json serialization of the special float values (nan, inf, -inf) + # is not in strict compliance of the JSON specification + # https://docs.python.org/3/library/json.html#basic-usage. + # Let's convert them to a quoted string representation ("NaN", + # "Infinity", "-Infinity" respectively) which is handled by + # BigQuery + reply = json.dumps(reply) elif pd.isna(reply): # Pandas N/A values are not json serializable, so use a python # equivalent instead diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index db9d9184c0..53315a99f1 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1633,12 +1633,27 @@ def serialize_row(row): ) -# @pytest.mark.flaky(retries=2, delay=120) +@pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1_na_nan_inf(session): - pd_df = pandas.DataFrame( - {"text": ["1", "2.5", "pandas na", "numpy nan", "nan", "inf"]} + bf_df = session.read_gbq( + """\ +SELECT "1" AS text, 1 AS num +UNION ALL +SELECT "2.5" AS text, 2.5 AS num +UNION ALL +SELECT "nan" AS text, IEEE_DIVIDE(0, 0) AS num +UNION ALL +SELECT "inf" AS text, IEEE_DIVIDE(1, 0) AS num +UNION ALL +SELECT "-inf" AS text, IEEE_DIVIDE(-1, 0) AS num +UNION ALL +SELECT "numpy nan" AS text, IEEE_DIVIDE(0, 0) AS num +UNION ALL +SELECT "pandas na" AS text, NULL AS num + """ ) - bf_df = session.read_pandas(pd_df) + + pd_df = bf_df.to_pandas() try: @@ -1656,8 +1671,17 @@ def float_parser(row): float_parser ) - # We just want to test that the expression is valid and can execute in BQ - bf_df.apply(float_parser_remote, axis=1).to_pandas() + pd_result = 
pd_df.apply(float_parser, axis=1) + bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas() + + # bf_result.dtype is 'Float64' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + # Let's also assert that the data is consistent in this round trip + # BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames + bq_result = bf_df["num"].to_pandas() + pandas.testing.assert_series_equal(pd_result, bq_result) finally: # clean up the gcp assets created for the remote function cleanup_remote_function_assets( From 1c58ded92f2ab199f97a1a8e4ba76c24b58d85f6 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 7 May 2024 23:39:31 +0000 Subject: [PATCH 23/29] replace "row" by bpd.Series for input types --- bigframes/functions/remote_function.py | 15 +++++++++------ bigframes/pandas/__init__.py | 2 +- bigframes/session/__init__.py | 4 ++-- tests/system/large/test_remote_function.py | 9 +++++---- tests/system/small/test_remote_function.py | 12 ++++++++---- .../bigframes_vendored/pandas/core/frame.py | 2 +- 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 8d4c58b533..99567b4fc2 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -26,7 +26,7 @@ import textwrap # TODO(shobs): import typing module and use its classes through namespapace.* -from typing import List, Literal, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union +from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union import warnings import ibis @@ -732,7 +732,7 @@ def get_routine_reference( # which has moved as @js to the ibis package # https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/udf/__init__.py def remote_function( - input_types: Union[type, Sequence[type], Literal["row"]], + input_types: Union[type, 
Sequence[type]], output_type: type, session: Optional[Session] = None, bigquery_client: Optional[bigquery.Client] = None, @@ -796,10 +796,10 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types (type, sequence(type) or "row"): + input_types (type, sequence(type)): For scalar user defined function it should be the input type or sequence of input types. For row processing user defined function, - literal "row" should be specified. + type `Series` should be specified. output_type (type): Data type of the output in the user defined function. session (bigframes.Session, Optional): @@ -899,9 +899,12 @@ def remote_function( https://cloud.google.com/functions/docs/configuring/max-instances """ is_row_processor = False - if input_types == "row": + + import bigframes.series + + if input_types == bigframes.series.Series: warnings.warn( - 'input_types="row" scenario is in preview.', + "input_types=Series scenario is in preview.", stacklevel=1, category=bigframes.exceptions.PreviewWarning, ) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 247630adfc..2200fd6aa4 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -640,7 +640,7 @@ def read_parquet( def remote_function( - input_types: Union[type, Sequence[type], Literal["row"]], + input_types: Union[type, Sequence[type]], output_type: type, dataset: Optional[str] = None, bigquery_connection: Optional[str] = None, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 9f1a624843..c0f321380d 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1454,7 +1454,7 @@ def _ibis_to_temp_table( def remote_function( self, - input_types: Union[type, Sequence[type], Literal["row"]], + input_types: Union[type, Sequence[type]], output_type: type, dataset: Optional[str] = None, bigquery_connection: 
Optional[str] = None, @@ -1472,7 +1472,7 @@ def remote_function( the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. .. note:: - ``input_types="row"`` scenario is in preview. + ``input_types=Series`` scenario is in preview. .. note:: Please make sure following is setup before using this API: diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 53315a99f1..1a24ca2cfa 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -28,6 +28,7 @@ import bigframes from bigframes.functions.remote_function import get_cloud_function_name +import bigframes.series from tests.system.utils import ( assert_pandas_df_equal, delete_cloud_function, @@ -1482,9 +1483,9 @@ def serialize_row(row): } ) - serialize_row_remote = session.remote_function("row", str, reuse=False)( - serialize_row - ) + serialize_row_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(serialize_row) bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas() pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) @@ -1519,7 +1520,7 @@ def analyze(row): } ) - analyze_remote = session.remote_function("row", str)(analyze) + analyze_remote = session.remote_function(bigframes.series.Series, str)(analyze) bf_result = ( scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas() diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 71564e299a..9c60c821a7 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -700,9 +700,11 @@ def add_ints(row): with pytest.warns( bigframes.exceptions.PreviewWarning, - match='input_types="row" scenario is in preview.', + match="input_types=Series scenario is in preview.", ): - add_ints_remote = session.remote_function("row", int)(add_ints) + add_ints_remote = 
session.remote_function(bigframes.series.Series, int)( + add_ints + ) with pytest.warns( bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." @@ -731,7 +733,7 @@ def test_df_apply_axis_1_ordering(session, scalars_dfs): def add_ints(row): return row["int64_col"] + row["int64_too"] - add_ints_remote = session.remote_function("row", int)(add_ints) + add_ints_remote = session.remote_function(bigframes.series.Series, int)(add_ints) bf_result = ( scalars_df[columns] @@ -765,7 +767,9 @@ def test_df_apply_axis_1_multiindex(session): def add_numbers(row): return row["x"] + row["y"] - add_numbers_remote = session.remote_function("row", float)(add_numbers) + add_numbers_remote = session.remote_function(bigframes.series.Series, float)( + add_numbers + ) bf_result = bf_df.apply(add_numbers_remote, axis=1).to_pandas() pd_result = pd_df.apply(add_numbers, axis=1) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 38fd847c3b..31d5e88c7e 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4237,7 +4237,7 @@ def apply(self, func, *, axis=0, args=(), **kwargs): You could apply a user defined function to every row of the DataFrame by creating a remote function out of it, and using it with `axis=1`. - >>> @bpd.remote_function("row", int, reuse=False) + >>> @bpd.remote_function(bpd.Series, int, reuse=False) ... def foo(row): ... result = 1 ... 
result += row["col1"] From bede078c614bdb698baaa4730180bb71f3e395d6 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 7 May 2024 23:43:46 +0000 Subject: [PATCH 24/29] make the bq parity assert more readable --- tests/system/large/test_remote_function.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 1a24ca2cfa..ce301033d3 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1611,9 +1611,9 @@ def serialize_row(row): } ) - serialize_row_remote = session.remote_function("row", str, reuse=False)( - serialize_row - ) + serialize_row_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(serialize_row) bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() pd_result = pd_df.apply(serialize_row, axis=1) @@ -1636,6 +1636,9 @@ def serialize_row(row): @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1_na_nan_inf(session): + """This test is for special cases of float values, to make sure any (nan, + inf, -inf) produced by user code is honored. + """ bf_df = session.read_gbq( """\ SELECT "1" AS text, 1 AS num @@ -1668,9 +1671,9 @@ def float_parser(row): return mynp.nan return float(row["text"]) - float_parser_remote = session.remote_function("row", float, reuse=False)( - float_parser - ) + float_parser_remote = session.remote_function( + bigframes.series.Series, float, reuse=False + )(float_parser) pd_result = pd_df.apply(float_parser, axis=1) bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas() @@ -1680,9 +1683,10 @@ def float_parser(row): pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) # Let's also assert that the data is consistent in this round trip - # BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames + # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. 
their + # expected values in BQ bq_result = bf_df["num"].to_pandas() - pandas.testing.assert_series_equal(pd_result, bq_result) + pandas.testing.assert_series_equal(bq_result, bf_result) finally: # clean up the gcp assets created for the remote function cleanup_remote_function_assets( From 3409bc39e7f5a9dd47d874140a5addde89d69c9c Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 8 May 2024 00:45:53 +0000 Subject: [PATCH 25/29] fix the series name before assert --- bigframes/session/__init__.py | 4 ++-- tests/system/large/test_remote_function.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index c0f321380d..0914f73570 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1512,10 +1512,10 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types (type, sequence(type) or "row"): + input_types (type, sequence(type)): For scalar user defined function it should be the input type or sequence of input types. For row processing user defined function, - literal "row" should be specified. + type `Series` should be specified. output_type (type): Data type of the output in the user defined function. dataset (str, Optional): diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index ce301033d3..b78e331fa8 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1686,6 +1686,7 @@ def float_parser(row): # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. 
their # expected values in BQ bq_result = bf_df["num"].to_pandas() + bq_result.name = None pandas.testing.assert_series_equal(bq_result, bf_result) finally: # clean up the gcp assets created for the remote function From ea7e28e7a1b9e3cc3d15688fff6dffcab4e8194b Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 8 May 2024 00:48:45 +0000 Subject: [PATCH 26/29] fix docstring for args --- bigframes/functions/remote_function.py | 2 +- bigframes/session/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 99567b4fc2..9fbfda11b4 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -796,7 +796,7 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types (type, sequence(type)): + input_types (type or sequence(type)): For scalar user defined function it should be the input type or sequence of input types. For row processing user defined function, type `Series` should be specified. diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 0914f73570..63780cdcab 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1512,7 +1512,7 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types (type, sequence(type)): + input_types (type or sequence(type)): For scalar user defined function it should be the input type or sequence of input types. For row processing user defined function, type `Series` should be specified. 
From 14602f8a66f61028bc0029da83398c404a276492 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 8 May 2024 23:40:44 +0000 Subject: [PATCH 27/29] move more low level string logic in sql module --- bigframes/core/blocks.py | 12 ++++--- bigframes/core/sql.py | 40 +++++++++++++++++++--- bigframes/functions/remote_function.py | 2 -- tests/system/large/test_remote_function.py | 3 +- 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 7e587a96fb..ed905a705b 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2072,32 +2072,34 @@ def _get_rows_as_json_values(self) -> Block: # names of the columns to serialize for the row column_names = list(self.index_columns) + [col for col in self.column_labels] - column_names_csv = ", ".join([repr(repr(col)) for col in column_names]) + column_names_csv = sql.csv([repr(col) for col in column_names], quoted=True) # index columns count index_columns_count = len(self.index_columns) # column references to form the array of values for the row - column_references_csv = ", ".join( + column_references_csv = sql.csv( [sql.cast_as_string(col) for col in self.expr.column_ids] ) # types of the columns to serialize for the row column_types = list(self.index.dtypes) + list(self.dtypes) - column_types_csv = ", ".join([sql.quote(typ) for typ in column_types]) + column_types_csv = sql.csv([str(typ) for typ in column_types], quoted=True) # row dtype to use for deserializing the row as pandas series pandas_row_dtype = bigframes.dtypes.lcd_type(*column_types) if pandas_row_dtype is None: pandas_row_dtype = "object" - pandas_row_dtype = sql.quote(pandas_row_dtype) + pandas_row_dtype = sql.quote(str(pandas_row_dtype)) # create a json column representing row through SQL manipulation row_json_column_name = guid.generate_guid() select_columns = ( [ordering_column_name] + list(self.index_columns) + [row_json_column_name] ) - select_columns_csv = ", 
".join(select_columns) + select_columns_csv = sql.csv( + [sql.column_reference(col) for col in select_columns] + ) json_sql = f"""\ With T0 AS ( {textwrap.indent(expr_sql, " ")} diff --git a/bigframes/core/sql.py b/bigframes/core/sql.py index 3e165a67f5..31ee5f9064 100644 --- a/bigframes/core/sql.py +++ b/bigframes/core/sql.py @@ -16,14 +16,44 @@ Utility functions for SQL construction. """ +from typing import Iterable -def quote(value): - """Return a quoted string of the input.""" - return f'"{value}"' +def quote(value: str): + """Return quoted input string.""" + # Let's use repr which also escapes any special characters + # + # >>> for val in [ + # ... "123", + # ... "str with no special chars", + # ... "str with special chars.,'\"/\\" + # ... ]: + # ... print(f"{val} -> {repr(val)}") + # ... + # 123 -> '123' + # str with no special chars -> 'str with no special chars' + # str with special chars.,'"/\ -> 'str with special chars.,\'"/\\' -def cast_as_string(column_name): + return repr(value) + + +def column_reference(column_name: str): + """Return a string representing column reference in a SQL.""" + + return f"`{column_name}`" + + +def cast_as_string(column_name: str): """Return a string representing string casting of a column.""" - return f"CAST(`{column_name}` AS STRING)" + return f"CAST({column_reference(column_name)} AS STRING)" + + +def csv(values: Iterable[str], quoted=False): + """Return a string of comma separated values.""" + + if quoted: + values = [quote(val) for val in values] + + return ", ".join(values) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 9fbfda11b4..6e42ca9f48 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -24,8 +24,6 @@ import sys import tempfile import textwrap - -# TODO(shobs): import typing module and use its classes through namespapace.* from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union import warnings 
diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index b78e331fa8..1c4bfac78b 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1545,12 +1545,13 @@ def analyze(row): { "2": [1, 2, 3], 2: [1.5, 3.75, 5], + "name, [with. special'- chars\")/\\": [10, 20, 30], (3, 4): ["pq", "rs", "tu"], (5.0, "six", 7): [8, 9, 10], 'raise Exception("hacked!")': [11, 12, 13], } ), - id="mixed-type-column-names", + id="all-kinds-of-column-names", ), pytest.param( pandas.DataFrame( From 3bf5bee8199b592f77a842916b3d6816d5af22a5 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Fri, 10 May 2024 02:06:02 +0000 Subject: [PATCH 28/29] raise explicit error when a column name cannot be supported --- bigframes/core/blocks.py | 19 ++++++++++++++++--- tests/system/large/test_remote_function.py | 10 ++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index ed905a705b..aeb88e7a64 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2070,9 +2070,22 @@ def _get_rows_as_json_values(self) -> Block: expr = expr.promote_offsets(ordering_column_name) expr_sql = self.session._to_sql(expr) - # names of the columns to serialize for the row - column_names = list(self.index_columns) + [col for col in self.column_labels] - column_names_csv = sql.csv([repr(col) for col in column_names], quoted=True) + # Names of the columns to serialize for the row. + # We will use the repr-eval pattern to serialize a value here and + # deserialize in the cloud function. Let's make sure that would work. + column_names = [] + for col in list(self.index_columns) + [col for col in self.column_labels]: + serialized_column_name = repr(col) + try: + eval(serialized_column_name) + except Exception: + raise NameError( + f"Column name type '{type(col).__name__}' is not supported for row serialization." 
+ " Please consider using a name for which eval(repr(name)) works." + ) + + column_names.append(serialized_column_name) + column_names_csv = sql.csv(column_names, quoted=True) # index columns count index_columns_count = len(self.index_columns) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 1c4bfac78b..e086903d03 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime import importlib.util import inspect import math # must keep this at top level to test udf referring global import @@ -1586,6 +1587,15 @@ def analyze(row): ), id="column-multiindex", ), + pytest.param( + pandas.DataFrame( + { + datetime.now(): [1, 2, 3], + } + ), + id="column-name-not-supported", + marks=pytest.mark.xfail(raises=NameError), + ), ], ) @pytest.mark.flaky(retries=2, delay=120) From b5f32323ec3aa72ec5dc0aa6dbb579737e63cc76 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Fri, 10 May 2024 17:22:15 +0000 Subject: [PATCH 29/29] keep literal_eval check on the serialization side to match deserialization --- bigframes/core/blocks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index aeb88e7a64..136a51808c 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -21,6 +21,7 @@ from __future__ import annotations +import ast import dataclasses import functools import itertools @@ -2077,11 +2078,11 @@ def _get_rows_as_json_values(self) -> Block: for col in list(self.index_columns) + [col for col in self.column_labels]: serialized_column_name = repr(col) try: - eval(serialized_column_name) + ast.literal_eval(serialized_column_name) except Exception: raise NameError( f"Column name type '{type(col).__name__}' is not supported for row serialization." 
- " Please consider using a name for which eval(repr(name)) works." + " Please consider using a name for which literal_eval(repr(name)) works." ) column_names.append(serialized_column_name)