From a4c6a4a8ebb170b82c278f4d6d156b0776233832 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 20 Jun 2024 23:39:52 +0000 Subject: [PATCH 1/4] feat: expose gcf memory param in `remote_function` --- bigframes/functions/remote_function.py | 17 +++++- bigframes/pandas/__init__.py | 2 + bigframes/session/__init__.py | 11 ++++ tests/system/large/test_remote_function.py | 60 ++++++++++++++++++++++ 4 files changed, 89 insertions(+), 1 deletion(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 920dc7c039..fab41038a2 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -315,6 +315,7 @@ def create_cloud_function( max_instance_count=None, is_row_processor=False, vpc_connector=None, + memory_mib=1024, ): """Create a cloud function from the given user defined function. @@ -394,7 +395,8 @@ def create_cloud_function( self._cloud_function_docker_repository ) function.service_config = functions_v2.ServiceConfig() - function.service_config.available_memory = "1024M" + if memory_mib is not None: + function.service_config.available_memory = f"{memory_mib}Mi" if timeout_seconds is not None: if timeout_seconds > 1200: raise ValueError( @@ -457,6 +459,7 @@ def provision_bq_remote_function( cloud_function_max_instance_count, is_row_processor, cloud_function_vpc_connector, + cloud_function_memory_mib, ): """Provision a BigQuery remote function.""" # If reuse of any existing function with the same name (indicated by the @@ -487,6 +490,7 @@ def provision_bq_remote_function( max_instance_count=cloud_function_max_instance_count, is_row_processor=is_row_processor, vpc_connector=cloud_function_vpc_connector, + memory_mib=cloud_function_memory_mib, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") @@ -650,6 +654,7 @@ def remote_function( cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: 
Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -800,6 +805,15 @@ def remote_function( function. This is useful if your code needs access to data or service(s) that are on a VPC network. See for more details https://cloud.google.com/functions/docs/networking/connecting-vpc. + cloud_function_memory_mib (int, Optional): + The amount of memory (in mebibytes) to allocate for the cloud + function (2nd gen) created. This also dictates a corresponding + amount of allocated CPU for the function. By default a memory of + 1024 MiB is set for the cloud functions created to support + BigQuery DataFrames remote function. If you want to let the + default memory of cloud functions be allocated, pass `None`. See + for more details + https://cloud.google.com/functions/docs/configuring/memory. """ # Some defaults may be used from the session if not provided otherwise import bigframes.exceptions as bf_exceptions @@ -1010,6 +1024,7 @@ def try_delattr(attr): cloud_function_max_instance_count=cloud_function_max_instances, is_row_processor=is_row_processor, cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) # TODO: Move ibis logic to compiler step diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index f6f9aec800..faba0f3aa3 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -666,6 +666,7 @@ def remote_function( cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -683,6 +684,7 @@ def remote_function( cloud_function_timeout=cloud_function_timeout, cloud_function_max_instances=cloud_function_max_instances,
cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b0b2a3c418..5d04f76cdc 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1534,6 +1534,7 @@ def remote_function( cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1665,6 +1666,15 @@ def remote_function( function. This is useful if your code needs access to data or service(s) that are on a VPC network. See for more details https://cloud.google.com/functions/docs/networking/connecting-vpc. + cloud_function_memory_mib (int, Optional): + The amount of memory (in mebibytes) to allocate for the cloud + function (2nd gen) created. This also dictates a corresponding + amount of allocated CPU for the function. By default a memory of + 1024 MiB is set for the cloud functions created to support + BigQuery DataFrames remote function. If you want to let the + default memory of cloud functions be allocated, pass `None`. See + for more details + https://cloud.google.com/functions/docs/configuring/memory. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution.
The cloud assets can be @@ -1690,6 +1700,7 @@ def remote_function( cloud_function_timeout=cloud_function_timeout, cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) def read_gbq_function( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 6bfc9f0da3..9fb6243651 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1800,3 +1800,63 @@ def float_parser(row): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, float_parser_remote ) + + +@pytest.mark.parametrize( + ("memory_mib_args", "expected_memory"), + [ + pytest.param({}, "1024Mi", id="no-set"), + pytest.param({"cloud_function_memory_mib": None}, "256M", id="set-None"), + pytest.param({"cloud_function_memory_mib": 128}, "128Mi", id="set-128"), + pytest.param({"cloud_function_memory_mib": 1024}, "1024Mi", id="set-1024"), + pytest.param({"cloud_function_memory_mib": 4096}, "4096Mi", id="set-4096"), + pytest.param({"cloud_function_memory_mib": 32768}, "32768Mi", id="set-32768"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_gcf_memory( + session, scalars_dfs, memory_mib_args, expected_memory +): + try: + + def square(x: int) -> int: + return x * x + + square_remote = session.remote_function(reuse=False, **memory_mib_args)(square) + + # Assert that the GCF is created with the intended memory + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.available_memory == expected_memory + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # clean up 
the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, square_remote + ) + + +@pytest.mark.parametrize( + ("memory_mib",), + [ + pytest.param(127, id="127-too-low"), + pytest.param(32769, id="set-32769-too-high"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_gcf_memory_unsupported(session, memory_mib): + with pytest.raises( + google.api_core.exceptions.InvalidArgument, + match="Invalid value specified for container memory", + ): + + @session.remote_function(reuse=False, cloud_function_memory_mib=memory_mib) + def square(x: int) -> int: + return x * x From a0e778d8b9b3d86a19974124c28bf607795f2886 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 25 Jun 2024 23:53:46 +0000 Subject: [PATCH 2/4] add the reported broken usecase as a test --- tests/system/large/test_remote_function.py | 80 ++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 9fb6243651..1ad1de6fdb 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1860,3 +1860,83 @@ def test_remote_function_gcf_memory_unsupported(session, memory_mib): @session.remote_function(reuse=False, cloud_function_memory_mib=memory_mib) def square(x: int) -> int: return x * x + + +def test_remote_function_gcf_oom_prevented_with_more_memory(session): + # This test fails without the explicitly set high memory to run the udf code + try: + + @session.remote_function( + packages=["argostranslate", "google-cloud-storage"], + cloud_function_memory_mib=2048, + ) + def translate_to_english(review: str, language: str) -> str: + import json + import tempfile + + import argostranslate.package + import argostranslate.translate + import google.cloud.storage + + # Load pretrained models from GCS. 
+ storage_client = google.cloud.storage.Client() + + tmpdir = tempfile.TemporaryDirectory() + index_path = f"{tmpdir.name}/index.json" + with open(index_path, "wb") as index_file: + storage_client.download_blob_to_file( + "gs://bigframes-samples/argos-translate/index.json", index_file + ) + + with open(index_path, "r") as index_file: + index = json.load(index_file) + + from_code = language + to_code = "en" + + package_to_install = next( + filter( + lambda x: x["from_code"] == from_code and x["to_code"] == to_code, + index, + ) + ) + + package_gcs = package_to_install["gcs_link"][0] + package_filename = package_gcs.split("/")[-1] + package_path = f"{tmpdir.name}/{package_filename}" + + with open(package_path, "wb") as package_file: + storage_client.download_blob_to_file(package_gcs, package_file) + + # Translate using the model file. + argostranslate.package.install_from_path(package_path) + return argostranslate.translate.translate(review, from_code, to_code) + + # Now run the remote function on bigframes dataframe + pd_df = pandas.DataFrame( + [ + ["Das ist ein großartiger Film!", "de"], + ["¡Esta es una película genial!", "es"], + ] + * 5, + columns=["review_text", "review_language"], + ) + bf_df = session.read_pandas(pd_df) + + pd_result = pandas.Series( + ["This is a great movie!", "This is a great movie!"] * 5 + ) + bf_result = ( + bf_df["review_text"] + .combine(bf_df["review_language"], translate_to_english) + .to_pandas() + ) + + pandas.testing.assert_series_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, translate_to_english + ) From 7daa34591b3168d59498b41365bd1b47f3acef01 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 26 Jun 2024 05:51:58 +0000 Subject: [PATCH 3/4] fix mypy failure --- tests/system/large/test_remote_function.py | 6 +++--- 1 file changed, 3 
insertions(+), 3 deletions(-) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 1834f9979a..570de7baf7 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1874,9 +1874,9 @@ def translate_to_english(review: str, language: str) -> str: import json import tempfile - import argostranslate.package - import argostranslate.translate - import google.cloud.storage + import argostranslate.package # type: ignore + import argostranslate.translate # type: ignore + import google.cloud.storage # type: ignore # Load pretrained models from GCS. storage_client = google.cloud.storage.Client() From c9916e8ccd79633539fc96871c180c84b63655b1 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 27 Jun 2024 01:20:50 +0000 Subject: [PATCH 4/4] revert test that is prone to timing out during deployment --- tests/system/large/test_remote_function.py | 80 ---------------------- 1 file changed, 80 deletions(-) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 570de7baf7..ef8b9811df 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1860,83 +1860,3 @@ def test_remote_function_gcf_memory_unsupported(session, memory_mib): @session.remote_function(reuse=False, cloud_function_memory_mib=memory_mib) def square(x: int) -> int: return x * x - - -def test_remote_function_gcf_oom_prevented_with_more_memory(session): - # This test fails without the explicitly set high memory to run the udf code - try: - - @session.remote_function( - packages=["argostranslate", "google-cloud-storage"], - cloud_function_memory_mib=2048, - ) - def translate_to_english(review: str, language: str) -> str: - import json - import tempfile - - import argostranslate.package # type: ignore - import argostranslate.translate # type: ignore - import google.cloud.storage # type: ignore - - # Load pretrained 
models from GCS. - storage_client = google.cloud.storage.Client() - - tmpdir = tempfile.TemporaryDirectory() - index_path = f"{tmpdir.name}/index.json" - with open(index_path, "wb") as index_file: - storage_client.download_blob_to_file( - "gs://bigframes-samples/argos-translate/index.json", index_file - ) - - with open(index_path, "r") as index_file: - index = json.load(index_file) - - from_code = language - to_code = "en" - - package_to_install = next( - filter( - lambda x: x["from_code"] == from_code and x["to_code"] == to_code, - index, - ) - ) - - package_gcs = package_to_install["gcs_link"][0] - package_filename = package_gcs.split("/")[-1] - package_path = f"{tmpdir.name}/{package_filename}" - - with open(package_path, "wb") as package_file: - storage_client.download_blob_to_file(package_gcs, package_file) - - # Translate using the model file. - argostranslate.package.install_from_path(package_path) - return argostranslate.translate.translate(review, from_code, to_code) - - # Now run the remote function on bigframes dataframe - pd_df = pandas.DataFrame( - [ - ["Das ist ein großartiger Film!", "de"], - ["¡Esta es una película genial!", "es"], - ] - * 5, - columns=["review_text", "review_language"], - ) - bf_df = session.read_pandas(pd_df) - - pd_result = pandas.Series( - ["This is a great movie!", "This is a great movie!"] * 5 - ) - bf_result = ( - bf_df["review_text"] - .combine(bf_df["review_language"], translate_to_english) - .to_pandas() - ) - - pandas.testing.assert_series_equal( - bf_result, pd_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, translate_to_english - )