def _get_rows_as_json_values(self) -> Block:
    """Return a block whose only value column holds each row serialized as JSON.

    Each JSON object is built in SQL with ``JSON_OBJECT`` and has the keys:

    * ``names``: ``repr`` of every index column id followed by every column
      label, so that the consumer can restore them with ``ast.literal_eval``.
    * ``types``: the dtype names of the index columns followed by the value
      columns.
    * ``values``: every column of the expression cast to STRING.
    * ``indexlength``: the number of index columns.
    * ``dtype``: the common dtype reported by ``bigframes.dtypes.lcd_type``
      for all columns, or ``object`` when there is none.

    The index columns, index labels and the current row ordering of this
    block are carried over to the returned block.

    Returns:
        Block: block indexed like this one, with a single JSON column.

    Raises:
        NameError: if a column name does not survive the
            ``ast.literal_eval(repr(name))`` round trip.
    """
    # We want to preserve any ordering currently present before turning to
    # direct SQL manipulation. We will restore the ordering when we rebuild
    # expression.
    # TODO(shobs): Replace direct SQL manipulation by structured expression
    # manipulation
    ordering_column_name = guid.generate_guid()
    expr = self.session._cache_with_offsets(self.expr)
    expr = expr.promote_offsets(ordering_column_name)
    expr_sql = self.session._to_sql(expr)

    # Names of the columns to serialize for the row.
    # We will use the repr-eval pattern to serialize a value here and
    # deserialize in the cloud function. Let's make sure that would work.
    column_names = []
    for col in list(self.index_columns) + list(self.column_labels):
        serialized_column_name = repr(col)
        try:
            ast.literal_eval(serialized_column_name)
        except Exception as ex:
            # Chain the original failure so the root cause stays visible in
            # the traceback.
            raise NameError(
                f"Column name type '{type(col).__name__}' is not supported for row serialization."
                " Please consider using a name for which literal_eval(repr(name)) works."
            ) from ex

        column_names.append(serialized_column_name)
    column_names_csv = sql.csv(column_names, quoted=True)

    # index columns count
    index_columns_count = len(self.index_columns)

    # column references to form the array of values for the row
    column_references_csv = sql.csv(
        [sql.cast_as_string(col) for col in self.expr.column_ids]
    )

    # types of the columns to serialize for the row
    column_types = list(self.index.dtypes) + list(self.dtypes)
    column_types_csv = sql.csv([str(typ) for typ in column_types], quoted=True)

    # row dtype to use for deserializing the row as pandas series
    pandas_row_dtype = bigframes.dtypes.lcd_type(*column_types)
    if pandas_row_dtype is None:
        pandas_row_dtype = "object"
    pandas_row_dtype = sql.quote(str(pandas_row_dtype))

    # create a json column representing row through SQL manipulation
    row_json_column_name = guid.generate_guid()
    select_columns = (
        [ordering_column_name] + list(self.index_columns) + [row_json_column_name]
    )
    select_columns_csv = sql.csv(
        [sql.column_reference(col) for col in select_columns]
    )
    # Whitespace inside the template is cosmetic; SQL ignores it.
    json_sql = f"""\
With T0 AS (
{textwrap.indent(expr_sql, " ")}
),
T1 AS (
    SELECT *,
           JSON_OBJECT(
               "names", [{column_names_csv}],
               "types", [{column_types_csv}],
               "values", [{column_references_csv}],
               "indexlength", {index_columns_count},
               "dtype", {pandas_row_dtype}
           ) AS {row_json_column_name} FROM T0
)
SELECT {select_columns_csv} FROM T1
"""
    ibis_table = self.session.ibis_client.sql(json_sql)
    # The promoted offsets column becomes the hidden ordering of the rebuilt
    # expression, which restores the row order captured above.
    order_for_ibis_table = ordering.ExpressionOrdering.from_offset_col(
        ordering_column_name
    )
    expr = core.ArrayValue.from_ibis(
        self.session,
        ibis_table,
        [ibis_table[col] for col in select_columns if col != ordering_column_name],
        hidden_ordering_columns=[ibis_table[ordering_column_name]],
        ordering=order_for_ibis_table,
    )
    block = Block(
        expr,
        index_columns=self.index_columns,
        column_labels=[row_json_column_name],
        index_labels=self._index_labels,
    )
    return block
from typing import Iterable


