Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Documentation

* `BigQuery DataFrames source code (GitHub) <https://github.com/googleapis/python-bigquery-dataframes>`_
* `BigQuery DataFrames sample notebooks <https://github.com/googleapis/python-bigquery-dataframes/tree/main/notebooks>`_
* `BigQuery DataFrames API reference <https://cloud.google.com/python/docs/reference/bigframes/latest>`_
* `BigQuery DataFrames API reference <https://cloud.google.com/python/docs/reference/bigframes/latest/summary_overview>`_
* `BigQuery documentation <https://cloud.google.com/bigquery/docs/>`_


Expand Down
35 changes: 13 additions & 22 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import random
import shutil
import string
import subprocess
import sys
import tempfile
import textwrap
Expand Down Expand Up @@ -87,19 +86,6 @@ def _get_hash(def_, package_requirements=None):
return hashlib.md5(def_repr).hexdigest()


def _run_system_command(command):
program = subprocess.Popen(
[command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
stdout, stderr = program.communicate()
exit_code = program.wait()
if exit_code:
raise RuntimeError(
f"Command: {command}\nOutput: {stdout.decode()}\nError: {stderr.decode()}"
f"{constants.FEEDBACK_LINK}"
)


def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str:
    """Format a BigQuery routine reference for direct use in SQL text."""
    dataset_path = f"{routine_ref.project}.{routine_ref.dataset_id}"
    return f"`{dataset_path}`.{routine_ref.routine_id}"

Expand Down Expand Up @@ -281,6 +267,8 @@ def generate_cloud_function_main_code(self, def_, dir):
code_template = textwrap.dedent(
"""\
import cloudpickle
import functions_framework
from flask import jsonify
import json

# original udf code is in {udf_code_file}
Expand All @@ -289,14 +277,17 @@ def generate_cloud_function_main_code(self, def_, dir):
udf = cloudpickle.load(f)

def {handler_func_name}(request):
request_json = request.get_json(silent=True)
calls = request_json["calls"]
replies = []
for call in calls:
reply = udf(*call)
replies.append(reply)
return_json = json.dumps({{"replies" : replies}})
return return_json
try:
request_json = request.get_json(silent=True)
calls = request_json["calls"]
replies = []
for call in calls:
reply = udf(*call)
replies.append(reply)
return_json = json.dumps({{"replies" : replies}})
return return_json
except Exception as e:
return jsonify( {{ "errorMessage": str(e) }} ), 400
"""
)

Expand Down
18 changes: 16 additions & 2 deletions bigframes/ml/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ class PaLM2TextEmbeddingGenerator(base.Predictor):
The model for text embedding. “textembedding-gecko” returns model embeddings for text inputs.
"textembedding-gecko-multilingual" returns model embeddings for text inputs which support over 100 languages
Default to "textembedding-gecko".
version (str or None):
Model version. Accepted values are "001", "002", "003", "latest" etc. Will use the default version if unset.
See https://cloud.google.com/vertex-ai/docs/generative-ai/learn/model-versioning for details.
session (bigframes.Session or None):
BQ session to create the model. If None, use the global default session.
connection_name (str or None):
Expand All @@ -279,10 +282,12 @@ def __init__(
model_name: Literal[
"textembedding-gecko", "textembedding-gecko-multilingual"
] = "textembedding-gecko",
version: Optional[str] = None,
session: Optional[bigframes.Session] = None,
connection_name: Optional[str] = None,
):
self.model_name = model_name
self.version = version
self.session = session or bpd.get_global_session()
self._bq_connection_manager = clients.BqConnectionManager(
self.session.bqconnectionclient, self.session.resourcemanagerclient
Expand Down Expand Up @@ -321,8 +326,11 @@ def _create_bqml_model(self):
f"Model name {self.model_name} is not supported. We only support {', '.join(_EMBEDDING_GENERATOR_ENDPOINTS)}."
)

endpoint = (
self.model_name + "@" + self.version if self.version else self.model_name
)
options = {
"endpoint": self.model_name,
"endpoint": endpoint,
}
return self._bqml_model_factory.create_remote_model(
session=self.session, connection_name=self.connection_name, options=options
Expand All @@ -342,8 +350,14 @@ def _from_bq(
model_connection = model._properties["remoteModelInfo"]["connection"]
model_endpoint = bqml_endpoint.split("/")[-1]

model_name, version = utils.parse_model_endpoint(model_endpoint)

embedding_generator_model = cls(
session=session, model_name=model_endpoint, connection_name=model_connection
session=session,
# str to literals
model_name=model_name, # type: ignore
version=version,
connection_name=model_connection,
)
embedding_generator_model._bqml_model = core.BqmlModel(session, model)
return embedding_generator_model
Expand Down
7 changes: 5 additions & 2 deletions bigframes/ml/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
linear_model,
llm,
pipeline,
utils,
)

_BQML_MODEL_TYPE_MAPPING = MappingProxyType(
Expand Down Expand Up @@ -106,8 +107,10 @@ def _model_from_bq(session: bigframes.Session, bq_model: bigquery.Model):
):
# Parse the remote model endpoint
bqml_endpoint = bq_model._properties["remoteModelInfo"]["endpoint"]
endpoint_model = bqml_endpoint.split("/")[-1]
return _BQML_ENDPOINT_TYPE_MAPPING[endpoint_model]._from_bq( # type: ignore
model_endpoint = bqml_endpoint.split("/")[-1]
model_name, _ = utils.parse_model_endpoint(model_endpoint)

return _BQML_ENDPOINT_TYPE_MAPPING[model_name]._from_bq( # type: ignore
session=session, model=bq_model
)

Expand Down
16 changes: 16 additions & 0 deletions bigframes/ml/metrics/pairwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,19 @@ def paired_manhattan_distance(
paired_manhattan_distance.__doc__ = inspect.getdoc(
vendored_metrics_pairwise.paired_manhattan_distance
)


def paired_euclidean_distances(
    X: Union[bpd.DataFrame, bpd.Series], Y: Union[bpd.DataFrame, bpd.Series]
) -> bpd.DataFrame:
    # Docstring is attached below from the vendored sklearn-compatible stub.
    X, Y = utils.convert_to_dataframe(X, Y)

    # Pairwise distance is defined here only for single-column inputs.
    both_single_column = len(X.columns) == 1 and len(Y.columns) == 1
    if not both_single_column:
        raise ValueError("Inputs X and Y can only contain 1 column.")

    bqml = core.BaseBqml(session=X._session)
    return bqml.distance(X, Y, type="EUCLIDEAN", name="euclidean_distance")


paired_euclidean_distances.__doc__ = inspect.getdoc(
    vendored_metrics_pairwise.paired_euclidean_distances
)
15 changes: 14 additions & 1 deletion bigframes/ml/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import typing
from typing import Iterable, Union
from typing import Iterable, Optional, Union

import bigframes.constants as constants
from bigframes.core import blocks
Expand Down Expand Up @@ -56,3 +56,16 @@ def _convert_to_series(frame: ArrayType) -> bpd.Series:
raise ValueError(
f"Unsupported type {type(frame)} to convert to Series. {constants.FEEDBACK_LINK}"
)


def parse_model_endpoint(model_endpoint: str) -> tuple[str, Optional[str]]:
    """Parse model endpoint string to model_name and version.

    An endpoint such as "textembedding-gecko@001" yields
    ("textembedding-gecko", "001"); when no "@" is present the whole string
    is the model name and the version is None.
    """
    # partition splits on the first "@", matching the original find() logic.
    name, separator, version = model_endpoint.partition("@")
    return name, (version if separator else None)
100 changes: 83 additions & 17 deletions samples/snippets/bqml_getting_started_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


def test_bqml_getting_started(random_model_id):
your_model_id = random_model_id
your_model_id = random_model_id # for example: bqml_tutorial.sample_model

# [START bigquery_dataframes_bqml_getting_started_tutorial]
from bigframes.ml.linear_model import LogisticRegression
Expand All @@ -29,8 +29,8 @@ def test_bqml_getting_started(random_model_id):
df = bpd.read_gbq_table(
"bigquery-public-data.google_analytics_sample.ga_sessions_*",
filters=[
("_table_suffix", ">=", "20170701"),
("_table_suffix", "<=", "20170801"),
("_table_suffix", ">=", "20160801"),
("_table_suffix", "<=", "20170630"),
],
)

Expand Down Expand Up @@ -68,7 +68,7 @@ def test_bqml_getting_started(random_model_id):
features = bpd.DataFrame(
{
"os": operating_system,
"isMobile": is_mobile,
"is_mobile": is_mobile,
"country": country,
"pageviews": pageviews,
}
Expand Down Expand Up @@ -96,9 +96,7 @@ def test_bqml_getting_started(random_model_id):
your_model_id, # For example: "bqml_tutorial.sample_model",
)

# The WHERE clause — _TABLE_SUFFIX BETWEEN '20170701' AND '20170801' —
# limits the number of tables scanned by the query. The date range scanned is
# July 1, 2017 to August 1, 2017. This is the data you're using to evaluate the predictive performance
# July 1, 2017 to August 1, 2017 is the data you're using to evaluate the predictive performance
# of the model. It was collected in the month immediately following the time
# period spanned by the training data.

Expand All @@ -109,6 +107,7 @@ def test_bqml_getting_started(random_model_id):
("_table_suffix", "<=", "20170801"),
],
)

transactions = df["totals"].struct.field("transactions")
label = transactions.notnull().map({True: 1, False: 0})
operating_system = df["device"].struct.field("operatingSystem")
Expand All @@ -119,7 +118,7 @@ def test_bqml_getting_started(random_model_id):
features = bpd.DataFrame(
{
"os": operating_system,
"isMobile": is_mobile,
"is_mobile": is_mobile,
"country": country,
"pageviews": pageviews,
}
Expand Down Expand Up @@ -155,7 +154,14 @@ def test_bqml_getting_started(random_model_id):
# [1 rows x 6 columns]
# [END bigquery_dataframes_bqml_getting_started_tutorial_evaluate]

# [START bigquery_dataframes_bqml_getting_started_tutorial_predict]
# [START bigquery_dataframes_bqml_getting_started_tutorial_predict_by_country]
import bigframes.pandas as bpd

# Select model you'll use for training. `read_gbq_model` loads model data from a
# BigQuery, but you could also use the `model` object from the previous steps.
model = bpd.read_gbq_model(
your_model_id, # For example: "bqml_tutorial.sample_model",
)
df = bpd.read_gbq_table(
"bigquery-public-data.google_analytics_sample.ga_sessions_*",
filters=[
Expand All @@ -172,24 +178,84 @@ def test_bqml_getting_started(random_model_id):
features = bpd.DataFrame(
{
"os": operating_system,
"isMobile": is_mobile,
"is_mobile": is_mobile,
"country": country,
"pageviews": pageviews,
}
)
# Use Logistic Regression predict method to, find more information here in
# [BigFrames](/bigframes/latest/bigframes.ml.linear_model.LogisticRegression#bigframes_ml_linear_model_LogisticRegression_predict)
# [BigFrames](https://cloud.google.com/python/docs/reference/bigframes/latest/bigframes.ml.linear_model.LogisticRegression#bigframes_ml_linear_model_LogisticRegression_predict)

predictions = model.predict(features)
countries = predictions.groupby(["country"])[["predicted_transactions"]].sum()
total_predicted_purchases = predictions.groupby(["country"])[
["predicted_label"]
].sum()
total_predicted_purchases.sort_values(ascending=False).head(10)

countries.sort_values(ascending=False).head(10)
# country # total_predicted_purchases
# United States 220
# Taiwan 8
# Canada 7
# India 2
# Japan 2
# Turkey 2
# Australia 1
# Brazil 1
# Germany 1
# Guyana 1
# Name: predicted_label, dtype: Int64

predictions = model.predict(features)
# [END bigquery_dataframes_bqml_getting_started_tutorial_predict_by_country]

total_predicted_purchases = predictions.groupby(["country"])[
["predicted_transactions"]
# [START bigquery_dataframes_bqml_getting_started_tutorial_predict_by_visitor]

model = bpd.read_gbq_model(
your_model_id, # For example: "bqml_tutorial.sample_model",
)
df = bpd.read_gbq_table(
"bigquery-public-data.google_analytics_sample.ga_sessions_*",
filters=[
("_table_suffix", ">=", "20170701"),
("_table_suffix", "<=", "20170801"),
],
)

operating_system = df["device"].struct.field("operatingSystem")
operating_system = operating_system.fillna("")
is_mobile = df["device"].struct.field("isMobile")
country = df["geoNetwork"].struct.field("country").fillna("")
pageviews = df["totals"].struct.field("pageviews").fillna(0)
full_visitor_id = df["fullVisitorId"]

features = bpd.DataFrame(
{
"os": operating_system,
"is_mobile": is_mobile,
"country": country,
"pageviews": pageviews,
"fullVisitorId": full_visitor_id,
}
)

predictions = model.predict(features)
total_predicted_purchases = predictions.groupby(["fullVisitorId"])[
["predicted_label"]
].sum()

total_predicted_purchases.sort_values(ascending=False).head(10)

# [END bigquery_dataframes_bqml_getting_started_tutorial_predict]
# fullVisitorId # total_predicted_purchases
# 9417857471295131045 4
# 0376394056092189113 2
# 0456807427403774085 2
# 057693500927581077 2
# 112288330928895942 2
# 1280993661204347450 2
# 2105122376016897629 2
# 2158257269735455737 2
# 2969418676126258798 2
# 489038402765684003 2
# Name: predicted_label, dtype: Int64


# [END bigquery_dataframes_bqml_getting_started_tutorial_predict_by_visitor]
24 changes: 23 additions & 1 deletion tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tempfile
import textwrap

from google.api_core.exceptions import NotFound, ResourceExhausted
from google.api_core.exceptions import BadRequest, NotFound, ResourceExhausted
from google.cloud import bigquery, functions_v2
import pandas
import pytest
Expand Down Expand Up @@ -1214,6 +1214,28 @@ def square(x):
)


# Retried because the test provisions live GCP resources and can hit
# transient quota/availability failures.
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_runtime_error(session, scalars_dfs, dataset_id):
    # Verify that an exception raised inside a deployed remote function surfaces
    # to the client as a BadRequest whose payload carries "errorMessage" — the
    # cloud function handler wraps udf failures as a 400 JSON error response.
    try:

        @session.remote_function([int], int, dataset=dataset_id)
        def square(x):
            return x * x

        scalars_df, _ = scalars_dfs

        with pytest.raises(
            BadRequest, match="400.*errorMessage.*unsupported operand type"
        ):
            # int64_col has nulls which should cause error in square
            # (None * None raises TypeError inside the deployed udf).
            scalars_df["int64_col"].apply(square).to_pandas()
    finally:
        # clean up the gcp assets created for the remote function
        cleanup_remote_function_assets(
            session.bqclient, session.cloudfunctionsclient, square
        )


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_anonymous_dataset(session, scalars_dfs):
try:
Expand Down
Loading