diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 48354b92d4a..d2d3d6e4977 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -64,6 +64,6 @@ jobs: run: | pip install --upgrade "pip>=21.3.1" - name: Install dependencies - run: make install-go-ci-dependencies + run: make install-go-proto-dependencies - name: Lint go run: make lint-go \ No newline at end of file diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index b8b3a126c0d..696a186ed8d 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -84,7 +84,7 @@ jobs: with: go-version: 1.17.7 - name: Install dependencies - run: make install-go-ci-dependencies + run: make install-go-proto-dependencies - name: Compile protos run: make compile-protos-go - name: Test diff --git a/Makefile b/Makefile index a51e6331b5f..15ac9304723 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ build: protos build-java build-docker build-html # Python SDK -install-python-ci-dependencies: install-go-ci-dependencies +install-python-ci-dependencies: install-go-proto-dependencies cd sdk/python && python -m piptools sync requirements/py$(PYTHON)-ci-requirements.txt cd sdk/python && COMPILE_GO=true python setup.py develop @@ -125,19 +125,21 @@ build-java-no-tests: # Go SDK -install-go-ci-dependencies: +install-go-proto-dependencies: go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26.0 go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1.0 -compile-protos-go: install-go-ci-dependencies +install-protoc-dependencies: pip install grpcio-tools==1.34.0 + +compile-protos-go: install-go-proto-dependencies install-protoc-dependencies python sdk/python/setup.py build_go_protos compile-go-feature-server: compile-protos-go go mod tidy go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver -test-go: install-go-ci-dependencies +test-go: compile-protos-go go test ./... format-go: diff --git a/go/cmd/goserver/main.go b/go/cmd/goserver/main.go index e06c71ab23c..f2e59b878e3 100644 --- a/go/cmd/goserver/main.go +++ b/go/cmd/goserver/main.go @@ -23,7 +23,6 @@ type FeastEnvConfig struct { } // TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus - func main() { var feastEnvConfig FeastEnvConfig diff --git a/sdk/python/$/Users/achal/tecton/feast/sdk/python/feast/binaries/goserver b/sdk/python/$/Users/achal/tecton/feast/sdk/python/feast/binaries/goserver deleted file mode 100755 index 0b18e4d6d3a..00000000000 Binary files a/sdk/python/$/Users/achal/tecton/feast/sdk/python/feast/binaries/goserver and /dev/null differ diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 8641037d353..cf39f7dca36 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -108,10 +108,7 @@ class FeatureStore: @log_exceptions def __init__( - self, - repo_path: Optional[str] = None, - config: Optional[RepoConfig] = None, - go_server_use_thread: bool = False, + self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): """ Creates a FeatureStore object. @@ -135,7 +132,6 @@ def __init__( self._registry._initialize_registry() self._provider = get_provider(self.config, self.repo_path) self._go_server = None - self._go_server_use_thread = go_server_use_thread @log_exceptions def version(self) -> str: @@ -733,6 +729,10 @@ def apply( service.name, project=self.project, commit=False ) + # If a go server is running, kill it so that it can be recreated in `update_infra` with + # the latest registry state. + self.kill_go_server() + self._get_provider().update_infra( project=self.project, tables_to_delete=views_to_delete if not partial else [], @@ -754,6 +754,8 @@ def teardown(self): entities = self.list_entities() + self.kill_go_server() + self._get_provider().teardown_infra(self.project, tables, entities) self._registry.teardown() @@ -1233,11 +1235,8 @@ def get_online_features( if self.config.go_feature_server: # Lazily start the go server on the first request if self._go_server is None: - self._go_server = GoServer( - str(self.repo_path.absolute()), - self.config, - self._go_server_use_thread, - ) + self._go_server = GoServer(str(self.repo_path.absolute()), self.config,) + self._go_server._shared_connection._check_grpc_connection() return self._go_server.get_online_features( features, columnar, full_feature_names ) @@ -1860,12 +1859,7 @@ def serve_transformations(self, port: int) -> None: def kill_go_server(self): if self._go_server: self._go_server.kill_go_server_explicitly() - - def set_go_server_use_thread(self, use: bool): - if self._go_server: - self._go_server.set_use_thread(use) - else: - self._go_server_use_thread = use + self._go_server = None def _validate_entity_values(join_key_values: Dict[str, List[Value]]): diff --git a/sdk/python/feast/go_server.py b/sdk/python/feast/go_server.py index a9ddd9bb825..9e94d713710 100644 --- a/sdk/python/feast/go_server.py +++ b/sdk/python/feast/go_server.py @@ -12,13 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import atexit -import ctypes +import logging import os -import pathlib import platform import random -import shutil -import signal import string import subprocess import tempfile @@ -44,6 +41,8 @@ from feast.repo_config import RepoConfig from feast.type_map import python_values_to_proto_values +_logger = logging.getLogger(__name__) + class GoServerConnection: def __init__(self, config: RepoConfig, repo_path: str): @@ -51,6 +50,13 @@ def __init__(self, config: RepoConfig, repo_path: str): self._config = config self._repo_path = repo_path self.temp_dir = tempfile.TemporaryDirectory() + self._client: Optional[ServingServiceStub] = None + + @property + def client(self): + if self._client: + return self._client + raise RuntimeError("Client not established with go subprocess") def _get_unix_domain_file_path(self) -> Path: # This method should return a file that go server should listen on and that the python channel @@ -81,7 +87,7 @@ def connect(self) -> bool: self._process = Popen([executable], cwd=cwd, env=env,) channel = grpc.insecure_channel(f"unix:{self.sock_file}") - self.client: ServingServiceStub = ServingServiceStub(channel) + self._client = ServingServiceStub(channel) try: self._check_grpc_connection() @@ -91,11 +97,10 @@ def connect(self) -> bool: def kill_process(self): if self._process: - # self._process.terminate() - self._process.send_signal(signal.SIGINT) + self._process.terminate() def is_process_alive(self): - return self._process and self._process.poll() + return self._process and self._process.poll() is None def wait_for_process(self, timeout): self._process.wait(timeout) @@ -107,7 +112,7 @@ def wait_for_process(self, timeout): wait=wait_exponential(multiplier=0.1, min=0.1, max=5), ) def _check_grpc_connection(self): - self.client.GetFeastServingInfo(request=GetFeastServingInfoRequest()) + return self.client.GetFeastServingInfo(request=GetFeastServingInfoRequest()) class GoServer: @@ -118,57 +123,20 @@ class GoServer: Attributes: _repo_path: The path to the Feast repo for which this go server is defined. _config: The RepoConfig for the Feast repo for which this go server is defined. - _go_server_use_thread: set whether or not to use a background thread to monitor go server """ _repo_path: str _config: RepoConfig - _go_server_use_thread: bool - def __init__( - self, repo_path: str, config: RepoConfig, go_server_use_thread: bool = False, - ): + def __init__(self, repo_path: str, config: RepoConfig): """Creates a GoServer object.""" self._repo_path = repo_path self._config = config self._go_server_started = threading.Event() - self._use_thread = go_server_use_thread self._shared_connection = GoServerConnection(config, repo_path) self._dev_mode = "dev" in feast.__version__ - if not is_test() and self._dev_mode: - self._build_binaries() - - if self._check_use_thread(): - self._start_go_server_use_thread() - else: - self._start_go_server() - - def _check_use_thread(self): - return self._use_thread - - def set_use_thread(self, use: bool): - self._use_thread = use - def _build_binaries(self): - - goos = platform.system().lower() - goarch = "amd64" if platform.machine() == "x86_64" else "arm64" - binaries_path = (pathlib.Path(__file__).parent / "../feast/binaries").resolve() - binaries_path_abs = str(binaries_path.absolute()) - if binaries_path.exists(): - shutil.rmtree(binaries_path_abs) - os.mkdir(binaries_path_abs) - - subprocess.check_output( - [ - "go", - "build", - "-o", - f"{binaries_path_abs}/goserver_{goos}_{goarch}", - "github.com/feast-dev/feast/go/cmd/goserver", - ], - env={"GOOS": goos, "GOARCH": goarch, **os.environ}, - ) + self._start_go_server_use_thread() def get_online_features( self, @@ -198,7 +166,7 @@ def get_online_features( ValueError: If some other error occurs. """ # Wait for go server subprocess to restart before asking for features - if self._check_use_thread() and not self._go_server_started.is_set(): + if not self._go_server_started.is_set(): self._go_server_started.wait() request = GetOnlineFeaturesRequest(full_feature_names=full_feature_names) @@ -220,9 +188,7 @@ def get_online_features( if rpc_error.code() == grpc.StatusCode.UNAVAILABLE: # If the server became unavailable, it could mean that the subprocess died or fell # into a bad state, so the resolution is to wait for go server to restart in the background - if not self._check_use_thread(): - self._start_go_server() - elif not self._go_server_started.is_set(): + if not self._go_server_started.is_set(): self._go_server_started.wait() # Retry request with the new Go subprocess response = self._shared_connection.client.GetOnlineFeatures( @@ -249,96 +215,84 @@ def get_online_features( def _start_go_server_use_thread(self): - self._go_server_background_thread = GoServerBackgroundThread( - "GoServerBackgroundThread", - self._shared_connection, - self._go_server_started, + self._go_server_background_thread = GoServerMonitorThread( + "GoServerMonitorThread", self._shared_connection, self._go_server_started ) self._go_server_background_thread.start() - atexit.register(lambda: self._go_server_background_thread.stop_go_server()) - signal.signal( - signal.SIGTERM, - lambda sig, frame: self._go_server_background_thread.stop_go_server(), - ) - signal.signal( - signal.SIGINT, - lambda sig, frame: self._go_server_background_thread.stop_go_server(), - ) + atexit.register(lambda: self._go_server_background_thread.stop()) # Wait for go server subprocess to start for the first time before returning self._go_server_started.wait() - def _start_go_server(self): - if self._shared_connection.is_process_alive(): - self._shared_connection.kill_process() - - self._shared_connection.connect() - atexit.register(lambda: self._shared_connection.kill_process()) - signal.signal( - signal.SIGTERM, lambda sig, frame: self._shared_connection.kill_process() - ) - signal.signal( - signal.SIGINT, lambda sig, frame: self._shared_connection.kill_process() - ) - def kill_go_server_explicitly(self): - if self._check_use_thread(): - self._go_server_background_thread.stop_go_server() - else: - self._shared_connection.kill_process() + self._go_server_background_thread._is_cancelled.set() + self._go_server_background_thread.stop() + self._go_server_background_thread.join() -# https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/ -class GoServerBackgroundThread(threading.Thread): +class GoServerMonitorThread(threading.Thread): def __init__( self, name: str, shared_connection: GoServerConnection, - go_server_started: threading.Event, + go_server_first_started: threading.Event, ): threading.Thread.__init__(self) self.name = name self._shared_connection = shared_connection - self._go_server_started = go_server_started + self._is_cancelled = threading.Event() + self.daemon = True + self._go_server_started = go_server_first_started def run(self): # Target function of the thread class + _logger.debug( + "%s Started monitoring thread to keep go feature server alive", self.ident + ) try: - while True: - self._go_server_started.clear() + while not self._is_cancelled.is_set(): # If we fail to connect to grpc stub, terminate subprocess and repeat + _logger.debug("%s Connecting to subprocess", self.ident) if not self._shared_connection.connect(): + _logger.debug( + "%s Failed to connect, killing and retrying", self.ident + ) self._shared_connection.kill_process() continue - self._go_server_started.set() - while True: + else: + _logger.debug( + "%s Go feature server started, process: %s", + self.ident, + self._shared_connection._process.pid, + ) + self._go_server_started.set() + _logger.debug( + "%s is_cancelled status: %s", self.ident, self._is_cancelled + ) + while not self._is_cancelled.is_set(): try: # Making a blocking wait by setting timeout to a very long time so we don't waste cpu cycle self._shared_connection.wait_for_process(3600) except subprocess.TimeoutExpired: pass + _logger.debug( + "%s No longer waiting for process: %s, %s, %s", + self.ident, + self._shared_connection._process.pid, + self._shared_connection._process.returncode, + self._shared_connection.is_process_alive(), + ) if not self._shared_connection.is_process_alive(): break finally: # Main thread exits self._shared_connection.kill_process() - def stop_go_server(self): - thread_id = self._get_id() - res = ctypes.pythonapi.PyThreadState_SetAsyncExc( - thread_id, ctypes.py_object(SystemExit) + def stop(self): + _logger.debug( + "%s Stopping monitoring thread and terminating go feature server", + self.ident, ) - # TODO: Review that kill process here but run also has to stop - if res > 1: - ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0) - else: - self._shared_connection.kill_process() - - def _get_id(self): - # returns id of the respective thread - if hasattr(self, "_thread_id"): - return self._thread_id - for id, thread in threading._active.items(): - if thread is self: - return id + self._is_cancelled.set() + self._shared_connection.kill_process() diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index c5a15c8a91b..7249d247a22 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -63,4 +63,5 @@ def _write_registry(self, registry_proto: RegistryProto): registry_proto.last_updated.FromDatetime(datetime.utcnow()) file_dir = self._filepath.parent file_dir.mkdir(exist_ok=True) - self._filepath.write_bytes(registry_proto.SerializeToString()) + with open(self._filepath, mode="wb", buffering=0) as f: + f.write(registry_proto.SerializeToString()) diff --git a/sdk/python/feast/protos/__init__.py b/sdk/python/feast/protos/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sdk/python/feast/protos/feast/__init__.py b/sdk/python/feast/protos/feast/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sdk/python/feast/protos/feast/core/__init__.py b/sdk/python/feast/protos/feast/core/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sdk/python/feast/protos/feast/serving/__init__.py b/sdk/python/feast/protos/feast/serving/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sdk/python/feast/protos/feast/storage/__init__.py b/sdk/python/feast/protos/feast/storage/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sdk/python/feast/protos/feast/types/__init__.py b/sdk/python/feast/protos/feast/types/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sdk/python/setup.py b/sdk/python/setup.py index d25059933e0..e4fe7ea9468 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -196,7 +196,7 @@ def finalize_options(self): def _generate_python_protos(self, path: str): proto_files = glob.glob(os.path.join(self.proto_folder, path)) - + Path(self.python_folder).mkdir(exist_ok=True) subprocess.check_call( self.python_protoc + [ diff --git a/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py b/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py index 4364a168338..25650eced9f 100644 --- a/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py +++ b/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py @@ -24,15 +24,19 @@ class IntegrationTestRepoConfig: infer_features: bool = False python_feature_server: bool = False go_feature_server: bool = False - go_server_use_thread: bool = False def __repr__(self) -> str: - return "-".join( + if isinstance(self.online_store, str): + online_store_type = self.online_store + elif self.online_store["type"] == "redis": + online_store_type = self.online_store.get("redis_type", "redis") + else: + online_store_type = self.online_store["type"] + + return ":".join( [ - f"{self.provider.upper()}:", + f"{self.provider.upper()}", f"{self.offline_store_creator.__name__.split('.')[-1].replace('DataSourceCreator', '')}", - self.online_store - if isinstance(self.online_store, str) - else self.online_store["type"], + online_store_type, ] ) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index cb77d7dc2ad..760d2affdc5 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -117,16 +117,10 @@ GO_REPO_CONFIGS = [ IntegrationTestRepoConfig(online_store=REDIS_CONFIG, go_feature_server=True,), - IntegrationTestRepoConfig( - online_store=REDIS_CONFIG, go_feature_server=True, go_server_use_thread=True, - ), ] GO_CYCLE_REPO_CONFIGS = [ IntegrationTestRepoConfig(online_store=REDIS_CONFIG, go_feature_server=True,), - IntegrationTestRepoConfig( - online_store=REDIS_CONFIG, go_feature_server=True, go_server_use_thread=True, - ), ] diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 33e146aa199..569d9f92a51 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -1,6 +1,7 @@ import datetime import itertools import os +import signal import time import unittest from datetime import timedelta @@ -607,12 +608,12 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name entity_rows = [ { - "driver": driver, - "customer_id": customer, + "driver": _driver, + "customer_id": _customer, "origin_id": origin, "destination_id": destination, } - for (driver, customer, origin, destination) in zip( + for (_driver, _customer, origin, destination) in zip( sample_drivers, sample_customers, *sample_location_pairs ) ] @@ -866,12 +867,12 @@ def test_online_retrieval_with_go_server( entity_rows = [ { - "driver": driver, - "customer_id": customer, + "driver": _driver, + "customer_id": _customer, "origin_id": origin, "destination_id": destination, } - for (driver, customer, origin, destination) in zip( + for (_driver, _customer, origin, destination) in zip( sample_drivers, sample_customers, *sample_location_pairs ) ] @@ -890,50 +891,18 @@ def test_online_retrieval_with_go_server( @pytest.mark.integration @pytest.mark.goserver -@pytest.mark.skip( - "todo(achals) goserver processes don't actually clean up properly. Need to redo this component of the server." -) def test_online_store_cleanup_with_go_server(go_environment, go_data_sources): """ This test mirrors test_online_store_cleanup for the Go feature server. It removes on demand feature views since the Go feature server doesn't support them. """ - fs = go_environment.feature_store - entities, datasets, data_sources = go_data_sources - driver_stats_fv = construct_universal_feature_views( - data_sources, with_odfv=False - ).driver - - driver_entities = entities.driver_vals - df = pd.DataFrame( - { - "ts_1": [go_environment.end_date] * len(driver_entities), - "created_ts": [go_environment.end_date] * len(driver_entities), - "driver_id": driver_entities, - "value": np.random.random(size=len(driver_entities)), - } - ) - - ds = go_environment.data_source_creator.create_data_source( - df, destination_name="simple_driver_dataset" - ) - - simple_driver_fv = driver_feature_view( - data_source=ds, name="test_universal_online_simple_driver" - ) - - fs.apply([driver(), simple_driver_fv, driver_stats_fv]) - - fs.materialize( - go_environment.start_date - timedelta(days=1), - go_environment.end_date + timedelta(days=1), + driver_entities, fs, simple_driver_fv, driver_stats_fv, df = setup_feature_store( + go_environment, go_data_sources ) expected_values = df.sort_values(by="driver_id") features = [f"{simple_driver_fv.name}:value"] entity_rows = [{"driver": driver_id} for driver_id in sorted(driver_entities)] - time.sleep(3) - online_features = fs.get_online_features( features=features, entity_rows=entity_rows ).to_dict() @@ -970,71 +939,41 @@ def eventually_apply() -> Tuple[None, bool]: @pytest.mark.integration @pytest.mark.goserverlifecycle -@pytest.mark.skip( - "todo(achals) os.fork doesn't work with pytest-xdist. Need to redo this test." -) def test_go_server_life_cycle(go_cycle_environment, go_data_sources): import threading import psutil - fs = go_cycle_environment.feature_store - fs.set_go_server_use_thread( - go_cycle_environment.test_repo_config.go_server_use_thread - ) - - entities, datasets, data_sources = go_data_sources - driver_stats_fv = construct_universal_feature_views( - data_sources, with_odfv=False - ).driver - - driver_entities = entities.driver_vals - df = pd.DataFrame( - { - "ts_1": [go_cycle_environment.end_date] * len(driver_entities), - "created_ts": [go_cycle_environment.end_date] * len(driver_entities), - "driver_id": driver_entities, - "value": np.random.random(size=len(driver_entities)), - } - ) - - ds = go_cycle_environment.data_source_creator.create_data_source( - df, destination_name="simple_driver_dataset" - ) - - simple_driver_fv = driver_feature_view( - data_source=ds, name="test_universal_online_simple_driver" + driver_entities, fs, simple_driver_fv, _, _ = setup_feature_store( + go_cycle_environment, go_data_sources ) - - fs.apply([driver(), simple_driver_fv, driver_stats_fv]) - - fs.materialize( - go_cycle_environment.start_date - timedelta(days=1), - go_cycle_environment.end_date + timedelta(days=1), - ) - expected_values = df.sort_values(by="driver_id") features = [f"{simple_driver_fv.name}:value"] entity_rows = [{"driver": driver_id} for driver_id in sorted(driver_entities)] # Start go server process that calls get_online_features and return and check if at any time go server # fails to clean up resources - import os - import signal - - # Duplicate the current test suit in the child process - child_pid = os.fork() - if child_pid == 0: - online_features = fs.get_online_features( - features=features, entity_rows=entity_rows - ).to_dict() - assert np.allclose(expected_values["value"], online_features["value"]) - os.kill(os.getpid(), signal.SIGTERM) - os._exit(0) - os.wait() + fs.get_online_features(features=features, entity_rows=entity_rows).to_dict() + + assert ( + fs._go_server + and fs._go_server._go_server_started.is_set() + and fs._go_server._shared_connection._process + ) + go_fs_pid = fs._go_server._shared_connection._process.pid + + os.kill(go_fs_pid, signal.SIGTERM) # At the same time checking that resources are clean up properly once child process is killed # Check that background thread has terminated - for id, thread in threading._active.items(): - assert thread.name != "GoServerBackgroundThread" + monitor_thread_alive = False + monitor_thread = fs._go_server._go_server_background_thread + assert monitor_thread.daemon + + print(f"Monitor thread: {monitor_thread}, {monitor_thread.ident}") + + for thread in threading.enumerate(): + if thread.ident == monitor_thread.ident and thread.is_alive(): + monitor_thread_alive = True + assert monitor_thread_alive # Check if go server subprocess is still active even if background thread and process are killed go_server_still_alive = False @@ -1043,7 +982,7 @@ def test_go_server_life_cycle(go_cycle_environment, go_data_sources): # Get process name & pid from process object. process_name = proc.name() ppid = proc.ppid() - if "goserver" in process_name and ppid == child_pid: + if "goserver" in process_name and ppid == go_fs_pid: # Kill process first and raise exception later go_server_still_alive = True proc.terminate() @@ -1052,6 +991,63 @@ def test_go_server_life_cycle(go_cycle_environment, go_data_sources): pass assert not go_server_still_alive + # Yield control to monitor thread to restart process. + time.sleep(1) + + # Make sure the background thread has created a new subprocess. + assert ( + fs._go_server + and fs._go_server._go_server_started.is_set() + and fs._go_server._shared_connection._process + ) + new_go_fs_pid = fs._go_server._shared_connection._process.pid + assert new_go_fs_pid != go_fs_pid + fs._go_server._shared_connection._check_grpc_connection() + + # Ensure process is still running. + assert fs._go_server._shared_connection._process.poll() is None + + # And we can still get feature values. + fs.get_online_features(features=features, entity_rows=entity_rows).to_dict() + + fs.kill_go_server() + + # Ensure process is dead. + assert fs._go_server is None + # Ensure monitoring thread is also dead. + live_threads = [t.ident for t in threading.enumerate()] + assert monitor_thread.ident not in live_threads + + +def setup_feature_store(environment, go_data_sources): + fs = environment.feature_store + fs.kill_go_server() + entities, datasets, data_sources = go_data_sources + driver_stats_fv = construct_universal_feature_views( + data_sources, with_odfv=False + ).driver + driver_entities = entities.driver_vals + df = pd.DataFrame( + { + "ts_1": [environment.end_date] * len(driver_entities), + "created_ts": [environment.end_date] * len(driver_entities), + "driver_id": driver_entities, + "value": np.random.random(size=len(driver_entities)), + } + ) + ds = environment.data_source_creator.create_data_source( + df, destination_name="simple_driver_dataset" + ) + simple_driver_fv = driver_feature_view( + data_source=ds, name="test_universal_online_simple_driver" + ) + fs.apply([driver(), simple_driver_fv, driver_stats_fv]) + fs.materialize( + environment.start_date - timedelta(days=1), + environment.end_date + timedelta(days=1), + ) + return driver_entities, fs, simple_driver_fv, driver_stats_fv, df + def response_feature_name(feature: str, full_feature_names: bool) -> str: if (