def quote(value: str) -> str:
    """Return ``value`` as a quoted string literal with special characters escaped.

    Args:
        value: The raw text to quote.

    Returns:
        The ``repr`` of ``value``.
    """
    # Let's use repr which also escapes any special characters, e.g.
    #   123                       -> '123'
    #   str with no special chars -> 'str with no special chars'
    #   it's                      -> "it's"
    return repr(value)


def column_reference(column_name: str) -> str:
    """Return a string representing a column reference in SQL.

    The name is wrapped in backticks. Backslashes and backticks inside the
    name are backslash-escaped so that the name cannot terminate the quoted
    identifier early.

    Args:
        column_name: The raw column name.

    Returns:
        The backtick-quoted identifier.
    """
    # Escape the backslash first, otherwise the backslashes introduced for
    # the backticks would be escaped a second time.
    escaped = column_name.replace("\\", "\\\\").replace("`", "\\`")
    return f"`{escaped}`"


def cast_as_string(column_name: str) -> str:
    """Return a SQL expression casting the given column to STRING.

    Args:
        column_name: The raw column name; it is quoted by ``column_reference``.
    """
    return f"CAST({column_reference(column_name)} AS STRING)"


def csv(values: Iterable[str], quoted=False) -> str:
    """Return the values joined by ``", "``.

    Args:
        values: The strings to join.
        quoted: When true, every value is passed through ``quote`` first.

    Returns:
        The comma separated string; empty when ``values`` is empty.
    """
    if quoted:
        values = [quote(val) for val in values]

    return ", ".join(values)
scenario is in preview.", + category=bigframes.exceptions.PreviewWarning, + ) + + # Early check whether the dataframe dtypes are currently supported + # in the remote function + # NOTE: Keep in sync with the value converters used in the gcf code + # generated in generate_cloud_function_main_code in remote_function.py + remote_function_supported_dtypes = ( + bigframes.dtypes.INT_DTYPE, + bigframes.dtypes.FLOAT_DTYPE, + bigframes.dtypes.BOOL_DTYPE, + bigframes.dtypes.STRING_DTYPE, + ) + supported_dtypes_types = tuple( + type(dtype) for dtype in remote_function_supported_dtypes + ) + supported_dtypes_hints = tuple( + str(dtype) for dtype in remote_function_supported_dtypes + ) + + for dtype in self.dtypes: + if not isinstance(dtype, supported_dtypes_types): + raise NotImplementedError( + f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1." + f" Supported dtypes are {supported_dtypes_hints}." + ) + + # Check if the function is a remote function + if not hasattr(func, "bigframes_remote_function"): + raise ValueError("For axis=1 a remote function must be used.") + + # Serialize the rows as json values + block = self._get_block() + rows_as_json_series = bigframes.series.Series( + block._get_rows_as_json_values() + ) + + # Apply the function + result_series = rows_as_json_series._apply_unary_op( + ops.RemoteFunctionOp(func=func, apply_on_null=True) + ) + result_series.name = None + + # Return Series with materialized result so that any error in the remote + # function is caught early + materialized_series = result_series.cache() + return materialized_series + + # Per-column apply results = {name: func(col, *args, **kwargs) for name, col in self.items()} if all( [ diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 3ca6d8e1af..eae021b4cd 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -33,3 +33,7 @@ class CleanupFailedWarning(Warning): class DefaultIndexWarning(Warning): """Default index may cause unexpected 
class PreviewWarning(Warning):
    """Warning category signalling that the feature in use is in preview."""
