diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py
index bc352322ef..c1878b6c31 100644
--- a/bigframes/functions/remote_function.py
+++ b/bigframes/functions/remote_function.py
@@ -331,6 +331,7 @@ def create_cloud_function(
         max_instance_count=None,
         is_row_processor=False,
         vpc_connector=None,
+        memory_mib=1024,
     ):
         """Create a cloud function from the given user defined function.
 
@@ -410,7 +411,8 @@ def create_cloud_function(
             self._cloud_function_docker_repository
         )
         function.service_config = functions_v2.ServiceConfig()
-        function.service_config.available_memory = "1024M"
+        if memory_mib is not None:
+            function.service_config.available_memory = f"{memory_mib}Mi"
         if timeout_seconds is not None:
             if timeout_seconds > 1200:
                 raise ValueError(
@@ -473,6 +475,7 @@ def provision_bq_remote_function(
         cloud_function_max_instance_count,
         is_row_processor,
         cloud_function_vpc_connector,
+        cloud_function_memory_mib,
     ):
         """Provision a BigQuery remote function."""
         # If reuse of any existing function with the same name (indicated by the
@@ -504,6 +507,7 @@ def provision_bq_remote_function(
                 max_instance_count=cloud_function_max_instance_count,
                 is_row_processor=is_row_processor,
                 vpc_connector=cloud_function_vpc_connector,
+                memory_mib=cloud_function_memory_mib,
             )
         else:
             logger.info(f"Cloud function {cloud_function_name} already exists.")
@@ -667,6 +671,7 @@ def remote_function(
     cloud_function_timeout: Optional[int] = 600,
     cloud_function_max_instances: Optional[int] = None,
     cloud_function_vpc_connector: Optional[str] = None,
+    cloud_function_memory_mib: Optional[int] = 1024,
 ):
     """Decorator to turn a user defined function into a BigQuery remote function.
 
@@ -817,6 +822,15 @@ def remote_function(
             function. This is useful if your code needs access to data or
             service(s) that are on a VPC network. See for more details
             https://cloud.google.com/functions/docs/networking/connecting-vpc.
+        cloud_function_memory_mib (int, Optional):
+            The amount of memory (in mebibytes) to allocate for the cloud
+            function (2nd gen) created. This also dictates a corresponding
+            amount of CPU allocated to the function. By default 1024 MiB
+            is allocated for the cloud functions created to support
+            BigQuery DataFrames remote functions. To fall back to the
+            default memory allocation of cloud functions, pass `None`.
+            See for more details
+            https://cloud.google.com/functions/docs/configuring/memory.
""" # Some defaults may be used from the session if not provided otherwise import bigframes.exceptions as bf_exceptions @@ -1027,6 +1041,7 @@ def try_delattr(attr): cloud_function_max_instance_count=cloud_function_max_instances, is_row_processor=is_row_processor, cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) # TODO: Move ibis logic to compiler step diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index f6f9aec800..faba0f3aa3 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -666,6 +666,7 @@ def remote_function( cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -683,6 +684,7 @@ def remote_function( cloud_function_timeout=cloud_function_timeout, cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index a4c926de72..3aba3581aa 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1537,6 +1537,7 @@ def remote_function( cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1670,6 +1671,15 @@ def remote_function( function. This is useful if your code needs access to data or service(s) that are on a VPC network. See for more details https://cloud.google.com/functions/docs/networking/connecting-vpc. + cloud_function_memory_mib (int, Optional): + The amounts of memory (in mebibytes) to allocate for the cloud + function (2nd gen) created. This also dictates a corresponding + amount of allocated CPU for the function. By default a memory of + 1024 MiB is set for the cloud functions created to support + BigQuery DataFrames remote function. If you want to let the + default memory of cloud functions be allocated, pass `None`. See + for more details + https://cloud.google.com/functions/docs/configuring/memory. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. 
The cloud assets can be @@ -1695,6 +1705,7 @@ def remote_function( cloud_function_timeout=cloud_function_timeout, cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) def read_gbq_function( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 3f4bfea97e..ef8b9811df 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1800,3 +1800,63 @@ def float_parser(row): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, float_parser_remote ) + + +@pytest.mark.parametrize( + ("memory_mib_args", "expected_memory"), + [ + pytest.param({}, "1024Mi", id="no-set"), + pytest.param({"cloud_function_memory_mib": None}, "256M", id="set-None"), + pytest.param({"cloud_function_memory_mib": 128}, "128Mi", id="set-128"), + pytest.param({"cloud_function_memory_mib": 1024}, "1024Mi", id="set-1024"), + pytest.param({"cloud_function_memory_mib": 4096}, "4096Mi", id="set-4096"), + pytest.param({"cloud_function_memory_mib": 32768}, "32768Mi", id="set-32768"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_gcf_memory( + session, scalars_dfs, memory_mib_args, expected_memory +): + try: + + def square(x: int) -> int: + return x * x + + square_remote = session.remote_function(reuse=False, **memory_mib_args)(square) + + # Assert that the GCF is created with the intended memory + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.available_memory == expected_memory + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, square_remote + ) + + +@pytest.mark.parametrize( + ("memory_mib",), + [ + pytest.param(127, id="127-too-low"), + pytest.param(32769, id="set-32769-too-high"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_gcf_memory_unsupported(session, memory_mib): + with pytest.raises( + google.api_core.exceptions.InvalidArgument, + match="Invalid value specified for container memory", + ): + + @session.remote_function(reuse=False, cloud_function_memory_mib=memory_mib) + def square(x: int) -> int: + return x * x
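
For reference, a minimal usage sketch of the new parameter (not part of the diff; it assumes a session already configured with a GCP project and BigQuery connection, and mirrors the pattern in the system tests above):

```python
import bigframes.pandas as bpd

# Allocate 4096 MiB (and correspondingly more CPU) to the 2nd gen cloud
# function backing this remote function; the default remains 1024 MiB.
# Passing cloud_function_memory_mib=None instead falls back to the
# Cloud Functions service default (reported by the API as "256M",
# per the set-None test case).
@bpd.remote_function(reuse=False, cloud_function_memory_mib=4096)
def square(x: int) -> int:
    return x * x

result = bpd.Series([1, 2, 3]).apply(square).to_pandas()
```

Note the unit change in the service config: the removed line hard-coded `"1024M"` (decimal megabytes), while the new code formats `f"{memory_mib}Mi"` (binary mebibytes), which is the unit the tests assert through `gcf.service_config.available_memory`.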