From 126971e4d4bd9af603f80964485b5fab48846eb7 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 26 Apr 2022 11:29:48 -0700 Subject: [PATCH 1/3] ci: Add a postgres testcontainer for tests Signed-off-by: Achal Shah --- Makefile | 2 +- .../feast/infra/online_stores/contrib/postgres.py | 2 +- .../universal/data_sources/postgres.py | 13 ++++++++----- setup.py | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 0ee6fcb6857..5bdec1f7831 100644 --- a/Makefile +++ b/Makefile @@ -79,7 +79,7 @@ test-python-universal-postgres: FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \ FEAST_USAGE=False \ IS_TEST=True \ - python -m pytest --integration --universal \ + python -m pytest -x --integration --universal \ -k "not test_historical_retrieval_fails_on_validation and \ not test_historical_retrieval_with_validation and \ not test_historical_features_persisting and \ diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index 479b6018062..14ff806e4bd 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -64,7 +64,7 @@ def online_write_batch( created_ts, ) ) - # Controll the batch so that we can update the progress + # Control the batch so that we can update the progress batch_size = 5000 for i in range(0, len(insert_values), batch_size): cur_batch = insert_values[i : i + batch_size] diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py index de5df6496f3..688b6a45077 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py @@ -1,6 +1,7 @@ from typing import Dict, List, Optional import pandas as pd +from 
testcontainers.postgres import PostgresContainer from feast.data_source import DataSource from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( @@ -20,15 +21,16 @@ class PostgreSQLDataSourceCreator(DataSourceCreator): def __init__(self, project_name: str, *args, **kwargs): super().__init__(project_name) self.project_name = project_name - + self.container = PostgresContainer(user="postgres") + self.container.start() self.offline_store_config = PostgreSQLOfflineStoreConfig( type="postgres", host="localhost", - port=5432, - database="postgres", + port=self.container.get_exposed_port(5432), + database=self.container.POSTGRES_DB, db_schema="public", - user="postgres", - password="docker", + user=self.container.POSTGRES_USER, + password=self.container.POSTGRES_PASSWORD, ) def create_data_source( @@ -69,3 +71,4 @@ def teardown(self): with _get_conn(self.offline_store_config) as conn, conn.cursor() as cur: for table in self.tables: cur.execute("DROP TABLE IF EXISTS " + table) + self.container.stop() diff --git a/setup.py b/setup.py index 1ecc3df8209..8736482f669 100644 --- a/setup.py +++ b/setup.py @@ -150,7 +150,7 @@ "pytest-mock==1.10.4", "Sphinx!=4.0.0,<4.4.0", "sphinx-rtd-theme", - "testcontainers>=3.5", + "testcontainers[postgresql]>=3.5", "adlfs==0.5.9", "firebase-admin==4.5.2", "pre-commit", From 99ef7d2df6e6c965f14c1872e00b8bd748343335 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Sun, 8 May 2022 22:47:57 -0700 Subject: [PATCH 2/3] ci: Update postgres tests to use test containers Signed-off-by: Achal Shah --- Makefile | 5 +- .../contrib/postgres_repo_configuration.py | 11 +- sdk/python/tests/conftest.py | 52 +++++- .../universal/data_sources/postgres.py | 161 ++++++++++++++---- 4 files changed, 185 insertions(+), 44 deletions(-) diff --git a/Makefile b/Makefile index 5bdec1f7831..dec93545297 100644 --- a/Makefile +++ b/Makefile @@ -84,7 +84,10 @@ test-python-universal-postgres: not test_historical_retrieval_with_validation and \ not 
test_historical_features_persisting and \ not test_historical_retrieval_fails_on_validation and \ - not test_universal_cli" \ + not test_universal_cli and \ + not test_go_feature_server and \ + not test_feature_logging and \ + not test_universal_types" \ sdk/python/tests test-python-universal-local: diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py index 91ab6868959..2b94600471d 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py @@ -5,20 +5,11 @@ PostgreSQLDataSourceCreator, ) -POSTGRES_ONLINE_CONFIG = { - "type": "postgres", - "host": "localhost", - "port": "5432", - "database": "postgres", - "db_schema": "feature_store", - "user": "postgres", - "password": "docker", -} FULL_REPO_CONFIGS = [ IntegrationTestRepoConfig( provider="local", offline_store_creator=PostgreSQLDataSourceCreator, - online_store=POSTGRES_ONLINE_CONFIG, + online_store_creator=PostgreSQLDataSourceCreator, ), ] diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 2ff97649d64..e2ade6850bb 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -204,16 +204,66 @@ def teardown(): return TrinoContainerSingleton +class PostgresContainerSingleton: + container = None + is_running = False + + postgres_user = "test" + postgres_password = "test" + postgres_db = "test" + + @classmethod + def get_singleton(cls): + if not cls.is_running: + cls.container = ( + DockerContainer("postgres:latest") + .with_exposed_ports(5432) + .with_env("POSTGRES_USER", cls.postgres_user) + .with_env("POSTGRES_PASSWORD", cls.postgres_password) + .with_env("POSTGRES_DB", cls.postgres_db) + ) + + cls.container.start() + log_string_to_wait_for = "database system is ready to accept connections" + waited = wait_for_logs( + container=cls.container, 
predicate=log_string_to_wait_for, timeout=30, interval=10 + ) + logger.info("Waited for %s seconds until postgres container was up", waited) + cls.is_running = True + return cls.container + + @classmethod + def teardown(cls): + if cls.container: + cls.container.stop() + + +@pytest.fixture(scope="session") +def postgres_fixture(request): + def teardown(): + PostgresContainerSingleton.teardown() + + request.addfinalizer(teardown) + return PostgresContainerSingleton + + + @pytest.fixture( params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS] ) -def environment(request, worker_id: str, trino_fixture): +def environment(request, worker_id: str, trino_fixture, postgres_fixture): if "TrinoSourceCreator" in request.param.offline_store_creator.__name__: e = construct_test_environment( request.param, worker_id=worker_id, offline_container=trino_fixture.get_singleton(), ) + elif "PostgreSQLDataSourceCreator" in request.param.offline_store_creator.__name__: + e = construct_test_environment( + request.param, + worker_id=worker_id, + offline_container=postgres_fixture.get_singleton(), + ) else: e = construct_test_environment(request.param, worker_id=worker_id) proc = Process( diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py index 688b6a45077..12d1869d7ef 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py @@ -1,53 +1,92 @@ -from typing import Dict, List, Optional +import logging +from typing import Dict, Optional import pandas as pd -from testcontainers.postgres import PostgresContainer +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs from feast.data_source import DataSource from feast.infra.offline_stores.contrib.postgres_offline_store.postgres
import ( PostgreSQLOfflineStoreConfig, PostgreSQLSource, ) -from feast.infra.utils.postgres.connection_utils import _get_conn, df_to_postgres_table +from feast.infra.utils.postgres.connection_utils import df_to_postgres_table from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) +from tests.integration.feature_repos.universal.online_store_creator import OnlineStoreCreator +logger = logging.getLogger(__name__) -class PostgreSQLDataSourceCreator(DataSourceCreator): - tables: List[str] = [] - def __init__(self, project_name: str, *args, **kwargs): - super().__init__(project_name) - self.project_name = project_name - self.container = PostgresContainer(user="postgres") - self.container.start() - self.offline_store_config = PostgreSQLOfflineStoreConfig( +class PostgresSourceCreatorSingleton: + postgres_user = "test" + postgres_password = "test" + postgres_db = "test" + + running = False + + project_name = None + container = None + provided_container = None + + offline_store_config = None + + @classmethod + def initialize(cls, project_name: str, *args, **kwargs): + cls.project_name = project_name + + if "offline_container" not in kwargs or not kwargs.get( + "offline_container", None + ): + # If we don't get an offline container provided, we try to create it on the fly. + # the problem here is that each test creates its own container, which basically + # browns out developer laptops. 
+ cls.container = ( + DockerContainer("postgres:latest") + .with_exposed_ports(5432) + .with_env("POSTGRES_USER", cls.postgres_user) + .with_env("POSTGRES_PASSWORD", cls.postgres_password) + .with_env("POSTGRES_DB", cls.postgres_db) + ) + + cls.container.start() + cls.provided_container = False + log_string_to_wait_for = "database system is ready to accept connections" + waited = wait_for_logs( + container=cls.container, predicate=log_string_to_wait_for, timeout=30, interval=10 + ) + logger.info("Waited for %s seconds until postgres container was up", waited) + cls.running = True + else: + cls.provided_container = True + cls.container = kwargs["offline_container"] + + cls.offline_store_config = PostgreSQLOfflineStoreConfig( type="postgres", host="localhost", - port=self.container.get_exposed_port(5432), - database=self.container.POSTGRES_DB, + port=cls.container.get_exposed_port(5432), + database=cls.container.env["POSTGRES_DB"], db_schema="public", - user=self.container.POSTGRES_USER, - password=self.container.POSTGRES_PASSWORD, + user=cls.container.env["POSTGRES_USER"], + password=cls.container.env["POSTGRES_PASSWORD"], ) + @classmethod def create_data_source( - self, - df: pd.DataFrame, - destination_name: str, - suffix: Optional[str] = None, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Dict[str, str] = None, + cls, + df: pd.DataFrame, + destination_name: str, + suffix: Optional[str] = None, + timestamp_field="ts", + created_timestamp_column="created_ts", + field_mapping: Dict[str, str] = None, ) -> DataSource: - destination_name = self.get_prefixed_table_name(destination_name) - - df_to_postgres_table(self.offline_store_config, df, destination_name) + destination_name = cls.get_prefixed_table_name(destination_name) - self.tables.append(destination_name) + df_to_postgres_table(cls.offline_store_config, df, destination_name) return PostgreSQLSource( name=destination_name, @@ -57,18 +96,76 @@ def create_data_source( 
field_mapping=field_mapping or {"ts_1": "ts"}, ) + @classmethod + def create_offline_store_config(cls) -> FeastConfigBaseModel: + return cls.offline_store_config + + @classmethod + def get_prefixed_table_name(cls, suffix: str) -> str: + return f"{cls.project_name}_{suffix}" + + @classmethod + def create_online_store(cls) -> Dict[str, str]: + return { + "type": "postgres", + "host": "localhost", + "port": cls.container.get_exposed_port(5432), + "database": cls.postgres_db, + "db_schema": "feature_store", + "user": cls.postgres_user, + "password": cls.postgres_password, + } + + @classmethod + def create_saved_dataset_destination(cls): + # FIXME: ... + return None + + @classmethod + def teardown(cls): + if not cls.provided_container and cls.running: + cls.container.stop() + cls.running = False + cls.container = None + cls.project_name = None + + +class PostgreSQLDataSourceCreator(DataSourceCreator, OnlineStoreCreator): + + postgres_user = "test" + postgres_password = "test" + postgres_db = "test" + + running = False + + def __init__(self, project_name: str, *args, **kwargs): + super().__init__(project_name) + PostgresSourceCreatorSingleton.initialize(project_name, *args, **kwargs) + + def create_data_source( + self, + df: pd.DataFrame, + destination_name: str, + suffix: Optional[str] = None, + timestamp_field="ts", + created_timestamp_column="created_ts", + field_mapping: Dict[str, str] = None, + ) -> DataSource: + + return PostgresSourceCreatorSingleton.create_data_source(df, destination_name, suffix, timestamp_field, created_timestamp_column, field_mapping) + def create_offline_store_config(self) -> FeastConfigBaseModel: - return self.offline_store_config + return PostgresSourceCreatorSingleton.create_offline_store_config() def get_prefixed_table_name(self, suffix: str) -> str: - return f"{self.project_name}_{suffix}" + return PostgresSourceCreatorSingleton.get_prefixed_table_name(suffix) + + def create_online_store(self) -> Dict[str, str]: + return
PostgresSourceCreatorSingleton.create_online_store() def create_saved_dataset_destination(self): # FIXME: ... return None def teardown(self): - with _get_conn(self.offline_store_config) as conn, conn.cursor() as cur: - for table in self.tables: - cur.execute("DROP TABLE IF EXISTS " + table) - self.container.stop() + PostgresSourceCreatorSingleton.teardown() From 53a01706c23b2844eaafd58d891a3a36e3680247 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Sun, 8 May 2022 22:48:05 -0700 Subject: [PATCH 3/3] ci: Update postgres tests to use test containers Signed-off-by: Achal Shah --- CONTRIBUTING.md | 1 + .../contrib/postgres_repo_configuration.py | 1 - sdk/python/tests/conftest.py | 14 +++--- .../universal/data_sources/postgres.py | 49 ++++++++++++------- 4 files changed, 41 insertions(+), 24 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 249ea5afcec..726ef1a5bad 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -178,6 +178,7 @@ The services with containerized replacements currently implemented are: - Redis - Trino - HBase +- Postgres You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies. 
diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py index 2b94600471d..bb7fd0caf5e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py @@ -5,7 +5,6 @@ PostgreSQLDataSourceCreator, ) - FULL_REPO_CONFIGS = [ IntegrationTestRepoConfig( provider="local", diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index e2ade6850bb..b57152c82d9 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -217,16 +217,19 @@ def get_singleton(cls): if not cls.is_running: cls.container = ( DockerContainer("postgres:latest") - .with_exposed_ports(5432) - .with_env("POSTGRES_USER", cls.postgres_user) - .with_env("POSTGRES_PASSWORD", cls.postgres_password) - .with_env("POSTGRES_DB", cls.postgres_db) + .with_exposed_ports(5432) + .with_env("POSTGRES_USER", cls.postgres_user) + .with_env("POSTGRES_PASSWORD", cls.postgres_password) + .with_env("POSTGRES_DB", cls.postgres_db) ) cls.container.start() log_string_to_wait_for = "database system is ready to accept connections" waited = wait_for_logs( - container=cls.container, predicate=log_string_to_wait_for, timeout=30, interval=10 + container=cls.container, + predicate=log_string_to_wait_for, + timeout=30, + interval=10, ) logger.info("Waited for %s seconds until postgres container was up", waited) cls.is_running = True @@ -247,7 +250,6 @@ def teardown(): return PostgresContainerSingleton - @pytest.fixture( params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS] ) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py index 12d1869d7ef..134234f93c8 100644 --- 
a/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py @@ -15,7 +15,9 @@ from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) -from tests.integration.feature_repos.universal.online_store_creator import OnlineStoreCreator +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) logger = logging.getLogger(__name__) @@ -38,24 +40,27 @@ def initialize(cls, project_name: str, *args, **kwargs): cls.project_name = project_name if "offline_container" not in kwargs or not kwargs.get( - "offline_container", None + "offline_container", None ): # If we don't get an offline container provided, we try to create it on the fly. # the problem here is that each test creates its own container, which basically # browns out developer laptops. cls.container = ( DockerContainer("postgres:latest") - .with_exposed_ports(5432) - .with_env("POSTGRES_USER", cls.postgres_user) - .with_env("POSTGRES_PASSWORD", cls.postgres_password) - .with_env("POSTGRES_DB", cls.postgres_db) + .with_exposed_ports(5432) + .with_env("POSTGRES_USER", cls.postgres_user) + .with_env("POSTGRES_PASSWORD", cls.postgres_password) + .with_env("POSTGRES_DB", cls.postgres_db) ) cls.container.start() cls.provided_container = False log_string_to_wait_for = "database system is ready to accept connections" waited = wait_for_logs( - container=cls.container, predicate=log_string_to_wait_for, timeout=30, interval=10 + container=cls.container, + predicate=log_string_to_wait_for, + timeout=30, + interval=10, ) logger.info("Waited for %s seconds until postgres container was up", waited) cls.running = True @@ -75,18 +80,19 @@ def initialize(cls, project_name: str, *args, **kwargs): @classmethod def create_data_source( - cls, - df: pd.DataFrame, - destination_name: str, - suffix: Optional[str] = None, - timestamp_field="ts", - 
created_timestamp_column="created_ts", - field_mapping: Dict[str, str] = None, + cls, + df: pd.DataFrame, + destination_name: str, + suffix: Optional[str] = None, + timestamp_field="ts", + created_timestamp_column="created_ts", + field_mapping: Dict[str, str] = None, ) -> DataSource: destination_name = cls.get_prefixed_table_name(destination_name) - df_to_postgres_table(cls.offline_store_config, df, destination_name) + if cls.offline_store_config: + df_to_postgres_table(cls.offline_store_config, df, destination_name) return PostgreSQLSource( name=destination_name, @@ -97,7 +103,8 @@ def create_data_source( ) @classmethod - def create_offline_store_config(cls) -> FeastConfigBaseModel: + def create_offline_store_config(cls) -> PostgreSQLOfflineStoreConfig: + assert cls.offline_store_config return cls.offline_store_config @classmethod @@ -106,6 +113,7 @@ def get_prefixed_table_name(cls, suffix: str) -> str: @classmethod def create_online_store(cls) -> Dict[str, str]: + assert cls.container return { "type": "postgres", "host": "localhost", @@ -152,7 +160,14 @@ def create_data_source( field_mapping: Dict[str, str] = None, ) -> DataSource: - return PostgresSourceCreatorSingleton.create_data_source(df, destination_name, suffix, timestamp_field, created_timestamp_column, field_mapping) + return PostgresSourceCreatorSingleton.create_data_source( + df, + destination_name, + suffix, + timestamp_field, + created_timestamp_column, + field_mapping, + ) def create_offline_store_config(self) -> FeastConfigBaseModel: return PostgresSourceCreatorSingleton.create_offline_store_config()