def get_pd_series(row):
    """Rebuild one row, received as a JSON string, as a pandas Series.

    The JSON object must provide ``names``, ``types``, ``values``,
    ``indexlength`` and ``dtype``. The first ``indexlength`` entries describe
    the index of the row and become the ``name`` of the series (a scalar for
    a single index column, a tuple otherwise); the remaining entries become
    the data of the series, labelled with the remaining names.

    Raises:
        NameError: if a serialized column name cannot be evaluated.
        ValueError: if a column type has no known converter.
    """
    payload = json.loads(row)
    serialized_names, type_names, raw_values, index_length, series_dtype = (
        payload[key] for key in ("names", "types", "values", "indexlength", "dtype")
    )

    # At this point we are assuming that the names, types and values are
    # arrays of the same length, describing one row of data.

    def parse_name(serialized):
        # Column names are not necessarily strings; they are serialized as
        # repr(name) at source.
        try:
            return ast.literal_eval(serialized)
        except Exception as ex:
            raise NameError(
                f"Failed to evaluate column name from '{serialized}': {ex}"
            )

    labels = [parse_name(serialized) for serialized in serialized_names]

    # Supported converters for pandas to python types
    converters = {
        "boolean": lambda text: text == "true",
        "Int64": int,
        "Float64": float,
        "string": str,
    }

    def to_python(raw, type_name):
        # The type is validated even for null values.
        if converters.get(type_name) is None:
            raise ValueError(f"Don't know how to handle type '{type_name}'")
        return None if raw is None else converters[type_name](raw)

    def as_typed_scalar(raw, type_name):
        # Route the value through a one-element Series so that it takes the
        # representation pandas uses for that dtype.
        return pd.Series([to_python(raw, type_name)], dtype=type_name)[0]

    index_values = [
        as_typed_scalar(raw_values[pos], type_names[pos])
        for pos in range(index_length)
    ]

    data_types = type_names[index_length:]
    data_values = [
        as_typed_scalar(raw, data_types[pos])
        for pos, raw in enumerate(raw_values[index_length:])
    ]

    if len(index_values) == 1:
        row_label = index_values[0]
    else:
        row_label = tuple(index_values)

    return pd.Series(
        data_values,
        index=labels[index_length:],
        name=row_label,
        dtype=series_dtype,
    )
{udf_bytecode_file} +with open("{udf_bytecode_file}", "rb") as f: + udf = cloudpickle.load(f) + +def {handler_func_name}(request): + try: + request_json = request.get_json(silent=True) + calls = request_json["calls"] + replies = [] + for call in calls: +""" + + if is_row_processor: + code += """\ + reply = udf(get_pd_series(call[0])) + if isinstance(reply, float) and (math.isnan(reply) or math.isinf(reply)): + # json serialization of the special float values (nan, inf, -inf) + # is not in strict compliance of the JSON specification + # https://docs.python.org/3/library/json.html#basic-usage. + # Let's convert them to a quoted string representation ("NaN", + # "Infinity", "-Infinity" respectively) which is handled by + # BigQuery + reply = json.dumps(reply) + elif pd.isna(reply): + # Pandas N/A values are not json serializable, so use a python + # equivalent instead + reply = None + elif hasattr(reply, "item"): + # Numpy types are not json serializable, so use its Python + # value instead + reply = reply.item() +""" + else: + code += """\ + reply = udf(*call) +""" + code += """\ + replies.append(reply) + return_json = json.dumps({"replies" : replies}) + return return_json + except Exception as e: + return jsonify( { "errorMessage": str(e) } ), 400 +""" main_py = os.path.join(dir, "main.py") with open(main_py, "w") as f: @@ -325,11 +409,17 @@ def {handler_func_name}(request): return handler_func_name - def generate_cloud_function_code(self, def_, dir, package_requirements=None): + def generate_cloud_function_code( + self, def_, dir, package_requirements=None, is_row_processor=False + ): """Generate the cloud function code for a given user defined function.""" # requirements.txt requirements = ["cloudpickle >= 2.1.0"] + if is_row_processor: + # bigframes remote function will send an entire row of data as json, + # which would be converted to a pandas series and processed + requirements.append(f"pandas=={pandas.__version__}") if package_requirements: 
requirements.extend(package_requirements) requirements = sorted(requirements) @@ -338,7 +428,9 @@ def generate_cloud_function_code(self, def_, dir, package_requirements=None): f.write("\n".join(requirements)) # main.py - entry_point = self.generate_cloud_function_main_code(def_, dir) + entry_point = self.generate_cloud_function_main_code( + def_, dir, is_row_processor + ) return entry_point def create_cloud_function( @@ -348,13 +440,14 @@ def create_cloud_function( package_requirements=None, timeout_seconds=600, max_instance_count=None, + is_row_processor=False, ): """Create a cloud function from the given user defined function.""" # Build and deploy folder structure containing cloud function with tempfile.TemporaryDirectory() as dir: entry_point = self.generate_cloud_function_code( - def_, dir, package_requirements + def_, dir, package_requirements, is_row_processor ) archive_path = shutil.make_archive(dir, "zip", dir) @@ -474,6 +567,7 @@ def provision_bq_remote_function( max_batching_rows, cloud_function_timeout, cloud_function_max_instance_count, + is_row_processor, ): """Provision a BigQuery remote function.""" # If reuse of any existing function with the same name (indicated by the @@ -500,6 +594,7 @@ def provision_bq_remote_function( package_requirements, cloud_function_timeout, cloud_function_max_instance_count, + is_row_processor, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") @@ -700,8 +795,9 @@ def remote_function( Args: input_types (type or sequence(type)): - Input data type, or sequence of input data types in the user - defined function. + For scalar user defined function it should be the input type or + sequence of input types. For row processing user defined function, + type `Series` should be specified. output_type (type): Data type of the output in the user defined function. session (bigframes.Session, Optional): @@ -800,9 +896,25 @@ def remote_function( function's default setting applies. 
For more details see https://cloud.google.com/functions/docs/configuring/max-instances """ - if isinstance(input_types, type): + is_row_processor = False + + import bigframes.series + + if input_types == bigframes.series.Series: + warnings.warn( + "input_types=Series scenario is in preview.", + stacklevel=1, + category=bigframes.exceptions.PreviewWarning, + ) + + # we will model the row as a json serialized string containing the data + # and the metadata representing the row + input_types = [str] + is_row_processor = True + elif isinstance(input_types, type): input_types = [input_types] + # Some defaults may be used from the session if not provided otherwise import bigframes.pandas as bpd session = session or bpd.get_global_session() @@ -928,6 +1040,7 @@ def wrapper(f): max_batching_rows, cloud_function_timeout, cloud_function_max_instances, + is_row_processor, ) # TODO: Move ibis logic to compiler step diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 5f70fd77f9..473fc4f098 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1416,6 +1416,9 @@ def remote_function( """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. + .. note:: + ``input_types=Series`` scenario is in preview. + .. note:: Please make sure following is setup before using this API: @@ -1455,8 +1458,9 @@ def remote_function( Args: input_types (type or sequence(type)): - Input data type, or sequence of input data types in the user - defined function. + For scalar user defined function it should be the input type or + sequence of input types. For row processing user defined function, + type `Series` should be specified. output_type (type): Data type of the output in the user defined function. 
dataset (str, Optional): diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index eb2a0884fe..e086903d03 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime import importlib.util import inspect import math # must keep this at top level to test udf referring global import @@ -28,6 +29,7 @@ import bigframes from bigframes.functions.remote_function import get_cloud_function_name +import bigframes.series from tests.system.utils import ( assert_pandas_df_equal, delete_cloud_function, @@ -1454,3 +1456,251 @@ def square(x): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, square_remote ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1(session, scalars_dfs): + columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] + scalars_df, scalars_pandas_df = scalars_dfs + try: + + def serialize_row(row): + custom = { + "name": row.name, + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "table": row.to_json(orient="table"), + "custom": custom, + } + ) + + serialize_row_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(serialize_row) + + bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas() + pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) + + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. 
@pytest.mark.flaky(retries=2, delay=120)
def test_df_apply_axis_1_aggregates(session, scalars_dfs):
    """Row-wise aggregates computed in a remote function match pandas.

    The same ``analyze`` function is applied with ``axis=1`` through a remote
    function on the BigQuery DataFrame and directly on the pandas DataFrame,
    after dropping rows with nulls, and the two results are compared.
    """
    columns = ["int64_col", "int64_too", "float64_col"]
    scalars_df, scalars_pandas_df = scalars_dfs

    try:

        def analyze(row):
            return str(
                {
                    "dtype": row.dtype,
                    "count": row.count(),
                    # Previously row.max() was reported under "min", so the
                    # minimum was never actually exercised.
                    "min": row.min(),
                    "max": row.max(),
                    "mean": row.mean(),
                    "std": row.std(),
                    "var": row.var(),
                }
            )

        analyze_remote = session.remote_function(bigframes.series.Series, str)(analyze)

        bf_result = (
            scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas()
        )
        pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1)

        # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
        # , ignore this mismatch by using check_dtype=False.
        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
    finally:
        # clean up the gcp assets created for the remote function
        cleanup_remote_function_assets(
            session.bqclient, session.cloudfunctionsclient, analyze_remote
        )
