From d790a7465c52884297f95aabaf27c58a05d13e32 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 21:24:03 -0700 Subject: [PATCH 01/10] feat: Add an experimental lambda-based materialization engine Signed-off-by: Achal Shah --- .../feature_servers/aws_lambda/Dockerfile | 1 - .../infra/materialization/lambda/Dockerfile | 25 +++ .../infra/materialization/lambda/__init__.py | 11 ++ .../feast/infra/materialization/lambda/app.py | 77 ++++++++ .../materialization/lambda/lambda_engine.py | 178 ++++++++++++++++++ .../feast/infra/passthrough_provider.py | 1 + 6 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 sdk/python/feast/infra/materialization/lambda/Dockerfile create mode 100644 sdk/python/feast/infra/materialization/lambda/__init__.py create mode 100644 sdk/python/feast/infra/materialization/lambda/app.py create mode 100644 sdk/python/feast/infra/materialization/lambda/lambda_engine.py diff --git a/sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile b/sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile index 929227a8106..5f5038544c6 100644 --- a/sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile +++ b/sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile @@ -20,7 +20,6 @@ COPY README.md README.md # https://github.com/pypa/setuptools_scm#usage-from-docker # I think it also assumes that this dockerfile is being built from the root of the directory. RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,redis]' -RUN pip3 install -r sdk/python/feast/infra/feature_servers/aws_lambda/requirements.txt --target "${LAMBDA_TASK_ROOT}" # Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile) CMD [ "app.handler" ] diff --git a/sdk/python/feast/infra/materialization/lambda/Dockerfile b/sdk/python/feast/infra/materialization/lambda/Dockerfile new file mode 100644 index 00000000000..bbdb74bdfe1 --- /dev/null +++ b/sdk/python/feast/infra/materialization/lambda/Dockerfile @@ -0,0 +1,25 @@ +FROM public.ecr.aws/lambda/python:3.9 + +RUN yum install -y git + + +# Copy app handler code +COPY sdk/python/feast/infra/materialization/lambda/app.py ${LAMBDA_TASK_ROOT} + +# Copy necessary parts of the Feast codebase +COPY sdk/python sdk/python +COPY protos protos +COPY go go +COPY setup.py setup.py +COPY pyproject.toml pyproject.toml +COPY README.md README.md + +# Install Feast for AWS with Lambda dependencies +# We need this mount thingy because setuptools_scm needs access to the +# git dir to infer the version of feast we're installing. +# https://github.com/pypa/setuptools_scm#usage-from-docker +# I think it also assumes that this dockerfile is being built from the root of the directory. +RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,redis]' + +# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile) +CMD [ "app.handler" ] diff --git a/sdk/python/feast/infra/materialization/lambda/__init__.py b/sdk/python/feast/infra/materialization/lambda/__init__.py new file mode 100644 index 00000000000..d21505d91e6 --- /dev/null +++ b/sdk/python/feast/infra/materialization/lambda/__init__.py @@ -0,0 +1,11 @@ +from .lambda_engine import ( + LambdaMaterializationEngine, + LambdaMaterializationEngineConfig, + LambdaMaterializationJob, +) + +__all__ = [ + "LambdaMaterializationEngineConfig", + "LambdaMaterializationJob", + "LambdaMaterializationEngine", +] diff --git a/sdk/python/feast/infra/materialization/lambda/app.py b/sdk/python/feast/infra/materialization/lambda/app.py new file mode 100644 index 00000000000..3dfc2d2db04 --- /dev/null +++ b/sdk/python/feast/infra/materialization/lambda/app.py @@ -0,0 +1,77 @@ +import base64 +import json +import sys +import tempfile +import traceback +from pathlib import Path + +import pyarrow.parquet as pq + +from feast import FeatureStore +from feast.constants import FEATURE_STORE_YAML_ENV_NAME +from feast.infra.materialization.local_engine import DEFAULT_BATCH_SIZE +from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping + + +def handler(event, context): + """Provide an event that contains the following keys: + + - operation: one of the operations in the operations dict below + - tableName: required for operations that interact with DynamoDB + - payload: a parameter to pass to the operation being performed + """ + print("Received event: " + json.dumps(event, indent=2), flush=True) + + try: + + config_base64 = event[FEATURE_STORE_YAML_ENV_NAME] + + config_bytes = base64.b64decode(config_base64) + + # Create a new unique directory for writing feature_store.yaml + repo_path = Path(tempfile.mkdtemp()) + + with open(repo_path / "feature_store.yaml", "wb") as f: + f.write(config_bytes) + + # Initialize the feature store + store = FeatureStore(repo_path=str(repo_path.resolve())) + + view_name = event["view_name"] + view_type = event["view_type"] + path = event["path"] + + bucket = path[len("s3://") :].split("/", 1)[0] + key = path[len("s3://") :].split("/", 1)[1] + print(f"Inferred Bucket: `{bucket}` Key: `{key}`", flush=True) + + if view_type == "batch": + feature_view = store.get_feature_view(view_name) + else: + feature_view = store.get_stream_feature_view(view_name) + + print(f"Got Feature View: `{feature_view}`", flush=True) + + table = pq.read_table(path) + if feature_view.batch_source.field_mapping is not None: + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) + + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + + for batch in table.to_batches(DEFAULT_BATCH_SIZE): + rows_to_write = _convert_arrow_to_proto( + batch, feature_view, join_key_to_value_type + ) + store._provider.online_write_batch( + store.config, feature_view, rows_to_write, lambda x: None, + ) + except Exception as e: + print(f"Exception: {e}", flush=True) + print("Traceback:", flush=True) + print(traceback.format_exc(), flush=True) + sys.exit(1) diff --git a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py new file mode 100644 index 00000000000..7e3e7639ea6 --- /dev/null +++ b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py @@ -0,0 +1,178 @@ +import base64 +import json +from dataclasses import dataclass +from datetime import datetime +from typing import Callable, List, Literal, Optional, Sequence, Union + +import boto3 +from tqdm import tqdm + +from feast.batch_feature_view import BatchFeatureView +from feast.constants import FEATURE_STORE_YAML_ENV_NAME +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.infra.materialization.batch_materialization_engine import ( + BatchMaterializationEngine, + MaterializationJob, + MaterializationJobStatus, + MaterializationTask, +) +from feast.infra.offline_stores.offline_store import OfflineStore +from feast.infra.online_stores.online_store import OnlineStore +from feast.registry import BaseRegistry +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.stream_feature_view import StreamFeatureView +from feast.utils import _get_column_names + +DEFAULT_BATCH_SIZE = 10_000 + + +class LambdaMaterializationEngineConfig(FeastConfigBaseModel): + """Batch Materialization Engine config for lambda based engine""" + + type: Literal["lambda"] = "lambda" + """ Type selector""" + + +@dataclass +class LambdaMaterializationJob(MaterializationJob): + def __init__(self, job_id: str, status: MaterializationJobStatus) -> None: + super().__init__() + self._job_id: str = job_id + self._status = status + self._error = None + + def status(self) -> MaterializationJobStatus: + return self._status + + def error(self) -> Optional[BaseException]: + return self._error + + def should_be_retried(self) -> bool: + return False + + def job_id(self) -> str: + return self._job_id + + def url(self) -> Optional[str]: + return None + + +class LambdaMaterializationEngine(BatchMaterializationEngine): + + LAMBDA_NAME = "feast-lambda-consumer" + + def update( + self, + project: str, + views_to_delete: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + views_to_keep: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + ): + # This should be setting up the lambda function. + pass + + def teardown_infra( + self, + project: str, + fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]], + entities: Sequence[Entity], + ): + # This should be tearing down the lambda function. + pass + + def __init__( + self, + *, + repo_config: RepoConfig, + offline_store: OfflineStore, + online_store: OnlineStore, + **kwargs, + ): + super().__init__( + repo_config=repo_config, + offline_store=offline_store, + online_store=online_store, + **kwargs, + ) + repo_path = self.repo_config.repo_path + assert repo_path + feature_store_path = repo_path / "feature_store.yaml" + self.feature_store_base64 = str( + base64.b64encode(bytes(feature_store_path.read_text(), "UTF-8")), "UTF-8" + ) + + def materialize( + self, registry, tasks: List[MaterializationTask] + ) -> List[MaterializationJob]: + return [ + self._materialize_one( + registry, + task.feature_view, + task.start_time, + task.end_time, + task.project, + task.tqdm_builder, + ) + for task in tasks + ] + + def _materialize_one( + self, + registry: BaseRegistry, + feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView], + start_date: datetime, + end_date: datetime, + project: str, + tqdm_builder: Callable[[int], tqdm], + ): + entities = [] + for entity_name in feature_view.entities: + entities.append(registry.get_entity(entity_name, project)) + + ( + join_key_columns, + feature_name_columns, + timestamp_field, + created_timestamp_column, + ) = _get_column_names(feature_view, entities) + + job_id = f"{feature_view.name}-{start_date}-{end_date}" + + offline_job = self.offline_store.pull_latest_from_table_or_query( + config=self.repo_config, + data_source=feature_view.batch_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + + paths = offline_job.to_remote_storage() + + for path in paths: + payload = { + FEATURE_STORE_YAML_ENV_NAME: self.feature_store_base64, + "view_name": feature_view.name, + "view_type": "batch", + "path": path, + } + # Invoke a lambda to materialize this file. + lambda_client = boto3.client("lambda") + response = lambda_client.invoke( + FunctionName=self.LAMBDA_NAME, + InvocationType="Event", + Payload=json.dumps(payload), + ) + print(response) + + return LambdaMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.SUCCEEDED + ) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 181d46a5a8c..e7b594effe0 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -36,6 +36,7 @@ BATCH_ENGINE_CLASS_FOR_TYPE = { "local": "feast.infra.materialization.LocalMaterializationEngine", + "lambda": "feast.infra.materialization.lambda.lambda_engine.LambdaMaterializationEngine", } From 36d0f4b6feb69d4b50159a78754e39803cdf4edc Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 8 Jul 2022 09:33:01 -0700 Subject: [PATCH 02/10] setup and teardown lambda func Signed-off-by: Achal Shah --- .../materialization/lambda/lambda_engine.py | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py index 7e3e7639ea6..ae7858d70d8 100644 --- a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py @@ -5,6 +5,7 @@ from typing import Callable, List, Literal, Optional, Sequence, Union import boto3 +from pydantic import StrictStr from tqdm import tqdm from feast.batch_feature_view import BatchFeatureView @@ -23,6 +24,7 @@ from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.stream_feature_view import StreamFeatureView from feast.utils import _get_column_names +from feast.version import get_version DEFAULT_BATCH_SIZE = 10_000 @@ -33,6 +35,12 @@ class LambdaMaterializationEngineConfig(FeastConfigBaseModel): type: Literal["lambda"] = "lambda" """ Type selector""" + materialization_image: StrictStr + """ The URI of a container image in the Amazon ECR registry, which should be used for materialization. """ + + lambda_role: StrictStr + """ Role that should be used by the materialization lambda """ + @dataclass class LambdaMaterializationJob(MaterializationJob): @@ -59,9 +67,6 @@ def url(self) -> Optional[str]: class LambdaMaterializationEngine(BatchMaterializationEngine): - - LAMBDA_NAME = "feast-lambda-consumer" - def update( self, project: str, @@ -75,7 +80,18 @@ def update( entities_to_keep: Sequence[Entity], ): # This should be setting up the lambda function. - pass + self.lambda_client.create_function( + FunctionName=self.lambda_name, + PackageType="Image", + Role=self.repo_config.offline_store.lambda_role, + Code={"ImageUri": self.repo_config.offline_store.materialization_image}, + Timeout=600, + Tags={ + "feast-owned": "True", + "project": project, + "feast-sdk-version": get_version(), + }, + ) def teardown_infra( self, @@ -84,7 +100,7 @@ def teardown_infra( entities: Sequence[Entity], ): # This should be tearing down the lambda function. - pass + self.lambda_client.delete_function(FunctionName=self.lambda_name) def __init__( self, @@ -107,6 +123,9 @@ def __init__( base64.b64encode(bytes(feature_store_path.read_text(), "UTF-8")), "UTF-8" ) + self.lambda_name = f"feast-materialize-{self.repo_config.project}" + self.lambda_client = boto3.client("lambda") + def materialize( self, registry, tasks: List[MaterializationTask] ) -> List[MaterializationJob]: @@ -165,9 +184,9 @@ def _materialize_one( "path": path, } # Invoke a lambda to materialize this file. - lambda_client = boto3.client("lambda") - response = lambda_client.invoke( - FunctionName=self.LAMBDA_NAME, + + response = self.lambda_client.invoke( + FunctionName=self.lambda_name, InvocationType="Event", Payload=json.dumps(payload), ) From 88b625b36cf4e69348f8c04034baef731f7ee722 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Jul 2022 21:18:13 -0700 Subject: [PATCH 03/10] actually get the test working correctly Signed-off-by: Achal Shah --- sdk/python/feast/infra/aws.py | 3 + .../materialization/lambda/lambda_engine.py | 27 ++- .../feast/infra/online_stores/dynamodb.py | 3 +- .../feast/infra/passthrough_provider.py | 11 +- sdk/python/feast/repo_config.py | 39 +++- sdk/python/tests/conftest.py | 15 ++ .../integration_test_repo_config.py | 9 + .../feature_repos/repo_configuration.py | 18 +- .../materialization/test_lambda.py | 215 ++++++++++++++++++ 9 files changed, 316 insertions(+), 24 deletions(-) create mode 100644 sdk/python/tests/integration/materialization/test_lambda.py diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 14301faf192..efe5716f9a4 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -106,6 +106,9 @@ def update_infra( self._deploy_feature_server(project, image_uri) + if self.batch_engine: + self.batch_engine.update(project, tables_to_delete, tables_to_keep, entities_to_delete, entities_to_keep) + def _deploy_feature_server(self, project: str, image_uri: str): _logger.info("Deploying feature server...") diff --git a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py index ae7858d70d8..04f04260e7c 100644 --- a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py @@ -1,5 +1,6 @@ import base64 import json +import logging from dataclasses import dataclass from datetime import datetime from typing import Callable, List, Literal, Optional, Sequence, Union @@ -28,6 +29,8 @@ DEFAULT_BATCH_SIZE = 10_000 +logger = logging.getLogger(__name__) + class LambdaMaterializationEngineConfig(FeastConfigBaseModel): """Batch Materialization Engine config for lambda based engine""" @@ -67,6 +70,9 @@ def url(self) -> Optional[str]: class LambdaMaterializationEngine(BatchMaterializationEngine): + """ + WARNING: This engine should be considered "Alpha" functionality. + """ def update( self, project: str, @@ -80,11 +86,11 @@ def update( entities_to_keep: Sequence[Entity], ): # This should be setting up the lambda function. - self.lambda_client.create_function( + r = self.lambda_client.create_function( FunctionName=self.lambda_name, PackageType="Image", - Role=self.repo_config.offline_store.lambda_role, - Code={"ImageUri": self.repo_config.offline_store.materialization_image}, + Role=self.repo_config.batch_engine.lambda_role, + Code={"ImageUri": self.repo_config.batch_engine.materialization_image}, Timeout=600, Tags={ "feast-owned": "True", @@ -92,6 +98,12 @@ def update( "feast-sdk-version": get_version(), }, ) + logger.info("Creating lambda function %s, %s", self.lambda_name, r) + + logger.info("Waiting for function %s to be active", self.lambda_name) + waiter = self.lambda_client.get_waiter('function_active') + waiter.wait(FunctionName=self.lambda_name) + def teardown_infra( self, @@ -100,7 +112,8 @@ def teardown_infra( entities: Sequence[Entity], ): # This should be tearing down the lambda function. - self.lambda_client.delete_function(FunctionName=self.lambda_name) + r = self.lambda_client.delete_function(FunctionName=self.lambda_name) + logger.info("Tearing down lambda %s: %s", self.lambda_name, r) def __init__( self, @@ -124,6 +137,7 @@ def __init__( ) self.lambda_name = f"feast-materialize-{self.repo_config.project}" + # self.lambda_name = "feast-lambda-consumer" self.lambda_client = boto3.client("lambda") def materialize( @@ -187,10 +201,11 @@ def _materialize_one( response = self.lambda_client.invoke( FunctionName=self.lambda_name, - InvocationType="Event", + InvocationType="RequestResponse", Payload=json.dumps(payload), ) - print(response) + logger.info(f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}") + print(f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}") return LambdaMaterializationJob( job_id=job_id, status=MaterializationJobStatus.SUCCEEDED diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 50709fa3d49..1683a7cff7b 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -229,7 +229,8 @@ def online_read( break batch_entity_ids = { table_instance.name: { - "Keys": [{"entity_id": entity_id} for entity_id in batch] + "Keys": [{"entity_id": entity_id} for entity_id in batch], + "ConsistentRead": True } } with tracing_span(name="remote_call"): diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index e7b594effe0..b33b07d234c 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -22,7 +22,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import BaseRegistry -from feast.repo_config import RepoConfig +from feast.repo_config import RepoConfig, BATCH_ENGINE_CLASS_FOR_TYPE from feast.saved_dataset import SavedDataset from feast.stream_feature_view import StreamFeatureView from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute @@ -34,11 +34,6 @@ DEFAULT_BATCH_SIZE = 10_000 -BATCH_ENGINE_CLASS_FOR_TYPE = { - "local": "feast.infra.materialization.LocalMaterializationEngine", - "lambda": "feast.infra.materialization.lambda.lambda_engine.LambdaMaterializationEngine", -} - class PassthroughProvider(Provider): """ @@ -74,7 +69,7 @@ def batch_engine(self) -> BatchMaterializationEngine: if self._batch_engine: return self._batch_engine else: - engine_config = self.repo_config.batch_engine_config + engine_config = self.repo_config._batch_engine_config config_is_dict = False if isinstance(engine_config, str): engine_config_type = engine_config @@ -130,6 +125,8 @@ def update_infra( entities_to_delete=entities_to_delete, partial=partial, ) + if self.batch_engine: + self.batch_engine.update(project, tables_to_delete, tables_to_keep, entities_to_delete, entities_to_keep) def teardown_infra( self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index f315023ee1a..a002e9a668b 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -30,6 +30,12 @@ # These dict exists so that: # - existing values for the online store type in featurestore.yaml files continue to work in a backwards compatible way # - first party and third party implementations can use the same class loading code path. +BATCH_ENGINE_CLASS_FOR_TYPE = { + "local": "feast.infra.materialization.LocalMaterializationEngine", + "lambda": "feast.infra.materialization.lambda.lambda_engine.LambdaMaterializationEngine", +} + + ONLINE_STORE_CLASS_FOR_TYPE = { "sqlite": "feast.infra.online_stores.sqlite.SqliteOnlineStore", "datastore": "feast.infra.online_stores.datastore.DatastoreOnlineStore", @@ -120,7 +126,7 @@ class RepoConfig(FeastBaseModel): _offline_config: Any = Field(alias="offline_store") """ OfflineStoreConfig: Offline store configuration (optional depending on provider) """ - batch_engine_config: Any = Field(alias="batch_engine") + _batch_engine_config: Any = Field(alias="batch_engine") """ BatchMaterializationEngine: Batch materialization configuration (optional depending on provider)""" feature_server: Optional[Any] @@ -160,10 +166,12 @@ def __init__(self, **data: Any): self._batch_engine = None if "batch_engine" in data: - self.batch_engine_config = data["batch_engine"] + self._batch_engine_config = data["batch_engine"] + elif "batch_engine_config" in data: + self._batch_engine_config = data["batch_engine_config"] else: # Defaults to using local in-process materialization engine. - self.batch_engine_config = "local" + self._batch_engine_config = "local" if isinstance(self.feature_server, Dict): self.feature_server = get_feature_server_config_from_type( @@ -205,6 +213,20 @@ def online_store(self): return self._online_store + @property + def batch_engine(self): + if not self._batch_engine: + if isinstance(self._batch_engine_config, Dict): + self._batch_engine = get_batch_engine_config_from_type( + self._batch_engine_config["type"] + )(**self._batch_engine_config) + elif isinstance(self._online_config, str): + self._batch_engine = get_batch_engine_config_from_type(self._online_config)() + elif self._online_config: + self._batch_engine = self._batch_engine + + return self._batch_engine + @root_validator(pre=True) @log_exceptions def _validate_online_store_config(cls, values): @@ -382,6 +404,17 @@ def get_data_source_class_from_type(data_source_type: str): return import_class(module_name, config_class_name, "DataSource") +def get_batch_engine_config_from_type(batch_engine_type: str): + if batch_engine_type in BATCH_ENGINE_CLASS_FOR_TYPE: + batch_engine_type = BATCH_ENGINE_CLASS_FOR_TYPE[batch_engine_type] + else: + assert batch_engine_type.endswith("Engine") + module_name, batch_engine_class_type = batch_engine_type.rsplit(".", 1) + config_class_name = f"{batch_engine_class_type}Config" + + return import_class(module_name, config_class_name, config_class_name) + + def get_online_config_from_type(online_store_type: str): if online_store_type in ONLINE_STORE_CLASS_FOR_TYPE: online_store_type = ONLINE_STORE_CLASS_FOR_TYPE[online_store_type] diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 1700970f1e4..0286a07c9ba 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -45,6 +45,21 @@ logger = logging.getLogger(__name__) +level = logging.INFO +logging.basicConfig( + format="%(asctime)s %(name)s %(levelname)s: %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + level=level, +) +# Override the logging level for already created loggers (due to loggers being created at the import time) +# Note, that format & datefmt does not need to be set, because by default child loggers don't override them + +# Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written. +# So we have to put a type ignore hint for mypy. +for logger_name in logging.root.manager.loggerDict: # type: ignore + if "feast" in logger_name: + logger = logging.getLogger(logger_name) + logger.setLevel(level) def pytest_configure(config): if platform in ["darwin", "windows"]: diff --git a/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py b/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py index 74ce37f17a6..d2e0f70ba2f 100644 --- a/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py +++ b/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py @@ -1,5 +1,6 @@ import hashlib from dataclasses import dataclass +from enum import Enum from typing import Dict, Optional, Type, Union from tests.integration.feature_repos.universal.data_source_creator import ( @@ -13,6 +14,11 @@ ) +class RegistryLocation(Enum): + Local = 1 + S3 = 2 + + @dataclass(frozen=False) class IntegrationTestRepoConfig: """ @@ -25,6 +31,9 @@ class IntegrationTestRepoConfig: offline_store_creator: Type[DataSourceCreator] = FileDataSourceCreator online_store_creator: Optional[Type[OnlineStoreCreator]] = None + batch_engine: Optional[Union[str, Dict]] = "local" + registry_location: RegistryLocation = RegistryLocation.Local + full_feature_names: bool = True infer_features: bool = False python_feature_server: bool = False diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 4dc1db4a135..71d574dc244 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -21,7 +21,7 @@ from feast.infra.feature_servers.local_process.config import LocalFeatureServerConfig from feast.repo_config import RegistryConfig, RepoConfig from tests.integration.feature_repos.integration_test_repo_config import ( - IntegrationTestRepoConfig, + IntegrationTestRepoConfig, RegistryLocation, ) from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, @@ -381,9 +381,7 @@ def construct_test_environment( online_creator = None online_store = test_repo_config.online_store - repo_dir_name = tempfile.mkdtemp() - - if test_repo_config.python_feature_server and test_repo_config.provider == "aws": + if (test_repo_config.python_feature_server and test_repo_config.provider == "aws"): from feast.infra.feature_servers.aws_lambda.config import ( AwsLambdaFeatureServerConfig, ) @@ -393,22 +391,28 @@ def construct_test_environment( execution_role_name="arn:aws:iam::402087665549:role/lambda_execution_role", ) - registry = ( - f"s3://feast-integration-tests/registries/{project}/registry.db" - ) # type: Union[str, RegistryConfig] else: feature_server = LocalFeatureServerConfig( feature_logging=FeatureLoggingConfig(enabled=True) ) + + repo_dir_name = tempfile.mkdtemp() + if (test_repo_config.python_feature_server and test_repo_config.provider == "aws") or test_repo_config.registry_location == RegistryLocation.S3: + registry: Union[str, RegistryConfig] = ( + f"s3://feast-integration-tests/registries/{project}/registry.db" + ) + else: registry = RegistryConfig( path=str(Path(repo_dir_name) / "registry.db"), cache_ttl_seconds=1, ) + config = RepoConfig( registry=registry, project=project, provider=test_repo_config.provider, offline_store=offline_store_config, online_store=online_store, + batch_engine=test_repo_config.batch_engine, repo_path=repo_dir_name, feature_server=feature_server, go_feature_retrieval=test_repo_config.go_feature_retrieval, diff --git a/sdk/python/tests/integration/materialization/test_lambda.py b/sdk/python/tests/integration/materialization/test_lambda.py new file mode 100644 index 00000000000..bca4f6d633e --- /dev/null +++ b/sdk/python/tests/integration/materialization/test_lambda.py @@ -0,0 +1,215 @@ +import time + +import math +from datetime import datetime, timedelta +from typing import Optional + +import pandas as pd +import pytest +from pytz import utc + +from feast import FeatureStore, FeatureView, Entity, ValueType, RedshiftSource, Feature +from tests.data.data_creator import create_basic_driver_dataset +from tests.integration.feature_repos.integration_test_repo_config import IntegrationTestRepoConfig, RegistryLocation +from tests.integration.feature_repos.repo_configuration import construct_test_environment +from tests.integration.feature_repos.universal.data_sources.redshift import RedshiftDataSourceCreator +from tests.integration.feature_repos.universal.entities import driver +from tests.integration.feature_repos.universal.feature_views import driver_feature_view + + +@pytest.mark.integration +def test_lambda_materialization(): + lambda_config = IntegrationTestRepoConfig( + provider="aws", + online_store={ + "type": "dynamodb", + "region": "us-west-2", + }, + offline_store_creator=RedshiftDataSourceCreator, + batch_engine={ + "type": "lambda", + "materialization_image": "402087665549.dkr.ecr.us-west-2.amazonaws.com/feast-lambda-consumer:v1", + "lambda_role": "arn:aws:iam::402087665549:role/lambda_execution_role" + }, + registry_location=RegistryLocation.S3 + ) + lambda_environment = construct_test_environment(lambda_config, None) + + # local_config = IntegrationTestRepoConfig( + # online_store={ + # "type": "dynamodb", + # "region": "us-west-2", + # }, + # offline_store_creator=RedshiftDataSourceCreator, + # batch_engine="local" + # ) + # local_environment = construct_test_environment(local_config, None) + + df = create_basic_driver_dataset() + ds = lambda_environment.data_source_creator.create_data_source( + df, lambda_environment.feature_store.project, field_mapping={"ts_1": "ts"}, + ) + + fs = lambda_environment.feature_store + driver = Entity( + name="driver_id", + join_key="driver_id", + value_type=ValueType.INT64, + ) + + driver_stats_fv = FeatureView( + name="driver_hourly_stats", + entities=["driver_id"], + ttl=timedelta(weeks=52), + features=[ + Feature(name="value", dtype=ValueType.FLOAT), + ], + batch_source=ds, + ) + + try: + + fs.apply([driver, driver_stats_fv]) + + print(df) + + # materialization is run in two steps and + # we use timestamp from generated dataframe as a split point + split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1) + + print(f"Split datetime: {split_dt}") + + run_offline_online_store_consistency_test(fs, driver_stats_fv, split_dt) + finally: + fs.teardown() + + +def check_offline_and_online_features( + fs: FeatureStore, + fv: FeatureView, + driver_id: int, + event_timestamp: datetime, + expected_value: Optional[float], + full_feature_names: bool, + check_offline_store: bool = True, +) -> None: + # Check online store + response_dict = fs.get_online_features( + [f"{fv.name}:value"], + [{"driver_id": driver_id}], + full_feature_names=full_feature_names, + ).to_dict() + + if full_feature_names: + + if expected_value: + assert response_dict[f"{fv.name}__value"][0], f"Response: {response_dict}" + assert ( + abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6 + ), f"Response: {response_dict}, Expected: {expected_value}" + else: + assert response_dict[f"{fv.name}__value"][0] is None + else: + if expected_value: + assert response_dict["value"][0], f"Response: {response_dict}" + assert ( + abs(response_dict["value"][0] - expected_value) < 1e-6 + ), f"Response: {response_dict}, Expected: {expected_value}" + else: + assert response_dict["value"][0] is None + + # Check offline store + if check_offline_store: + df = fs.get_historical_features( + entity_df=pd.DataFrame.from_dict( + {"driver_id": [driver_id], "event_timestamp": [event_timestamp]} + ), + features=[f"{fv.name}:value"], + full_feature_names=full_feature_names, + ).to_df() + + if full_feature_names: + if expected_value: + assert ( + abs( + df.to_dict(orient="list")[f"{fv.name}__value"][0] + - expected_value + ) + < 1e-6 + ) + else: + assert not df.to_dict(orient="list")[f"{fv.name}__value"] or math.isnan( + df.to_dict(orient="list")[f"{fv.name}__value"][0] + ) + else: + if expected_value: + assert ( + abs(df.to_dict(orient="list")["value"][0] - expected_value) < 1e-6 + ) + else: + assert not df.to_dict(orient="list")["value"] or math.isnan( + df.to_dict(orient="list")["value"][0] + ) + + +def run_offline_online_store_consistency_test( + fs: FeatureStore, fv: FeatureView, split_dt: datetime +) -> None: + now = datetime.utcnow() + + full_feature_names = True + check_offline_store: bool = True + + # Run materialize() + # use both tz-naive & tz-aware timestamps to test that they're both correctly handled + start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) + end_date = split_dt + fs.materialize(feature_views=[fv.name], start_date=start_date, end_date=end_date) + + time.sleep(10) + + # check result of materialize() + check_offline_and_online_features( + fs=fs, + fv=fv, + driver_id=1, + event_timestamp=end_date, + expected_value=0.3, + full_feature_names=full_feature_names, + check_offline_store=check_offline_store, + ) + + check_offline_and_online_features( + fs=fs, + fv=fv, + driver_id=2, + event_timestamp=end_date, + expected_value=None, + full_feature_names=full_feature_names, + check_offline_store=check_offline_store, + ) + + # check prior value for materialize_incremental() + check_offline_and_online_features( + fs=fs, + fv=fv, + driver_id=3, + event_timestamp=end_date, + expected_value=4, + full_feature_names=full_feature_names, + check_offline_store=check_offline_store, + ) + + # run materialize_incremental() + fs.materialize_incremental(feature_views=[fv.name], end_date=now) + + # check result of materialize_incremental() + check_offline_and_online_features( + fs=fs, + fv=fv, + driver_id=3, + event_timestamp=now, + expected_value=5, + full_feature_names=full_feature_names, + check_offline_store=check_offline_store, + ) From f2927e9f9151f4dbedfd4a311c1387c123e87476 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 12 Jul 2022 09:38:11 -0700 Subject: [PATCH 04/10] actually get the test working correctly Signed-off-by: Achal Shah --- sdk/python/feast/infra/aws.py | 8 +- .../materialization/lambda/lambda_engine.py | 12 ++- .../feast/infra/online_stores/dynamodb.py | 2 +- .../feast/infra/passthrough_provider.py | 10 ++- sdk/python/feast/repo_config.py | 4 +- sdk/python/tests/conftest.py | 1 + .../feature_repos/repo_configuration.py | 9 ++- .../materialization/test_lambda.py | 81 ++++++++----------- 8 files changed, 67 insertions(+), 60 deletions(-) diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index efe5716f9a4..dcf8194f820 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -107,7 +107,13 @@ def update_infra( self._deploy_feature_server(project, image_uri) if self.batch_engine: - self.batch_engine.update(project, tables_to_delete, tables_to_keep, entities_to_delete, entities_to_keep) + self.batch_engine.update( + project, + tables_to_delete, + tables_to_keep, + entities_to_delete, + entities_to_keep, + ) def _deploy_feature_server(self, project: str, image_uri: str): _logger.info("Deploying feature server...") diff --git a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py index 04f04260e7c..625a171157e 100644 --- a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py @@ -73,6 +73,7 @@ class LambdaMaterializationEngine(BatchMaterializationEngine): """ WARNING: This engine should be considered "Alpha" functionality. """ + def update( self, project: str, @@ -101,10 +102,9 @@ def update( logger.info("Creating lambda function %s, %s", self.lambda_name, r) logger.info("Waiting for function %s to be active", self.lambda_name) - waiter = self.lambda_client.get_waiter('function_active') + waiter = self.lambda_client.get_waiter("function_active") waiter.wait(FunctionName=self.lambda_name) - def teardown_infra( self, project: str, @@ -204,8 +204,12 @@ def _materialize_one( InvocationType="RequestResponse", Payload=json.dumps(payload), ) - logger.info(f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}") - print(f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}") + logger.info( + f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}" + ) + print( + f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}" + ) return LambdaMaterializationJob( job_id=job_id, status=MaterializationJobStatus.SUCCEEDED diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 1683a7cff7b..6919f2cc298 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -230,7 +230,7 @@ def online_read( batch_entity_ids = { table_instance.name: { "Keys": [{"entity_id": entity_id} for entity_id in batch], - "ConsistentRead": True + "ConsistentRead": True, } } with tracing_span(name="remote_call"): diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index b33b07d234c..4105e8e5603 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -22,7 +22,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import BaseRegistry -from feast.repo_config import RepoConfig, BATCH_ENGINE_CLASS_FOR_TYPE +from feast.repo_config import BATCH_ENGINE_CLASS_FOR_TYPE, RepoConfig from feast.saved_dataset import SavedDataset from feast.stream_feature_view import StreamFeatureView from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute @@ -126,7 +126,13 @@ def update_infra( partial=partial, ) if self.batch_engine: - self.batch_engine.update(project, tables_to_delete, tables_to_keep, entities_to_delete, entities_to_keep) + self.batch_engine.update( + project, + tables_to_delete, + tables_to_keep, + entities_to_delete, + entities_to_keep, + ) def teardown_infra( self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index a002e9a668b..061091da575 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -221,7 +221,9 @@ def batch_engine(self): self._batch_engine_config["type"] )(**self._batch_engine_config) elif isinstance(self._online_config, str): - self._batch_engine = get_batch_engine_config_from_type(self._online_config)() + self._batch_engine = get_batch_engine_config_from_type( + self._online_config + )() elif self._online_config: self._batch_engine = self._batch_engine diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 0286a07c9ba..ac30149cfad 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -61,6 +61,7 @@ logger = logging.getLogger(logger_name) logger.setLevel(level) + def pytest_configure(config): if platform in ["darwin", "windows"]: multiprocessing.set_start_method("spawn") diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 71d574dc244..4300ca64b67 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -21,7 +21,8 @@ from feast.infra.feature_servers.local_process.config import LocalFeatureServerConfig from feast.repo_config import RegistryConfig, RepoConfig from tests.integration.feature_repos.integration_test_repo_config import ( - IntegrationTestRepoConfig, RegistryLocation, + IntegrationTestRepoConfig, + RegistryLocation, ) from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, @@ -381,7 +382,7 @@ def construct_test_environment( online_creator = None online_store = test_repo_config.online_store - if (test_repo_config.python_feature_server and test_repo_config.provider == "aws"): + if test_repo_config.python_feature_server and test_repo_config.provider == "aws": from feast.infra.feature_servers.aws_lambda.config import ( AwsLambdaFeatureServerConfig, ) @@ -397,7 +398,9 @@ def construct_test_environment( ) repo_dir_name = tempfile.mkdtemp() - if (test_repo_config.python_feature_server and test_repo_config.provider == "aws") or test_repo_config.registry_location == RegistryLocation.S3: + if ( + test_repo_config.python_feature_server and test_repo_config.provider == "aws" + ) or test_repo_config.registry_location == RegistryLocation.S3: registry: Union[str, RegistryConfig] = ( f"s3://feast-integration-tests/registries/{project}/registry.db" ) diff --git a/sdk/python/tests/integration/materialization/test_lambda.py b/sdk/python/tests/integration/materialization/test_lambda.py index bca4f6d633e..66cd2c5eb97 100644 --- a/sdk/python/tests/integration/materialization/test_lambda.py +++ b/sdk/python/tests/integration/materialization/test_lambda.py @@ -1,6 +1,5 @@ -import time - import math +import time from datetime import datetime, timedelta from typing import Optional @@ -8,62 +7,48 @@ import pytest from pytz import utc -from feast import FeatureStore, FeatureView, Entity, ValueType, RedshiftSource, Feature +from feast import Entity, Feature, FeatureStore, FeatureView, ValueType from tests.data.data_creator import create_basic_driver_dataset -from tests.integration.feature_repos.integration_test_repo_config import IntegrationTestRepoConfig, RegistryLocation -from tests.integration.feature_repos.repo_configuration import construct_test_environment -from tests.integration.feature_repos.universal.data_sources.redshift import RedshiftDataSourceCreator -from tests.integration.feature_repos.universal.entities import driver -from tests.integration.feature_repos.universal.feature_views import driver_feature_view +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, + RegistryLocation, +) +from tests.integration.feature_repos.repo_configuration import ( + construct_test_environment, +) +from tests.integration.feature_repos.universal.data_sources.redshift import ( + RedshiftDataSourceCreator, +) @pytest.mark.integration def test_lambda_materialization(): lambda_config = IntegrationTestRepoConfig( provider="aws", - online_store={ - "type": "dynamodb", - "region": "us-west-2", - }, + online_store={"type": "dynamodb", "region": "us-west-2"}, offline_store_creator=RedshiftDataSourceCreator, batch_engine={ "type": "lambda", "materialization_image": "402087665549.dkr.ecr.us-west-2.amazonaws.com/feast-lambda-consumer:v1", - "lambda_role": "arn:aws:iam::402087665549:role/lambda_execution_role" + "lambda_role": "arn:aws:iam::402087665549:role/lambda_execution_role", }, - registry_location=RegistryLocation.S3 + registry_location=RegistryLocation.S3, ) lambda_environment = construct_test_environment(lambda_config, None) - # local_config = IntegrationTestRepoConfig( - # online_store={ - # "type": "dynamodb", - # "region": "us-west-2", - # }, - # offline_store_creator=RedshiftDataSourceCreator, - # batch_engine="local" - # ) - # local_environment = construct_test_environment(local_config, None) - df = create_basic_driver_dataset() ds = lambda_environment.data_source_creator.create_data_source( df, lambda_environment.feature_store.project, field_mapping={"ts_1": "ts"}, ) fs = lambda_environment.feature_store - driver = Entity( - name="driver_id", - join_key="driver_id", - value_type=ValueType.INT64, - ) + driver = Entity(name="driver_id", join_key="driver_id", value_type=ValueType.INT64,) driver_stats_fv = FeatureView( name="driver_hourly_stats", entities=["driver_id"], ttl=timedelta(weeks=52), - features=[ - Feature(name="value", dtype=ValueType.FLOAT), - ], + features=[Feature(name="value", dtype=ValueType.FLOAT)], batch_source=ds, ) @@ -85,13 +70,13 @@ def test_lambda_materialization(): def check_offline_and_online_features( - fs: FeatureStore, - fv: FeatureView, - driver_id: int, - event_timestamp: datetime, - expected_value: Optional[float], - full_feature_names: bool, - check_offline_store: bool = True, + fs: FeatureStore, + fv: FeatureView, + driver_id: int, + event_timestamp: datetime, + expected_value: Optional[float], + full_feature_names: bool, + check_offline_store: bool = True, ) -> None: # Check online store response_dict = fs.get_online_features( @@ -105,7 +90,7 @@ def check_offline_and_online_features( if expected_value: assert response_dict[f"{fv.name}__value"][0], f"Response: {response_dict}" assert ( - abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6 + abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6 ), f"Response: {response_dict}, Expected: {expected_value}" else: assert response_dict[f"{fv.name}__value"][0] is None @@ -113,7 +98,7 @@ def check_offline_and_online_features( if expected_value: assert response_dict["value"][0], f"Response: {response_dict}" assert ( - abs(response_dict["value"][0] - expected_value) < 1e-6 + abs(response_dict["value"][0] - expected_value) < 1e-6 ), f"Response: {response_dict}, Expected: {expected_value}" else: assert response_dict["value"][0] is None @@ -131,11 +116,11 @@ def check_offline_and_online_features( if full_feature_names: if expected_value: assert ( - abs( - df.to_dict(orient="list")[f"{fv.name}__value"][0] - - expected_value - ) - < 1e-6 + abs( + df.to_dict(orient="list")[f"{fv.name}__value"][0] + - expected_value + ) + < 1e-6 ) else: assert not df.to_dict(orient="list")[f"{fv.name}__value"] or math.isnan( @@ -144,7 +129,7 @@ def check_offline_and_online_features( else: if expected_value: assert ( - abs(df.to_dict(orient="list")["value"][0] - expected_value) < 1e-6 + abs(df.to_dict(orient="list")["value"][0] - expected_value) < 1e-6 ) else: assert not df.to_dict(orient="list")["value"] or math.isnan( @@ -153,7 +138,7 @@ def check_offline_and_online_features( def run_offline_online_store_consistency_test( - fs: FeatureStore, fv: FeatureView, split_dt: datetime + fs: FeatureStore, fv: FeatureView, split_dt: datetime ) -> None: now = datetime.utcnow() From 20f8a5422fe3ff72dc915bf8c2efbbb068f9e2ec Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 12 Jul 2022 11:15:58 -0700 Subject: [PATCH 05/10] parallelize with threads Signed-off-by: Achal Shah --- sdk/python/feast/infra/aws.py | 2 + .../feast/infra/materialization/lambda/app.py | 4 ++ .../materialization/lambda/lambda_engine.py | 39 ++++++++++++++----- .../feast/infra/passthrough_provider.py | 2 + 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index dcf8194f820..83edaa16ee8 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -229,6 +229,8 @@ def teardown_infra( if api is not None: _logger.info(" Tearing down AWS API Gateway...") aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"]) + if self.batch_engine: + self.batch_engine.teardown_infra(project, tables, entities) @log_exceptions_and_usage(provider="AwsProvider") def get_feature_server_endpoint(self) -> Optional[str]: diff --git a/sdk/python/feast/infra/materialization/lambda/app.py b/sdk/python/feast/infra/materialization/lambda/app.py index 3dfc2d2db04..194313f4a07 100644 --- a/sdk/python/feast/infra/materialization/lambda/app.py +++ b/sdk/python/feast/infra/materialization/lambda/app.py @@ -63,6 +63,8 @@ def handler(event, context): for entity in feature_view.entity_columns } + written_rows = 0 + for batch in table.to_batches(DEFAULT_BATCH_SIZE): rows_to_write = _convert_arrow_to_proto( batch, feature_view, join_key_to_value_type @@ -70,6 +72,8 @@ def handler(event, context): store._provider.online_write_batch( store.config, feature_view, rows_to_write, lambda x: None, ) + written_rows += len(rows_to_write) + return {"written_rows": written_rows} except Exception as e: print(f"Exception: {e}", flush=True) print("Traceback:", flush=True) diff --git a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py index 625a171157e..2b83805752a 100644 --- a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py @@ -1,6 +1,7 @@ import base64 import json import logging +from concurrent.futures import ThreadPoolExecutor, wait from dataclasses import dataclass from datetime import datetime from typing import Callable, List, Literal, Optional, Sequence, Union @@ -137,7 +138,6 @@ def __init__( ) self.lambda_name = f"feast-materialize-{self.repo_config.project}" - # self.lambda_name = "feast-lambda-consumer" self.lambda_client = boto3.client("lambda") def materialize( @@ -189,6 +189,9 @@ def _materialize_one( ) paths = offline_job.to_remote_storage() + max_workers = len(paths) if len(paths) <= 20 else 20 + executor = ThreadPoolExecutor(max_workers=max_workers) + futures = [] for path in paths: payload = { @@ -199,18 +202,34 @@ def _materialize_one( } # Invoke a lambda to materialize this file. - response = self.lambda_client.invoke( - FunctionName=self.lambda_name, - InvocationType="RequestResponse", - Payload=json.dumps(payload), + logger.info("Invoking materialization for %s", path) + futures.append( + executor.submit( + self.lambda_client.invoke, + FunctionName=self.lambda_name, + InvocationType="RequestResponse", + Payload=json.dumps(payload), + ) ) + + done, not_done = wait(futures) + logger.info("Done: %s Not Done: %s", done, not_done) + for f in done: + response = f.result() + output = json.loads(response["Payload"].read()) + logger.info( - f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}" - ) - print( - f"Ingesting {path}; request id {response['ResponseMetadata']['RequestId']}" + f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, " + f"rows written: {output['written_rows']}" ) + for f in not_done: + response = f.result() + logger.error(f"Ingestion failed: {response}") + return LambdaMaterializationJob( - job_id=job_id, status=MaterializationJobStatus.SUCCEEDED + job_id=job_id, + status=MaterializationJobStatus.SUCCEEDED + if not not_done + else MaterializationJobStatus.ERROR, ) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 4105e8e5603..e31eb1e1773 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -140,6 +140,8 @@ def teardown_infra( set_usage_attribute("provider", self.__class__.__name__) if self.online_store: self.online_store.teardown(self.repo_config, tables, entities) + if self.batch_engine: + self.batch_engine.teardown_infra(project, tables, entities) def online_write_batch( self, From 23e67b4e177655d4fb58cdab769311dd13fad747 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 12 Jul 2022 12:55:51 -0700 Subject: [PATCH 06/10] super call Signed-off-by: Achal Shah --- sdk/python/feast/infra/aws.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 83edaa16ee8..4109856e609 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -207,8 +207,7 @@ def _deploy_feature_server(self, project: str, image_uri: str): def teardown_infra( self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], ) -> None: - if self.online_store: - self.online_store.teardown(self.repo_config, tables, entities) + super(AwsProvider, self).teardown_infra(project, tables, entities) if ( self.repo_config.feature_server is not None @@ -229,8 +228,6 @@ def teardown_infra( if api is not None: _logger.info(" Tearing down AWS API Gateway...") aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"]) - if self.batch_engine: - self.batch_engine.teardown_infra(project, tables, entities) @log_exceptions_and_usage(provider="AwsProvider") def get_feature_server_endpoint(self) -> Optional[str]: From b12e89d76269ce5aa901e9d4a06f722d887f9b9b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 12 Jul 2022 16:07:44 -0700 Subject: [PATCH 07/10] fix bugs Signed-off-by: Achal Shah --- sdk/python/feast/repo_config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 061091da575..f7f564df6ff 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -220,11 +220,11 @@ def batch_engine(self): self._batch_engine = get_batch_engine_config_from_type( self._batch_engine_config["type"] )(**self._batch_engine_config) - elif isinstance(self._online_config, str): + elif isinstance(self._batch_engine_config, str): self._batch_engine = get_batch_engine_config_from_type( - self._online_config + self._batch_engine_config )() - elif self._online_config: + elif self._batch_engine_config: self._batch_engine = self._batch_engine return self._batch_engine From 252354045959025d5768ceecde74cd50fe5d9110 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 13 Jul 2022 09:01:00 -0700 Subject: [PATCH 08/10] fix tests Signed-off-by: Achal Shah --- sdk/python/feast/infra/materialization/lambda/app.py | 1 + sdk/python/feast/infra/materialization/lambda/lambda_engine.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/lambda/app.py b/sdk/python/feast/infra/materialization/lambda/app.py index 194313f4a07..ebed4c96e06 100644 --- a/sdk/python/feast/infra/materialization/lambda/app.py +++ b/sdk/python/feast/infra/materialization/lambda/app.py @@ -46,6 +46,7 @@ def handler(event, context): print(f"Inferred Bucket: `{bucket}` Key: `{key}`", flush=True) if view_type == "batch": + # TODO: This probably needs to be become `store.get_batch_feature_view` at some point. feature_view = store.get_feature_view(view_name) else: feature_view = store.get_stream_feature_view(view_name) diff --git a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py index 2b83805752a..60f2f9ffbbd 100644 --- a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py @@ -113,8 +113,9 @@ def teardown_infra( entities: Sequence[Entity], ): # This should be tearing down the lambda function. + logger.info("Tearing down lambda %s: %s", self.lambda_name) r = self.lambda_client.delete_function(FunctionName=self.lambda_name) - logger.info("Tearing down lambda %s: %s", self.lambda_name, r) + logger.info("Finished tearing down lambda %s: %s", self.lambda_name, r) def __init__( self, From 0f93f306c9dac3a82656564695f070db49d2cfbd Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 13 Jul 2022 09:30:16 -0700 Subject: [PATCH 09/10] fix tests Signed-off-by: Achal Shah --- .../feast/infra/materialization/lambda/lambda_engine.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py index 60f2f9ffbbd..89a5f1a4f43 100644 --- a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py @@ -113,7 +113,7 @@ def teardown_infra( entities: Sequence[Entity], ): # This should be tearing down the lambda function. - logger.info("Tearing down lambda %s: %s", self.lambda_name) + logger.info("Tearing down lambda %s", self.lambda_name) r = self.lambda_client.delete_function(FunctionName=self.lambda_name) logger.info("Finished tearing down lambda %s: %s", self.lambda_name, r) @@ -139,6 +139,8 @@ def __init__( ) self.lambda_name = f"feast-materialize-{self.repo_config.project}" + if len(self.lambda_name) > 64: + self.lambda_name = self.lambda_name[:64] self.lambda_client = boto3.client("lambda") def materialize( From 37aae59d098956185ed5aea0d656a4af85d2a198 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 13 Jul 2022 11:25:02 -0700 Subject: [PATCH 10/10] undo unintended changes Signed-off-by: Achal Shah --- sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile b/sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile index 5f5038544c6..929227a8106 100644 --- a/sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile +++ b/sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile @@ -20,6 +20,7 @@ COPY README.md README.md # https://github.com/pypa/setuptools_scm#usage-from-docker # I think it also assumes that this dockerfile is being built from the root of the directory. RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,redis]' +RUN pip3 install -r sdk/python/feast/infra/feature_servers/aws_lambda/requirements.txt --target "${LAMBDA_TASK_ROOT}" # Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile) CMD [ "app.handler" ]