From d31690602151a7ad90914418f63982be42c1ff64 Mon Sep 17 00:00:00 2001 From: Youngkyu OH Date: Sun, 30 Oct 2022 14:42:51 +0900 Subject: [PATCH 1/3] Update the template on how to use AWS Athena Signed-off-by: Youngkyu OH --- .../athena/feature_repo/feature_store.yaml | 7 +-- .../athena/feature_repo/test_workflow.py | 45 +++++++++++-------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml b/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml index 13e7898e861..bd12e906d1f 100644 --- a/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml @@ -6,8 +6,9 @@ online_store: path: online_store.db offline_store: type: athena - region: ap-northeast-2 - database: sampledb + region: {AWS region} + database: {The database in the data catalog to be used by Athena} data_source: AwsDataCatalog - s3_staging_location: s3://sagemaker-yelo-test + s3_staging_location: s3://{S3 bucket to be used by Feast} + workgroup: {Workgroup for Athena} entity_key_serialization_version: 2 \ No newline at end of file diff --git a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py index 7d7daff8650..985377b3395 100644 --- a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py +++ b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py @@ -1,18 +1,28 @@ import os -from datetime import datetime, timedelta - import pandas as pd +import importlib +from datetime import datetime, timedelta +from feast import FeatureStore, Entity,Field, Feature, FeatureView, ValueType +from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import AthenaSource +from feast.types import Float32, Float64, Int64, Int32 -from feast import Entity, Feature, FeatureStore, FeatureView, ValueType -from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import ( - AthenaSource, -) def test_end_to_end(): try: - fs = FeatureStore(".") + + # Before running this test method + # 1. Upload the driver_stats.parquet file to your S3 bucket. + # (https://github.com/feast-dev/feast-custom-offline-store-demo/tree/main/feature_repo/data) + # 2. Using AWS Glue Crawler, create a table in the data catalog. The generated table can be queried through Athena. + # 3. Specify the S3 bucket name, data source(AwsDataCatalog), database name, Athena's workgroup, etc. in feature_store.yaml + + fs = FeatureStore("./feature_repo") + + # Partition pruning has a significant impact on Athena's query performance and cost. + # If offline feature dataset is large, it is highly recommended to create partitions using date columns such as ('created','event_timestamp') + # The date_partition_column must be in form of YYYY-MM-DD(string) as in the beginning of the date column. driver_hourly_stats = AthenaSource( timestamp_field="event_timestamp", @@ -21,31 +31,29 @@ def test_end_to_end(): database="sampledb", data_source="AwsDataCatalog", created_timestamp_column="created", - # date_partition_column="std_date" + # date_partition_column="std_date" #YYYY-MM-DD ) driver = Entity( name="driver_id", - value_type=ValueType.INT64, description="driver id", ) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], - ttl=timedelta(days=365), - features=[ - Feature(name="conv_rate", dtype=ValueType.FLOAT), - Feature(name="acc_rate", dtype=ValueType.FLOAT), - Feature(name="avg_daily_trips", dtype=ValueType.INT64), + entities=[driver], + ttl=timedelta(days=500), + schema=[ + Field(name="conv_rate",dtype=Float32), + Field(name="acc_rate",dtype=Float32), + Field(name="avg_daily_trips",dtype=Int32), ], online=True, - batch_source=driver_hourly_stats, + source=driver_hourly_stats, ) # apply repository fs.apply([driver_hourly_stats, driver, driver_hourly_stats_view]) - print(fs.list_data_sources()) print(fs.list_feature_views()) @@ -54,7 +62,6 @@ def test_end_to_end(): ) # Read features from offline store - feature_vector = ( fs.get_historical_features( features=["driver_hourly_stats:conv_rate"], entity_df=entity_df @@ -105,4 +112,4 @@ def test_cli(): if __name__ == "__main__": # pass test_end_to_end() - test_cli() + test_cli() \ No newline at end of file From b443e09a00f0caf6bce7301be29bff89065e066f Mon Sep 17 00:00:00 2001 From: Youngkyu OH Date: Sun, 30 Oct 2022 15:01:20 +0900 Subject: [PATCH 2/3] Remove unnecessary imports Signed-off-by: Youngkyu OH --- .../templates/athena/feature_repo/test_workflow.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py index 985377b3395..2bc152b4c61 100644 --- a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py +++ b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py @@ -1,10 +1,9 @@ import os import pandas as pd -import importlib from datetime import datetime, timedelta -from feast import FeatureStore, Entity,Field, Feature, FeatureView, ValueType +from feast import FeatureStore, Entity,Field, FeatureView from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import AthenaSource -from feast.types import Float32, Float64, Int64, Int32 +from feast.types import Float64, Int64 @@ -44,9 +43,9 @@ def test_end_to_end(): entities=[driver], ttl=timedelta(days=500), schema=[ - Field(name="conv_rate",dtype=Float32), - Field(name="acc_rate",dtype=Float32), - Field(name="avg_daily_trips",dtype=Int32), + Field(name="conv_rate",dtype=Float64), + Field(name="acc_rate",dtype=Float64), + Field(name="avg_daily_trips",dtype=Int64), ], online=True, source=driver_hourly_stats, From b58c536d6ea036fb87ece0a1f7ea24e42f89b60e Mon Sep 17 00:00:00 2001 From: Youngkyu OH Date: Sun, 30 Oct 2022 15:06:21 +0900 Subject: [PATCH 3/3] lint & format Signed-off-by: Youngkyu OH --- .../athena/feature_repo/test_workflow.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py index 2bc152b4c61..bf69a4bff05 100644 --- a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py +++ b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py @@ -1,10 +1,13 @@ import os -import pandas as pd from datetime import datetime, timedelta -from feast import FeatureStore, Entity,Field, FeatureView -from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import AthenaSource -from feast.types import Float64, Int64 +import pandas as pd + +from feast import Entity, FeatureStore, FeatureView, Field +from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import ( + AthenaSource, +) +from feast.types import Float64, Int64 def test_end_to_end(): @@ -43,9 +46,9 @@ def test_end_to_end(): entities=[driver], ttl=timedelta(days=500), schema=[ - Field(name="conv_rate",dtype=Float64), - Field(name="acc_rate",dtype=Float64), - Field(name="avg_daily_trips",dtype=Int64), + Field(name="conv_rate", dtype=Float64), + Field(name="acc_rate", dtype=Float64), + Field(name="avg_daily_trips", dtype=Int64), ], online=True, source=driver_hourly_stats, @@ -111,4 +114,4 @@ def test_cli(): if __name__ == "__main__": # pass test_end_to_end() - test_cli() \ No newline at end of file + test_cli()