special'- chars\")/\\": [10, 20, 30], + (3, 4): ["pq", "rs", "tu"], + (5.0, "six", 7): [8, 9, 10], + 'raise Exception("hacked!")': [11, 12, 13], + } + ), + id="all-kinds-of-column-names", + ), + pytest.param( + pandas.DataFrame( + { + "x": [1, 2, 3], + "y": [1.5, 3.75, 5], + "z": ["pq", "rs", "tu"], + }, + index=pandas.MultiIndex.from_tuples( + [ + ("a", 100), + ("a", 200), + ("b", 300), + ] + ), + ), + id="multiindex", + ), + pytest.param( + pandas.DataFrame( + [ + [10, 1.5, "pq"], + [20, 3.75, "rs"], + [30, 8.0, "tu"], + ], + columns=pandas.MultiIndex.from_arrays( + [ + ["first", "last_two", "last_two"], + [1, 2, 3], + ] + ), + ), + id="column-multiindex", + ), + pytest.param( + pandas.DataFrame( + { + datetime.now(): [1, 2, 3], + } + ), + id="column-name-not-supported", + marks=pytest.mark.xfail(raises=NameError), + ), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_complex(session, pd_df): + bf_df = session.read_pandas(pd_df) + + try: + + def serialize_row(row): + custom = { + "name": row.name, + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "custom": custom, + } + ) + + serialize_row_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(serialize_row) + + bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() + pd_result = pd_df.apply(serialize_row, axis=1) + + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. + # + # bf_result.index[0].dtype is 'string[pyarrow]' while + # pd_result.index[0].dtype is 'object', ignore this mismatch by using + # check_index_type=False. 
@pytest.mark.flaky(retries=2, delay=120)
def test_df_apply_axis_1_na_nan_inf(session):
    """This test is for special cases of float values, to make sure any (nan,
    inf, -inf) produced by user code is honored.

    Each input row pairs a ``text`` describing a float with the matching
    ``num`` value computed in BigQuery. The remote function parses ``text``
    into a float, and the result is compared both with pandas applying the
    same function and with the ``num`` column itself.
    """
    bf_df = session.read_gbq(
        """\
SELECT "1" AS text, 1 AS num
UNION ALL
SELECT "2.5" AS text, 2.5 AS num
UNION ALL
SELECT "nan" AS text, IEEE_DIVIDE(0, 0) AS num
UNION ALL
SELECT "inf" AS text, IEEE_DIVIDE(1, 0) AS num
UNION ALL
SELECT "-inf" AS text, IEEE_DIVIDE(-1, 0) AS num
UNION ALL
SELECT "numpy nan" AS text, IEEE_DIVIDE(0, 0) AS num
UNION ALL
SELECT "pandas na" AS text, NULL AS num
    """
    )

    pd_df = bf_df.to_pandas()

    try:

        def float_parser(row):
            # Imported inside the function, under local aliases, so the
            # function does not rely on module-level names of this test file.
            import numpy as mynp
            import pandas as mypd

            if row["text"] == "pandas na":
                return mypd.NA
            if row["text"] == "numpy nan":
                return mynp.nan
            return float(row["text"])

        float_parser_remote = session.remote_function(
            bigframes.series.Series, float, reuse=False
        )(float_parser)

        pd_result = pd_df.apply(float_parser, axis=1)
        bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas()

        # bf_result.dtype is 'Float64' while pd_result.dtype is 'object'
        # , ignore this mismatch by using check_dtype=False.
        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)

        # Let's also assert that the data is consistent in this round trip
        # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. their
        # expected values in BQ
        bq_result = bf_df["num"].to_pandas()
        # The apply result is unnamed, so drop the name before comparing.
        bq_result.name = None
        pandas.testing.assert_series_equal(bq_result, bf_result)
    finally:
        # clean up the gcp assets created for the remote function
        cleanup_remote_function_assets(
            session.bqclient, session.cloudfunctionsclient, float_parser_remote
        )
