From 0c98a5b2c65fa3b528f230687776cb57dedd456a Mon Sep 17 00:00:00 2001 From: Alex Vinnik Date: Wed, 17 Jan 2024 17:25:45 -0600 Subject: [PATCH 1/3] SAASMLOPS-1058-2 Support s3gov schema by snowflake offline store during materialization Signed-off-by: Alex Vinnik --- sdk/python/feast/infra/offline_stores/snowflake.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 38568ce79b2..0fd9e7a29c7 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -588,8 +588,10 @@ def to_remote_storage(self) -> List[str]: file_name_column_index = [ idx for idx, rm in enumerate(cursor.description) if rm.name == "FILE_NAME" ][0] + # s3gov schema is used by Snowflake in AWS govcloud regions + native_export_path = self.export_path.replace("s3gov://", "s3://") return [ - f"{self.export_path}/{row[file_name_column_index]}" + f"{native_export_path}/{row[file_name_column_index]}" for row in cursor.fetchall() ] From 4fb6a7924ee3de168c94e9df6d6f2b1b243d1906 Mon Sep 17 00:00:00 2001 From: Alex Vinnik Date: Fri, 19 Jan 2024 20:35:32 -0600 Subject: [PATCH 2/3] SAASMLOPS-1058-2 Add unit test for to_remote_storage Signed-off-by: Alex Vinnik --- .../feast/infra/offline_stores/snowflake.py | 8 ++- .../infra/offline_stores/test_snowflake.py | 53 +++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 sdk/python/tests/unit/infra/offline_stores/test_snowflake.py diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 0fd9e7a29c7..1ce482ff507 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -584,12 +584,16 @@ def to_remote_storage(self) -> List[str]: HEADER = TRUE """ cursor = execute_snowflake_statement(self.snowflake_conn, query) + # s3gov schema is used by Snowflake in AWS govcloud regions + # remove gov portion from schema and pass it to online store upload + native_export_path = self.export_path.replace("s3gov://", "s3://") + return self._get_file_names_from_copy_into(cursor, native_export_path) + + def _get_file_names_from_copy_into(self, cursor, native_export_path) -> List[str]: file_name_column_index = [ idx for idx, rm in enumerate(cursor.description) if rm.name == "FILE_NAME" ][0] - # s3gov schema is used by Snowflake in AWS govcloud regions - native_export_path = self.export_path.replace("s3gov://", "s3://") return [ f"{native_export_path}/{row[file_name_column_index]}" for row in cursor.fetchall() diff --git a/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py b/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py new file mode 100644 index 00000000000..1b43ad9cf6a --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py @@ -0,0 +1,53 @@ +from unittest.mock import Mock, MagicMock, patch, ANY +import pytest + +from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig + +from feast.infra.offline_stores.snowflake import ( + SnowflakeOfflineStoreConfig, + SnowflakeRetrievalJob, +) + +from feast.repo_config import RepoConfig + +import re + +@pytest.fixture(params=['s3','s3gov']) +def retrieval_job(request): + offline_store_config = SnowflakeOfflineStoreConfig( + type="snowflake.offline", + account="snow", + user="snow", + password="snow", + role="snow", + warehouse="snow", + database="FEAST", + schema="OFFLINE", + storage_integration_name="FEAST_S3", + blob_export_location=f"{request.param}://feast-snowflake-offload/export", + ) + retrieval_job = SnowflakeRetrievalJob( + query="SELECT * FROM snowflake", + snowflake_conn=MagicMock(), + config=RepoConfig( + registry="s3://ml-test/repo/registry.db", + project="test", + provider="snowflake.offline", + online_store=SqliteOnlineStoreConfig(type="sqlite"), + offline_store=offline_store_config, + ), + full_feature_names=True, + on_demand_feature_views=[], + ) + return retrieval_job + + +def test_to_remote_storage(retrieval_job): + stored_files = ["just a path", "maybe another"] + with patch.object(retrieval_job, "to_snowflake", return_value=None) as mock_to_snowflake, \ + patch.object(retrieval_job, "_get_file_names_from_copy_into", return_value=stored_files) as mock_get_file_names_from_copy: + assert retrieval_job.to_remote_storage() == stored_files, "should return the list of files" + mock_to_snowflake.assert_called_once() + mock_get_file_names_from_copy.assert_called_once_with(ANY, ANY) + native_path = mock_get_file_names_from_copy.call_args[0][1] + assert re.match(f"^s3://.*", native_path), "path should be s3://*" From 00be0e47e65309c6be197aa234c426d9d11f7258 Mon Sep 17 00:00:00 2001 From: Alex Vinnik Date: Fri, 19 Jan 2024 20:50:36 -0600 Subject: [PATCH 3/3] SAASMLOPS-1058-2 Reformat code Signed-off-by: Alex Vinnik --- .../feast/infra/offline_stores/snowflake.py | 1 - .../infra/offline_stores/test_snowflake.py | 24 +++++++++++-------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 1ce482ff507..d3e84f12268 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -589,7 +589,6 @@ def to_remote_storage(self) -> List[str]: native_export_path = self.export_path.replace("s3gov://", "s3://") return self._get_file_names_from_copy_into(cursor, native_export_path) - def _get_file_names_from_copy_into(self, cursor, native_export_path) -> List[str]: file_name_column_index = [ idx for idx, rm in enumerate(cursor.description) if rm.name == "FILE_NAME" diff --git a/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py b/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py index 1b43ad9cf6a..afc3ae97aef 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_snowflake.py @@ -1,18 +1,17 @@ -from unittest.mock import Mock, MagicMock, patch, ANY -import pytest +import re +from unittest.mock import ANY, MagicMock, patch -from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig +import pytest from feast.infra.offline_stores.snowflake import ( SnowflakeOfflineStoreConfig, SnowflakeRetrievalJob, ) - +from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig from feast.repo_config import RepoConfig -import re -@pytest.fixture(params=['s3','s3gov']) +@pytest.fixture(params=["s3", "s3gov"]) def retrieval_job(request): offline_store_config = SnowflakeOfflineStoreConfig( type="snowflake.offline", @@ -44,10 +43,15 @@ def retrieval_job(request): def test_to_remote_storage(retrieval_job): stored_files = ["just a path", "maybe another"] - with patch.object(retrieval_job, "to_snowflake", return_value=None) as mock_to_snowflake, \ - patch.object(retrieval_job, "_get_file_names_from_copy_into", return_value=stored_files) as mock_get_file_names_from_copy: - assert retrieval_job.to_remote_storage() == stored_files, "should return the list of files" + with patch.object( + retrieval_job, "to_snowflake", return_value=None + ) as mock_to_snowflake, patch.object( + retrieval_job, "_get_file_names_from_copy_into", return_value=stored_files + ) as mock_get_file_names_from_copy: + assert ( + retrieval_job.to_remote_storage() == stored_files + ), "should return the list of files" mock_to_snowflake.assert_called_once() mock_get_file_names_from_copy.assert_called_once_with(ANY, ANY) native_path = mock_get_file_names_from_copy.call_args[0][1] - assert re.match(f"^s3://.*", native_path), "path should be s3://*" + assert re.match("^s3://.*", native_path), "path should be s3://*"