From 05fdd0fdd4778528d1e1a0040942966af967fda9 Mon Sep 17 00:00:00 2001 From: Varun Date: Tue, 27 Sep 2022 14:31:47 +0800 Subject: [PATCH 1/2] adding_billing_project_in_config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Varun --- .../feast/infra/offline_stores/bigquery.py | 96 ++++++++++--------- 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index c570b8d38ab..96e1f9d2aa5 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -19,7 +19,7 @@ import pandas as pd import pyarrow import pyarrow.parquet -from pydantic import StrictStr +from pydantic import StrictStr, validator from pydantic.typing import Literal from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed @@ -83,7 +83,8 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel): project_id: Optional[StrictStr] = None """ (optional) GCP project name used for the BigQuery offline store """ - + billing_project_id: Optional[StrictStr] = None + """ (optional) GCP project name in which the BigQuery jobs are run """ location: Optional[StrictStr] = None """ (optional) GCP location name used for the BigQuery offline store. Examples of location names include ``US``, ``EU``, ``us-central1``, ``us-west4``. 
@@ -94,6 +95,14 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel): gcs_staging_location: Optional[str] = None """ (optional) GCS location used for offloading BigQuery results as parquet files.""" + @validator("billing_project_id") + def project_id_exists(cls, v, values, **kwargs): + if v and not values["project_id"]: + raise ValueError( + "please specify project_id if billing_project_id is specified" + ) + return v + class BigQueryOfflineStore(OfflineStore): @staticmethod @@ -122,10 +131,11 @@ def pull_latest_from_table_or_query( timestamps.append(created_timestamp_column) timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" field_string = ", ".join(join_key_columns + feature_name_columns + timestamps) - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, - location=config.offline_store.location, + project=project_id, location=config.offline_store.location, ) query = f""" SELECT @@ -142,10 +152,7 @@ def pull_latest_from_table_or_query( # When materializing a single feature view, we don't need full feature names. 
On demand transforms aren't materialized return BigQueryRetrievalJob( - query=query, - client=client, - config=config, - full_feature_names=False, + query=query, client=client, config=config, full_feature_names=False, ) @staticmethod @@ -162,10 +169,11 @@ def pull_all_from_table_or_query( assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) assert isinstance(data_source, BigQuerySource) from_expression = data_source.get_table_query_string() - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, - location=config.offline_store.location, + project=project_id, location=config.offline_store.location, ) field_string = ", ".join( join_key_columns + feature_name_columns + [timestamp_field] @@ -176,10 +184,7 @@ def pull_all_from_table_or_query( WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') """ return BigQueryRetrievalJob( - query=query, - client=client, - config=config, - full_feature_names=False, + query=query, client=client, config=config, full_feature_names=False, ) @staticmethod @@ -197,42 +202,39 @@ def get_historical_features( assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) for fv in feature_views: assert isinstance(fv.batch_source, BigQuerySource) - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, - location=config.offline_store.location, + project=project_id, location=config.offline_store.location, ) assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) - + if config.offline_store.billing_project_id: + dataset_project = str(config.offline_store.project_id) + else: + dataset_project = client.project table_reference = _get_table_reference_for_new_entity( client, - client.project, + dataset_project, config.offline_store.dataset, 
config.offline_store.location, ) - entity_schema = _get_entity_schema( - client=client, - entity_df=entity_df, - ) + entity_schema = _get_entity_schema(client=client, entity_df=entity_df,) - entity_df_event_timestamp_col = ( - offline_utils.infer_event_timestamp_from_entity_df(entity_schema) + entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df( + entity_schema ) entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( - entity_df, - entity_df_event_timestamp_col, - client, + entity_df, entity_df_event_timestamp_col, client, ) @contextlib.contextmanager def query_generator() -> Iterator[str]: _upload_entity_df( - client=client, - table_name=table_reference, - entity_df=entity_df, + client=client, table_name=table_reference, entity_df=entity_df, ) expected_join_keys = offline_utils.get_expected_join_keys( @@ -295,10 +297,11 @@ def write_logged_features( ): destination = logging_config.destination assert isinstance(destination, BigQueryLoggingDestination) - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, - location=config.offline_store.location, + project=project_id, location=config.offline_store.location, ) job_config = bigquery.LoadJobConfig( @@ -353,10 +356,11 @@ def offline_write_batch( if table.schema != pa_schema: table = table.cast(pa_schema) - + project_id = ( + config.offline_store.billing_project_id or config.offline_store.project_id + ) client = _get_bigquery_client( - project=config.offline_store.project_id, - location=config.offline_store.location, + project=project_id, location=config.offline_store.location, ) job_config = bigquery.LoadJobConfig( @@ -451,7 +455,10 @@ def to_bigquery( if not job_config: today = date.today().strftime("%Y%m%d") rand_id = str(uuid.uuid4())[:7] - path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" + if 
self.config.offline_store.billing_project_id: + path = f"{self.config.offline_store.project_id}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" + else: + path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" job_config = bigquery.QueryJobConfig(destination=path) if not job_config.dry_run and self.on_demand_feature_views: @@ -525,7 +532,10 @@ def to_remote_storage(self) -> List[str]: bucket: str prefix: str - storage_client = StorageClient(project=self.client.project) + if self.config.offline_store.billing_project_id: + storage_client = StorageClient(project=self.config.offline_store.project_id) + else: + storage_client = StorageClient(project=self.client.project) bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1) prefix = prefix.rsplit("/", 1)[0] if prefix.startswith("/"): @@ -608,9 +618,7 @@ def _get_table_reference_for_new_entity( def _upload_entity_df( - client: Client, - table_name: str, - entity_df: Union[pd.DataFrame, str], + client: Client, table_name: str, entity_df: Union[pd.DataFrame, str], ) -> Table: """Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table""" job: Union[bigquery.job.query.QueryJob, bigquery.job.load.LoadJob] From 9bdbab9922df0c5c49a956efb09ecf799ec6098d Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Fri, 30 Sep 2022 16:07:25 -0500 Subject: [PATCH 2/2] Fix lint Signed-off-by: Danny Chiao --- .../feast/infra/offline_stores/bigquery.py | 46 +++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 96e1f9d2aa5..bf010c82aaf 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -135,7 +135,8 @@ def pull_latest_from_table_or_query( config.offline_store.billing_project_id or config.offline_store.project_id ) client = _get_bigquery_client( - 
project=project_id, location=config.offline_store.location, + project=project_id, + location=config.offline_store.location, ) query = f""" SELECT @@ -152,7 +153,10 @@ def pull_latest_from_table_or_query( # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized return BigQueryRetrievalJob( - query=query, client=client, config=config, full_feature_names=False, + query=query, + client=client, + config=config, + full_feature_names=False, ) @staticmethod @@ -173,7 +177,8 @@ def pull_all_from_table_or_query( config.offline_store.billing_project_id or config.offline_store.project_id ) client = _get_bigquery_client( - project=project_id, location=config.offline_store.location, + project=project_id, + location=config.offline_store.location, ) field_string = ", ".join( join_key_columns + feature_name_columns + [timestamp_field] @@ -184,7 +189,10 @@ def pull_all_from_table_or_query( WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') """ return BigQueryRetrievalJob( - query=query, client=client, config=config, full_feature_names=False, + query=query, + client=client, + config=config, + full_feature_names=False, ) @staticmethod @@ -206,7 +214,8 @@ def get_historical_features( config.offline_store.billing_project_id or config.offline_store.project_id ) client = _get_bigquery_client( - project=project_id, location=config.offline_store.location, + project=project_id, + location=config.offline_store.location, ) assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) @@ -221,20 +230,27 @@ def get_historical_features( config.offline_store.location, ) - entity_schema = _get_entity_schema(client=client, entity_df=entity_df,) + entity_schema = _get_entity_schema( + client=client, + entity_df=entity_df, + ) - entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df( - entity_schema + entity_df_event_timestamp_col = ( + 
offline_utils.infer_event_timestamp_from_entity_df(entity_schema) ) entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( - entity_df, entity_df_event_timestamp_col, client, + entity_df, + entity_df_event_timestamp_col, + client, ) @contextlib.contextmanager def query_generator() -> Iterator[str]: _upload_entity_df( - client=client, table_name=table_reference, entity_df=entity_df, + client=client, + table_name=table_reference, + entity_df=entity_df, ) expected_join_keys = offline_utils.get_expected_join_keys( @@ -301,7 +317,8 @@ def write_logged_features( config.offline_store.billing_project_id or config.offline_store.project_id ) client = _get_bigquery_client( - project=project_id, location=config.offline_store.location, + project=project_id, + location=config.offline_store.location, ) job_config = bigquery.LoadJobConfig( @@ -360,7 +377,8 @@ def offline_write_batch( config.offline_store.billing_project_id or config.offline_store.project_id ) client = _get_bigquery_client( - project=project_id, location=config.offline_store.location, + project=project_id, + location=config.offline_store.location, ) job_config = bigquery.LoadJobConfig( @@ -618,7 +636,9 @@ def _get_table_reference_for_new_entity( def _upload_entity_df( - client: Client, table_name: str, entity_df: Union[pd.DataFrame, str], + client: Client, + table_name: str, + entity_df: Union[pd.DataFrame, str], ) -> Table: """Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table""" job: Union[bigquery.job.query.QueryJob, bigquery.job.load.LoadJob]