+ ): + bf_result = scalars_df[columns].apply(add_ints_remote, axis=1).to_pandas() + + pd_result = scalars_pandas_df[columns].apply(add_ints, axis=1) + + # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this + # mismatch by using check_dtype=False. + # + # bf_result.to_numpy() produces an array of numpy.float64's + # (in system_prerelease tests), while pd_result.to_numpy() produces an + # array of ints, ignore this mismatch by using check_exact=False. + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_exact=False + ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_ordering(session, scalars_dfs): + columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] + ordering_columns = ["bool_col", "int64_col"] + scalars_df, scalars_pandas_df = scalars_dfs + + def add_ints(row): + return row["int64_col"] + row["int64_too"] + + add_ints_remote = session.remote_function(bigframes.series.Series, int)(add_ints) + + bf_result = ( + scalars_df[columns] + .sort_values(ordering_columns) + .apply(add_ints_remote, axis=1) + .to_pandas() + ) + pd_result = ( + scalars_pandas_df[columns].sort_values(ordering_columns).apply(add_ints, axis=1) + ) + + # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this + # mismatch by using check_dtype=False. + # + # bf_result.to_numpy() produces an array of numpy.float64's + # (in system_prerelease tests), while pd_result.to_numpy() produces an + # array of ints, ignore this mismatch by using check_exact=False. 
@pytest.mark.flaky(retries=2, delay=120)
def test_df_apply_axis_1_multiindex(session):
    """Row-wise apply on a DataFrame with a two-level row index matches pandas."""
    pd_df = pd.DataFrame(
        {"x": [1, 2, 3], "y": [1.5, 3.75, 5], "z": ["pq", "rs", "tu"]},
        index=pd.MultiIndex.from_tuples([("a", 100), ("a", 200), ("b", 300)]),
    )
    bf_df = session.read_pandas(pd_df)

    def add_numbers(row):
        return row["x"] + row["y"]

    add_numbers_remote = session.remote_function(bigframes.series.Series, float)(
        add_numbers
    )

    bf_result = bf_df.apply(add_numbers_remote, axis=1).to_pandas()
    pd_result = pd_df.apply(add_numbers, axis=1)

    # bf_result.dtype is 'Float64' while pd_result.dtype is 'float64', ignore this
    # mismatch by using check_dtype=False.
    #
    # bf_result.index[0].dtype is 'string[pyarrow]' while
    # pd_result.index[0].dtype is 'object', ignore this mismatch by using
    # check_index_type=False.
    pd.testing.assert_series_equal(
        pd_result, bf_result, check_dtype=False, check_index_type=False
    )


def test_df_apply_axis_1_unsupported_callable(scalars_dfs):
    """A plain Python callable is rejected with ValueError when axis=1."""
    scalars_df, scalars_pandas_df = scalars_dfs
    columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]

    def add_ints(row):
        return row["int64_col"] + row["int64_too"]

    # pandas works
    scalars_pandas_df.apply(add_ints, axis=1)

    # add_ints was not wrapped as a remote function, so the call must fail.
    with pytest.raises(ValueError, match="For axis=1 a remote function must be used."):
        scalars_df[columns].apply(add_ints, axis=1)
function type check with axis=1 + def echo(row): + return row[column] + + # pandas works + scalars_pandas_df[[column]].apply(echo, axis=1) + + dtype = scalars_df[column].dtype + + with pytest.raises( + NotImplementedError, + match=re.escape( + f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are ('Int64', 'Float64', 'boolean', 'string')." + ), + ): + scalars_df[[column]].apply(echo, axis=1) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 4e17bca54d..31d5e88c7e 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4200,12 +4200,16 @@ def merge( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def apply(self, func, *, args=(), **kwargs): + def apply(self, func, *, axis=0, args=(), **kwargs): """Apply a function along an axis of the DataFrame. Objects passed to the function are Series objects whose index is - the DataFrame's index (``axis=0``) the final return type - is inferred from the return type of the applied function. + the DataFrame's index (``axis=0``) or the DataFrame's columns (``axis=1``). + The final return type is inferred from the return type of the applied + function. + + .. note:: + ``axis=1`` scenario is in preview. **Examples:** @@ -4230,9 +4234,28 @@ def apply(self, func, *, args=(), **kwargs): [2 rows x 2 columns] + You could apply a user defined function to every row of the DataFrame by + creating a remote function out of it, and using it with `axis=1`. + + >>> @bpd.remote_function(bpd.Series, int, reuse=False) + ... def foo(row): + ... result = 1 + ... result += row["col1"] + ... result += row["col2"]*row["col2"] + ... return result + + >>> df.apply(foo, axis=1) + 0 11 + 1 19 + dtype: Int64 + Args: func (function): Function to apply to each column or row. 
+ axis ({index (0), columns (1)}): + Axis along which the function is applied. Specify 0 or 'index' + to apply function to each column. Specify 1 or 'columns' to + apply function to each row. args (tuple): Positional arguments to pass to `func` in addition to the array/series.