diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index 2e5792b0a6f..bf288b191ef 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -311,6 +311,120 @@ requests.post( data=json.dumps(materialize_data)) ``` +## Prometheus Metrics + +The Python feature server can expose Prometheus-compatible metrics on a dedicated +HTTP endpoint (default port `8000`). Metrics are **opt-in** and carry zero overhead +when disabled. + +### Enabling metrics + +**Option 1 — CLI flag** (useful for one-off runs): + +```bash +feast serve --metrics +``` + +**Option 2 — `feature_store.yaml`** (recommended for production): + +```yaml +feature_server: + type: local + metrics: + enabled: true +``` + +Either option is sufficient. When both are set, metrics are enabled. + +### Per-category control + +By default, enabling metrics turns on **all** categories. You can selectively +disable individual categories within the same `metrics` block: + +```yaml +feature_server: + type: local + metrics: + enabled: true + resource: true # CPU / memory gauges + request: false # disable endpoint latency & request counters + online_features: true # online feature retrieval counters + push: true # push request counters + materialization: true # materialization counters & duration + freshness: true # feature freshness gauges +``` + +Any category set to `false` will emit no metrics and start no background +threads (e.g., setting `freshness: false` prevents the registry polling +thread from starting). All categories default to `true`. + +### Available metrics + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `feast_feature_server_cpu_usage` | Gauge | — | Process CPU usage % | +| `feast_feature_server_memory_usage` | Gauge | — | Process memory usage % | +| `feast_feature_server_request_total` | Counter | `endpoint`, `status` | Total requests per endpoint | +| `feast_feature_server_request_latency_seconds` | Histogram | `endpoint`, `feature_count`, `feature_view_count` | Request latency with p50/p95/p99 support | +| `feast_online_features_request_total` | Counter | — | Total online feature retrieval requests | +| `feast_online_features_entity_count` | Histogram | — | Entity rows per online feature request | +| `feast_push_request_total` | Counter | `push_source`, `mode` | Push requests by source and mode | +| `feast_materialization_total` | Counter | `feature_view`, `status` | Materialization runs (success/failure) | +| `feast_materialization_duration_seconds` | Histogram | `feature_view` | Materialization duration per feature view | +| `feast_feature_freshness_seconds` | Gauge | `feature_view`, `project` | Seconds since last materialization | + +### Scraping with Prometheus + +```yaml +scrape_configs: + - job_name: feast + static_configs: + - targets: ["localhost:8000"] +``` + +### Kubernetes / Feast Operator + +Set `metrics: true` in your FeatureStore CR: + +```yaml +spec: + services: + onlineStore: + server: + metrics: true +``` + +The operator automatically exposes port 8000 and creates the corresponding +Service port so Prometheus can discover it. + +### Multi-worker and multi-replica (HPA) support + +Feast uses Prometheus **multiprocess mode** so that metrics are correct +regardless of the number of Gunicorn workers or Kubernetes replicas. + +**How it works:** + +* Each Gunicorn worker writes metric values to shared files in a + temporary directory (`PROMETHEUS_MULTIPROCESS_DIR`). Feast creates + this directory automatically; you can override it by setting the + environment variable yourself. +* The metrics HTTP server on port 8000 aggregates all workers' + metric files using `MultiProcessCollector`, so a single scrape + returns accurate totals. +* Gunicorn hooks clean up dead-worker files automatically + (`child_exit` → `mark_process_dead`). +* CPU and memory gauges use `multiprocess_mode=liveall` — Prometheus + shows per-worker values distinguished by a `pid` label. +* Feature freshness gauges use `multiprocess_mode=max` — Prometheus + shows the worst-case staleness (all workers compute the same value). +* Counters and histograms (request counts, latency, materialization) + are automatically summed across workers. + +**Multiple replicas (HPA):** Each pod runs its own metrics endpoint. +Prometheus adds an `instance` label per pod, so there is no +duplication. Use `sum(rate(...))` or `histogram_quantile(...)` across +instances as usual. + ## Starting the feature server in TLS(SSL) mode Enabling TLS mode ensures that data between the Feast client and server is transmitted securely. For an ideal production environment, it is recommended to start the feature server in TLS mode. diff --git a/docs/reference/feature-store-yaml.md b/docs/reference/feature-store-yaml.md index 820731064fc..7411c673576 100644 --- a/docs/reference/feature-store-yaml.md +++ b/docs/reference/feature-store-yaml.md @@ -36,6 +36,14 @@ An example configuration: ```yaml feature_server: type: local + metrics: # Prometheus metrics configuration. Also achievable via `feast serve --metrics`. + enabled: true # Enable Prometheus metrics server on port 8000 + resource: true # CPU / memory gauges + request: true # endpoint latency histograms & request counters + online_features: true # online feature retrieval counters + push: true # push request counters + materialization: true # materialization counters & duration histograms + freshness: true # per-feature-view freshness gauges offline_push_batching_enabled: true # Enables batching of offline writes processed by /push. Online writes are unaffected. offline_push_batching_batch_size: 100 # Maximum number of buffered rows before writing to the offline store. offline_push_batching_batch_interval_seconds: 5 # Maximum time rows may remain buffered before a forced flush. diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 2ff46bad34d..c0ba3051df0 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -26,7 +26,6 @@ from typing import Any, DefaultDict, Dict, List, NamedTuple, Optional, Set, Union import pandas as pd -import psutil from dateutil import parser from fastapi import ( Depends, @@ -42,10 +41,10 @@ from fastapi.responses import JSONResponse, ORJSONResponse from fastapi.staticfiles import StaticFiles from google.protobuf.json_format import MessageToDict -from prometheus_client import Gauge, start_http_server from pydantic import BaseModel import feast +from feast import metrics as feast_metrics from feast import proto_json, utils from feast.constants import DEFAULT_FEATURE_SERVER_REGISTRY_TTL from feast.data_source import PushMode @@ -64,14 +63,6 @@ str_to_auth_manager_type, ) -# Define prometheus metrics -cpu_usage_gauge = Gauge( - "feast_feature_server_cpu_usage", "CPU usage of the Feast feature server" -) -memory_usage_gauge = Gauge( - "feast_feature_server_memory_usage", "Memory usage of the Feast feature server" -) - # TODO: deprecate this in favor of push features class WriteToFeatureStoreRequest(BaseModel): @@ -129,6 +120,30 @@ class ChatRequest(BaseModel): messages: List[ChatMessage] +def _resolve_feature_counts( + features: Union[List[str], "feast.FeatureService"], +) -> tuple: + """Return (feature_count, feature_view_count) from the resolved features. + + ``features`` is either a list of ``"feature_view:feature"`` strings or + a ``FeatureService`` with ``feature_view_projections``. + """ + from feast.feature_service import FeatureService + + if isinstance(features, FeatureService): + projections = features.feature_view_projections + fv_count = len(projections) + feat_count = sum(len(p.features) for p in projections) + elif isinstance(features, list): + feat_count = len(features) + fv_names = {ref.split(":")[0] for ref in features if ":" in ref} + fv_count = len(fv_names) + else: + feat_count = 0 + fv_count = 0 + return str(feat_count), str(fv_count) + + async def _get_features( request: Union[GetOnlineFeaturesRequest, GetOnlineDocumentsRequest], store: "feast.FeatureStore", @@ -325,30 +340,37 @@ async def lifespan(app: FastAPI): dependencies=[Depends(inject_user_details)], ) async def get_online_features(request: GetOnlineFeaturesRequest) -> ORJSONResponse: - # Initialize parameters for FeatureStore.get_online_features(...) call - features = await _get_features(request, store) + with feast_metrics.track_request_latency( + "/get-online-features", + ) as metrics_ctx: + features = await _get_features(request, store) + feat_count, fv_count = _resolve_feature_counts(features) + metrics_ctx.feature_count = feat_count + metrics_ctx.feature_view_count = fv_count + + entity_count = len(next(iter(request.entities.values()), [])) + feast_metrics.track_online_features_entities(entity_count) + + read_params = dict( + features=features, + entity_rows=request.entities, + full_feature_names=request.full_feature_names, + ) - read_params = dict( - features=features, - entity_rows=request.entities, - full_feature_names=request.full_feature_names, - ) + if store._get_provider().async_supported.online.read: + response = await store.get_online_features_async(**read_params) # type: ignore + else: + response = await run_in_threadpool( + lambda: store.get_online_features(**read_params) # type: ignore + ) - if store._get_provider().async_supported.online.read: - response = await store.get_online_features_async(**read_params) # type: ignore - else: - response = await run_in_threadpool( - lambda: store.get_online_features(**read_params) # type: ignore + response_dict = await run_in_threadpool( + MessageToDict, + response.proto, + preserving_proto_field_name=True, + float_precision=18, ) - - # Convert Protobuf to dict, then use ORJSONResponse for faster JSON serialization - response_dict = await run_in_threadpool( - MessageToDict, - response.proto, - preserving_proto_field_name=True, - float_precision=18, - ) - return ORJSONResponse(content=response_dict) + return ORJSONResponse(content=response_dict) @app.post( "/retrieve-online-documents", @@ -357,127 +379,132 @@ async def get_online_features(request: GetOnlineFeaturesRequest) -> ORJSONRespon async def retrieve_online_documents( request: GetOnlineDocumentsRequest, ) -> ORJSONResponse: - logger.warning( - "This endpoint is in alpha and will be moved to /get-online-features when stable." - ) - # Initialize parameters for FeatureStore.retrieve_online_documents_v2(...) call - features = await _get_features(request, store) - - read_params = dict(features=features, query=request.query, top_k=request.top_k) - if request.api_version == 2 and request.query_string is not None: - read_params["query_string"] = request.query_string - - if request.api_version == 2: - response = await run_in_threadpool( - lambda: store.retrieve_online_documents_v2(**read_params) # type: ignore + with feast_metrics.track_request_latency("/retrieve-online-documents"): + logger.warning( + "This endpoint is in alpha and will be moved to /get-online-features when stable." ) - else: - response = await run_in_threadpool( - lambda: store.retrieve_online_documents(**read_params) # type: ignore + features = await _get_features(request, store) + + read_params = dict( + features=features, query=request.query, top_k=request.top_k ) + if request.api_version == 2 and request.query_string is not None: + read_params["query_string"] = request.query_string - # Convert Protobuf to dict, then use ORJSONResponse for faster JSON serialization - response_dict = await run_in_threadpool( - MessageToDict, - response.proto, - preserving_proto_field_name=True, - float_precision=18, - ) - return ORJSONResponse(content=response_dict) + if request.api_version == 2: + response = await run_in_threadpool( + lambda: store.retrieve_online_documents_v2(**read_params) # type: ignore + ) + else: + response = await run_in_threadpool( + lambda: store.retrieve_online_documents(**read_params) # type: ignore + ) + + response_dict = await run_in_threadpool( + MessageToDict, + response.proto, + preserving_proto_field_name=True, + float_precision=18, + ) + return ORJSONResponse(content=response_dict) @app.post("/push", dependencies=[Depends(inject_user_details)]) async def push(request: PushFeaturesRequest) -> Response: - df = pd.DataFrame(request.df) - actions = [] - if request.to == "offline": - to = PushMode.OFFLINE - actions = [AuthzedAction.WRITE_OFFLINE] - elif request.to == "online": - to = PushMode.ONLINE - actions = [AuthzedAction.WRITE_ONLINE] - elif request.to == "online_and_offline": - to = PushMode.ONLINE_AND_OFFLINE - actions = WRITE - else: - raise ValueError( - f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']." - ) + with feast_metrics.track_request_latency("/push"): + df = pd.DataFrame(request.df) + actions = [] + if request.to == "offline": + to = PushMode.OFFLINE + actions = [AuthzedAction.WRITE_OFFLINE] + elif request.to == "online": + to = PushMode.ONLINE + actions = [AuthzedAction.WRITE_ONLINE] + elif request.to == "online_and_offline": + to = PushMode.ONLINE_AND_OFFLINE + actions = WRITE + else: + raise ValueError( + f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']." + ) - from feast.data_source import PushSource - - all_fvs = store.list_feature_views( - allow_cache=request.allow_registry_cache - ) + store.list_stream_feature_views(allow_cache=request.allow_registry_cache) - fvs_with_push_sources = { - fv - for fv in all_fvs - if ( - fv.stream_source is not None - and isinstance(fv.stream_source, PushSource) - and fv.stream_source.name == request.push_source_name - ) - } + from feast.data_source import PushSource - for feature_view in fvs_with_push_sources: - assert_permissions(resource=feature_view, actions=actions) - - async def _push_with_to(push_to: PushMode) -> None: - """ - Helper for performing a single push operation. - - NOTE: - - Feast providers **do not currently support async offline writes**. - - Therefore: - * ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write - * OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers. - - The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only. - """ - push_source_name = request.push_source_name - allow_registry_cache = request.allow_registry_cache - transform_on_write = request.transform_on_write - - # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as theres no async for offline store - if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and ( - store._get_provider().async_supported.online.write - ): - await store.push_async( - push_source_name=push_source_name, - df=df, - allow_registry_cache=allow_registry_cache, - to=push_to, - transform_on_write=transform_on_write, + all_fvs = store.list_feature_views( + allow_cache=request.allow_registry_cache + ) + store.list_stream_feature_views( + allow_cache=request.allow_registry_cache + ) + fvs_with_push_sources = { + fv + for fv in all_fvs + if ( + fv.stream_source is not None + and isinstance(fv.stream_source, PushSource) + and fv.stream_source.name == request.push_source_name ) - else: - await run_in_threadpool( - lambda: store.push( + } + + for feature_view in fvs_with_push_sources: + assert_permissions(resource=feature_view, actions=actions) + + async def _push_with_to(push_to: PushMode) -> None: + """ + Helper for performing a single push operation. + + NOTE: + - Feast providers **do not currently support async offline writes**. + - Therefore: + * ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write + * OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers. + - The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only. + """ + push_source_name = request.push_source_name + allow_registry_cache = request.allow_registry_cache + transform_on_write = request.transform_on_write + + # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as theres no async for offline store + if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and ( + store._get_provider().async_supported.online.write + ): + await store.push_async( push_source_name=push_source_name, df=df, allow_registry_cache=allow_registry_cache, to=push_to, transform_on_write=transform_on_write, ) - ) + else: + await run_in_threadpool( + lambda: store.push( + push_source_name=push_source_name, + df=df, + allow_registry_cache=allow_registry_cache, + to=push_to, + transform_on_write=transform_on_write, + ) + ) - needs_online = to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) - needs_offline = to in (PushMode.OFFLINE, PushMode.ONLINE_AND_OFFLINE) + needs_online = to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) + needs_offline = to in (PushMode.OFFLINE, PushMode.ONLINE_AND_OFFLINE) - status_code = status.HTTP_200_OK + status_code = status.HTTP_200_OK - if offline_batcher is None or not needs_offline: - await _push_with_to(to) - else: - if needs_online: - await _push_with_to(PushMode.ONLINE) - - offline_batcher.enqueue( - push_source_name=request.push_source_name, - df=df, - allow_registry_cache=request.allow_registry_cache, - transform_on_write=request.transform_on_write, - ) - status_code = status.HTTP_202_ACCEPTED + if offline_batcher is None or not needs_offline: + await _push_with_to(to) + else: + if needs_online: + await _push_with_to(PushMode.ONLINE) + + offline_batcher.enqueue( + push_source_name=request.push_source_name, + df=df, + allow_registry_cache=request.allow_registry_cache, + transform_on_write=request.transform_on_write, + ) + status_code = status.HTTP_202_ACCEPTED - return Response(status_code=status_code) + feast_metrics.track_push(request.push_source_name, request.to) + return Response(status_code=status_code) async def _get_feast_object( feature_view_name: str, allow_registry_cache: bool @@ -529,51 +556,50 @@ async def chat_ui(): @app.post("/materialize", dependencies=[Depends(inject_user_details)]) async def materialize(request: MaterializeRequest) -> None: - for feature_view in request.feature_views or []: - resource = await _get_feast_object(feature_view, True) - assert_permissions( - resource=resource, - actions=[AuthzedAction.WRITE_ONLINE], - ) - - if request.disable_event_timestamp: - # Query all available data and use current datetime as event timestamp - now = datetime.now() - start_date = datetime( - 1970, 1, 1 - ) # Beginning of time to capture all historical data - end_date = now - else: - if not request.start_ts or not request.end_ts: - raise ValueError( - "start_ts and end_ts are required when disable_event_timestamp is False" + with feast_metrics.track_request_latency("/materialize"): + for feature_view in request.feature_views or []: + resource = await _get_feast_object(feature_view, True) + assert_permissions( + resource=resource, + actions=[AuthzedAction.WRITE_ONLINE], ) - start_date = utils.make_tzaware(parser.parse(request.start_ts)) - end_date = utils.make_tzaware(parser.parse(request.end_ts)) - await run_in_threadpool( - store.materialize, - start_date, - end_date, - request.feature_views, - disable_event_timestamp=request.disable_event_timestamp, - full_feature_names=request.full_feature_names, - ) + if request.disable_event_timestamp: + now = datetime.now() + start_date = datetime(1970, 1, 1) + end_date = now + else: + if not request.start_ts or not request.end_ts: + raise ValueError( + "start_ts and end_ts are required when disable_event_timestamp is False" + ) + start_date = utils.make_tzaware(parser.parse(request.start_ts)) + end_date = utils.make_tzaware(parser.parse(request.end_ts)) + + await run_in_threadpool( + store.materialize, + start_date, + end_date, + request.feature_views, + disable_event_timestamp=request.disable_event_timestamp, + full_feature_names=request.full_feature_names, + ) @app.post("/materialize-incremental", dependencies=[Depends(inject_user_details)]) async def materialize_incremental(request: MaterializeIncrementalRequest) -> None: - for feature_view in request.feature_views or []: - resource = await _get_feast_object(feature_view, True) - assert_permissions( - resource=resource, - actions=[AuthzedAction.WRITE_ONLINE], + with feast_metrics.track_request_latency("/materialize-incremental"): + for feature_view in request.feature_views or []: + resource = await _get_feast_object(feature_view, True) + assert_permissions( + resource=resource, + actions=[AuthzedAction.WRITE_ONLINE], + ) + await run_in_threadpool( + store.materialize_incremental, + utils.make_tzaware(parser.parse(request.end_ts)), + request.feature_views, + full_feature_names=request.full_feature_names, ) - await run_in_threadpool( - store.materialize_incremental, - utils.make_tzaware(parser.parse(request.end_ts)), - request.feature_views, - full_feature_names=request.full_feature_names, - ) @app.exception_handler(Exception) async def rest_exception_handler(request: Request, exc: Exception): @@ -695,12 +721,15 @@ def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): import gunicorn.app.base class FeastServeApplication(gunicorn.app.base.BaseApplication): - def __init__(self, store: "feast.FeatureStore", **options): + def __init__( + self, store: "feast.FeatureStore", metrics_enabled: bool = False, **options + ): self._app = get_app( store=store, registry_ttl_sec=options["registry_ttl_sec"], ) self._options = options + self._metrics_enabled = metrics_enabled super().__init__() def load_config(self): @@ -709,25 +738,20 @@ def load_config(self): self.cfg.set(key.lower(), value) self.cfg.set("worker_class", "uvicorn_worker.UvicornWorker") + if self._metrics_enabled: + self.cfg.set("post_worker_init", _gunicorn_post_worker_init) + self.cfg.set("child_exit", _gunicorn_child_exit) def load(self): return self._app + def _gunicorn_post_worker_init(worker): + """Start per-worker resource monitoring after Gunicorn forks.""" + feast_metrics.init_worker_monitoring() -def monitor_resources(self, interval: int = 5): - """Function to monitor and update CPU and memory usage metrics.""" - logger.debug(f"Starting resource monitoring with interval {interval} seconds") - p = psutil.Process() - logger.debug(f"PID is {p.pid}") - while True: - with p.oneshot(): - cpu_usage = p.cpu_percent() - memory_usage = p.memory_percent() - logger.debug(f"CPU usage: {cpu_usage}%, Memory usage: {memory_usage}%") - logger.debug(f"CPU usage: {cpu_usage}%, Memory usage: {memory_usage}%") - cpu_usage_gauge.set(cpu_usage) - memory_usage_gauge.set(memory_usage) - time.sleep(interval) + def _gunicorn_child_exit(server, worker): + """Clean up Prometheus metric files for a dead worker.""" + feast_metrics.mark_process_dead(worker.pid) def start_server( @@ -749,15 +773,19 @@ def start_server( raise ValueError( "Both key and cert file paths are required to start server in TLS mode." ) - if metrics: - logger.info("Starting Prometheus Server") - start_http_server(8000) - logger.debug("Starting background thread to monitor CPU and memory usage") - monitoring_thread = threading.Thread( - target=monitor_resources, args=(5,), daemon=True + fs_cfg = getattr(store.config, "feature_server", None) + metrics_cfg = getattr(fs_cfg, "metrics", None) + metrics_from_config = getattr(metrics_cfg, "enabled", False) + metrics_active = metrics or metrics_from_config + uses_gunicorn = sys.platform != "win32" + if metrics_active: + flags = feast_metrics.build_metrics_flags(metrics_cfg) + feast_metrics.start_metrics_server( + store, + metrics_config=flags, + start_resource_monitoring=not uses_gunicorn, ) - monitoring_thread.start() logger.debug("start_server called") auth_type = str_to_auth_manager_type(store.config.auth_config.type) @@ -771,7 +799,7 @@ def start_server( ) logger.debug("Auth manager initialized successfully") - if sys.platform != "win32": + if uses_gunicorn: options = { "bind": f"{host}:{port}", "accesslog": None if no_access_log else "-", @@ -787,7 +815,9 @@ def start_server( if tls_key_path and tls_cert_path: options["keyfile"] = tls_key_path options["certfile"] = tls_cert_path - FeastServeApplication(store=store, **options).run() + FeastServeApplication( + store=store, metrics_enabled=metrics_active, **options + ).run() else: import uvicorn diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 72bd93cc522..fdc790b71e1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -15,6 +15,7 @@ import copy import itertools import os +import time import warnings from datetime import datetime, timedelta from pathlib import Path @@ -100,6 +101,29 @@ from feast.transformation.python_transformation import PythonTransformation from feast.utils import _get_feature_view_vector_field_metadata, _utc_now +_track_materialization = None # Lazy-loaded on first materialization call +_track_materialization_loaded = False + + +def _get_track_materialization(): + """Lazy-import feast.metrics only when materialization tracking is needed. + + Avoids importing the metrics module (and its prometheus_client / + psutil dependencies plus temp-dir creation) for every FeatureStore + usage such as ``feast apply`` or simple SDK reads. + """ + global _track_materialization, _track_materialization_loaded + if not _track_materialization_loaded: + _track_materialization_loaded = True + try: + from feast.metrics import track_materialization + + _track_materialization = track_materialization + except Exception: # pragma: no cover + _track_materialization = None + return _track_materialization + + warnings.simplefilter("once", DeprecationWarning) @@ -1705,15 +1729,29 @@ def tqdm_builder(length): start_date = utils.make_tzaware(start_date) end_date = utils.make_tzaware(end_date) or _utc_now() - provider.materialize_single_feature_view( - config=self.config, - feature_view=feature_view, - start_date=start_date, - end_date=end_date, - registry=self.registry, - project=self.project, - tqdm_builder=tqdm_builder, - ) + fv_start = time.monotonic() + fv_success = True + try: + provider.materialize_single_feature_view( + config=self.config, + feature_view=feature_view, + start_date=start_date, + end_date=end_date, + registry=self.registry, + project=self.project, + tqdm_builder=tqdm_builder, + ) + except Exception: + fv_success = False + raise + finally: + _tracker = _get_track_materialization() + if _tracker is not None: + _tracker( + feature_view.name, + fv_success, + time.monotonic() - fv_start, + ) if not isinstance(feature_view, OnDemandFeatureView): self.registry.apply_materialization( feature_view, @@ -1814,16 +1852,30 @@ def tqdm_builder(length): start_date = utils.make_tzaware(start_date) end_date = utils.make_tzaware(end_date) - provider.materialize_single_feature_view( - config=self.config, - feature_view=feature_view, - start_date=start_date, - end_date=end_date, - registry=self.registry, - project=self.project, - tqdm_builder=tqdm_builder, - disable_event_timestamp=disable_event_timestamp, - ) + fv_start = time.monotonic() + fv_success = True + try: + provider.materialize_single_feature_view( + config=self.config, + feature_view=feature_view, + start_date=start_date, + end_date=end_date, + registry=self.registry, + project=self.project, + tqdm_builder=tqdm_builder, + disable_event_timestamp=disable_event_timestamp, + ) + except Exception: + fv_success = False + raise + finally: + _tracker = _get_track_materialization() + if _tracker is not None: + _tracker( + feature_view.name, + fv_success, + time.monotonic() - fv_start, + ) self.registry.apply_materialization( feature_view, diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index b13e23d035e..d6b650ced15 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -37,12 +37,57 @@ class FeatureLoggingConfig(FeastConfigBaseModel): """Timeout for adding new log item to the queue.""" +class MetricsConfig(FeastConfigBaseModel): + """Prometheus metrics configuration. + + Follows the same pattern as ``FeatureLoggingConfig``: a single + ``enabled`` flag controls global on/off, and per-category booleans + allow fine-grained suppression. Can also be enabled at runtime via + the ``feast serve --metrics`` CLI flag — either option is sufficient. + """ + + enabled: StrictBool = False + """Whether Prometheus metrics collection and the metrics HTTP server + (default port 8000) should be enabled.""" + + resource: StrictBool = True + """Emit CPU and memory usage gauges (feast_feature_server_cpu_usage, + feast_feature_server_memory_usage).""" + + request: StrictBool = True + """Emit per-endpoint request counters and latency histograms + (feast_feature_server_request_total, + feast_feature_server_request_latency_seconds).""" + + online_features: StrictBool = True + """Emit online feature retrieval metrics + (feast_online_features_request_total, + feast_online_features_entity_count).""" + + push: StrictBool = True + """Emit push/write request counters + (feast_push_request_total).""" + + materialization: StrictBool = True + """Emit materialization success/failure counters and duration histograms + (feast_materialization_total, + feast_materialization_duration_seconds).""" + + freshness: StrictBool = True + """Emit per-feature-view freshness gauges + (feast_feature_freshness_seconds).""" + + class BaseFeatureServerConfig(FeastConfigBaseModel): """Base Feature Server config that should be extended""" enabled: StrictBool = False """Whether the feature server should be launched.""" + metrics: Optional[MetricsConfig] = None + """Prometheus metrics configuration. Set ``metrics.enabled: true`` or + pass the ``feast serve --metrics`` CLI flag to activate.""" + feature_logging: Optional[FeatureLoggingConfig] = None """ Feature logging configuration """ diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py new file mode 100644 index 00000000000..be2b068d32c --- /dev/null +++ b/sdk/python/feast/metrics.py @@ -0,0 +1,466 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Centralized Prometheus metrics for the Feast feature server. + +All metrics are defined here to provide a single source of truth. +Instrumentation is **opt-in**: metric recording is gated behind a +``_config`` object whose flags are only set when +``start_metrics_server()`` is called (i.e. when the feature server is +started with ``--metrics`` or ``metrics.enabled: true`` in the YAML). + +Each metric category can be individually toggled via the ``metrics`` +sub-block in ``feature_store.yaml``. When disabled, helpers +short-circuit with a fast attribute check and do zero work. + +Multiprocess support +-------------------- +Gunicorn pre-forks worker processes, so every worker gets its own copy +of the in-process metric state. To aggregate across workers we use +``prometheus_client``'s multiprocess mode: + +1. ``PROMETHEUS_MULTIPROCESS_DIR`` is set (to a temp dir if the user + has not already set it) **before** any metric objects are created. +2. Gauges specify ``multiprocess_mode`` so they aggregate correctly. +3. The metrics HTTP server uses ``MultiProcessCollector`` to read all + workers' metric files. +4. Gunicorn hooks (``post_worker_init``, ``child_exit``) are wired up + in ``feature_server.py`` to start per-worker monitoring and to + clean up dead-worker files. +""" + +import atexit +import logging +import os +import shutil +import tempfile +import threading +import time +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Optional + +import psutil + +if TYPE_CHECKING: + from feast.feature_store import FeatureStore + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Multiprocess directory setup — MUST happen before prometheus_client import +# so that metric values are stored in shared files rather than in-process +# memory (required for Gunicorn pre-fork workers). +# --------------------------------------------------------------------------- +_prometheus_mp_dir: Optional[str] = None +_owns_mp_dir: bool = False +_owner_pid: Optional[int] = None + +if "PROMETHEUS_MULTIPROCESS_DIR" not in os.environ: + _prometheus_mp_dir = tempfile.mkdtemp(prefix="feast_metrics_") + os.environ["PROMETHEUS_MULTIPROCESS_DIR"] = _prometheus_mp_dir + _owns_mp_dir = True + _owner_pid = os.getpid() +else: + _prometheus_mp_dir = os.environ["PROMETHEUS_MULTIPROCESS_DIR"] + +# prometheus_client uses two different env var names: +# - PROMETHEUS_MULTIPROCESS_DIR (for value storage in prometheus_client.values) +# - PROMETHEUS_MULTIPROC_DIR (for MultiProcessCollector) +# Both must point to the same directory. +if "PROMETHEUS_MULTIPROC_DIR" not in os.environ: + os.environ["PROMETHEUS_MULTIPROC_DIR"] = _prometheus_mp_dir + + +def _cleanup_multiprocess_dir(): + # Only the process that created the directory may remove it. + # Forked Gunicorn workers inherit _owns_mp_dir=True but have a + # different PID; letting them delete the shared directory would + # break metrics for sibling workers and the metrics HTTP server. + if ( + _owns_mp_dir + and _owner_pid == os.getpid() + and _prometheus_mp_dir + and os.path.isdir(_prometheus_mp_dir) + ): + shutil.rmtree(_prometheus_mp_dir, ignore_errors=True) + + +atexit.register(_cleanup_multiprocess_dir) + +# Now safe to import prometheus_client — it will detect the env var. +from prometheus_client import Counter, Gauge, Histogram # noqa: E402 + + +# --------------------------------------------------------------------------- +# Per-category runtime flags +# --------------------------------------------------------------------------- +@dataclass +class _MetricsFlags: + """Runtime toggle for each metric category. + + All flags default to ``False`` (disabled). ``start_metrics_server`` + flips them on according to the user's ``MetricsConfig``. + """ + + enabled: bool = False + resource: bool = False + request: bool = False + online_features: bool = False + push: bool = False + materialization: bool = False + freshness: bool = False + + +_config = _MetricsFlags() + + +def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlags: + """Build ``_MetricsFlags`` from a ``MetricsConfig`` object. + + If *metrics_config* is ``None`` (e.g. metrics activated purely via + ``--metrics`` CLI with no YAML block), every category defaults to + enabled. Otherwise the per-category booleans are respected. + """ + if metrics_config is None: + return _MetricsFlags( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + ) + return _MetricsFlags( + enabled=True, + resource=getattr(metrics_config, "resource", True), + request=getattr(metrics_config, "request", True), + online_features=getattr(metrics_config, "online_features", True), + push=getattr(metrics_config, "push", True), + materialization=getattr(metrics_config, "materialization", True), + freshness=getattr(metrics_config, "freshness", True), + ) + + +# --------------------------------------------------------------------------- +# Resource metrics — multiprocess_mode="liveall" so each live worker +# reports its own CPU/memory with a ``pid`` label. +# --------------------------------------------------------------------------- +cpu_usage_gauge = Gauge( + "feast_feature_server_cpu_usage", + "CPU usage percentage of the Feast feature server process", + multiprocess_mode="liveall", +) +memory_usage_gauge = Gauge( + "feast_feature_server_memory_usage", + "Memory usage percentage of the Feast feature server process", + multiprocess_mode="liveall", +) + +# --------------------------------------------------------------------------- +# HTTP request metrics (Counters & Histograms aggregate automatically) +# --------------------------------------------------------------------------- +request_count = Counter( + "feast_feature_server_request_total", + "Total number of requests to the Feast feature server", + ["endpoint", "status"], +) +request_latency = Histogram( + "feast_feature_server_request_latency_seconds", + "Latency of requests to the Feast feature server in seconds", + ["endpoint", "feature_count", "feature_view_count"], + buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0), +) + +# --------------------------------------------------------------------------- +# Online feature retrieval metrics +# --------------------------------------------------------------------------- +online_features_request_count = Counter( + "feast_online_features_request_total", + "Total online feature retrieval requests", +) +online_features_entity_count = Histogram( + "feast_online_features_entity_count", + "Number of entity rows per online feature request", + buckets=(1, 5, 10, 25, 50, 100, 250, 500, 1000), +) + +# --------------------------------------------------------------------------- +# Push / write metrics +# --------------------------------------------------------------------------- +push_request_count = Counter( + "feast_push_request_total", + "Total push requests to the feature store", + ["push_source", "mode"], +) + +# --------------------------------------------------------------------------- +# Materialization metrics +# --------------------------------------------------------------------------- +materialization_total = Counter( + "feast_materialization_total", + "Total materialization runs per feature view", + ["feature_view", "status"], +) +materialization_duration_seconds = Histogram( + "feast_materialization_duration_seconds", + "Duration of materialization per feature view in seconds", + ["feature_view"], + buckets=(1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0, 3600.0), +) + +# --------------------------------------------------------------------------- +# Feature freshness metrics — "max" shows the worst-case staleness across +# processes (freshness is identical regardless of which process computes it). +# --------------------------------------------------------------------------- +feature_freshness_seconds = Gauge( + "feast_feature_freshness_seconds", + "Seconds since the most recent materialization end time per feature view", + ["feature_view", "project"], + multiprocess_mode="max", +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class RequestMetricsContext: + """Mutable label holder yielded by :func:`track_request_latency`. + + Callers that need to resolve labels *inside* the ``with`` block + (e.g. ``/get-online-features`` where the feature count is only + known after ``_get_features`` succeeds) can set the attributes + on the yielded object and they will be picked up in ``finally``. + """ + + __slots__ = ("feature_count", "feature_view_count") + + def __init__(self, feature_count: str = "", feature_view_count: str = ""): + self.feature_count = feature_count + self.feature_view_count = feature_view_count + + +@contextmanager +def track_request_latency( + endpoint: str, feature_count: str = "", feature_view_count: str = "" +): + """Context manager that records endpoint latency and increments request count. + + Yields a :class:`RequestMetricsContext` whose ``feature_count`` and + ``feature_view_count`` attributes can be updated inside the block. + The final values are used when recording the histogram and counter + in ``finally``, so labels are accurate even when they depend on + work done inside the block. + + Gated by the ``request`` category flag. + """ + ctx = RequestMetricsContext(feature_count, feature_view_count) + if not _config.request: + yield ctx + return + + start = time.monotonic() + status_label = "success" + try: + yield ctx + except Exception: + status_label = "error" + raise + finally: + elapsed = time.monotonic() - start + request_latency.labels( + endpoint=endpoint, + feature_count=ctx.feature_count, + feature_view_count=ctx.feature_view_count, + ).observe(elapsed) + request_count.labels(endpoint=endpoint, status=status_label).inc() + + +def track_online_features_entities(entity_count: int): + """Record the number of entity rows in an online feature request.""" + if not _config.online_features: + return + online_features_request_count.inc() + online_features_entity_count.observe(entity_count) + + +def track_push(push_source: str, mode: str): + """Increment the push request counter.""" + if not _config.push: + return + push_request_count.labels(push_source=push_source, mode=mode).inc() + + +def track_materialization( + feature_view_name: str, success: bool, duration_seconds: float +): + """Record materialization outcome and duration for a single feature view.""" + if not _config.materialization: + return + status = "success" if success else "failure" + materialization_total.labels(feature_view=feature_view_name, status=status).inc() + materialization_duration_seconds.labels(feature_view=feature_view_name).observe( + duration_seconds + ) + + +def update_feature_freshness( + store: "FeatureStore", +) -> None: + """ + Compute and set the freshness gauge for every feature view in the registry. + + Freshness = now - most_recent_end_time (from materialization_intervals). + A higher value means the feature data is more stale. + """ + try: + feature_views = store.list_feature_views(allow_cache=True) + now = datetime.now(tz=timezone.utc) + for fv in feature_views: + end_time = fv.most_recent_end_time + if end_time is not None: + if end_time.tzinfo is None: + end_time = end_time.replace(tzinfo=timezone.utc) + staleness = (now - end_time).total_seconds() + feature_freshness_seconds.labels( + feature_view=fv.name, project=store.project + ).set(staleness) + except Exception: + logger.debug("Failed to update feature freshness metrics", exc_info=True) + + +def monitor_resources(interval: int = 5): + """Background thread target that updates CPU and memory usage gauges.""" + logger.debug("Starting resource monitoring with interval %d seconds", interval) + p = psutil.Process() + logger.debug("PID is %d", p.pid) + while True: + with p.oneshot(): + cpu_usage = p.cpu_percent() + memory_usage = p.memory_percent() + logger.debug("CPU usage: %s%%, Memory usage: %s%%", cpu_usage, memory_usage) + cpu_usage_gauge.set(cpu_usage) + memory_usage_gauge.set(memory_usage) + time.sleep(interval) + + +def monitor_freshness(store: "FeatureStore", interval: int = 30): + """Background thread target that periodically updates feature freshness gauges.""" + logger.debug( + "Starting feature freshness monitoring with interval %d seconds", interval + ) + while True: + update_feature_freshness(store) + time.sleep(interval) + + +# --------------------------------------------------------------------------- +# Gunicorn multiprocess helpers +# --------------------------------------------------------------------------- + + +def mark_process_dead(pid: int): + """Clean up metric files for a dead Gunicorn worker. + + Called from the Gunicorn ``child_exit`` hook so that stale worker + data no longer appears in scraped output. + """ + if not _config.enabled: + return + try: + from prometheus_client import multiprocess + + multiprocess.mark_process_dead(pid) + except Exception: + logger.debug("Failed to mark process %d as dead", pid, exc_info=True) + + +def init_worker_monitoring(): + """Start resource monitoring inside a Gunicorn worker process. + + Called from the ``post_worker_init`` hook so that each worker + tracks its own CPU/memory independently of the master. + """ + if _config.resource: + t = threading.Thread(target=monitor_resources, args=(5,), daemon=True) + t.start() + + +def start_metrics_server( + store: "FeatureStore", + port: int = 8000, + metrics_config: Optional["_MetricsFlags"] = None, + start_resource_monitoring: bool = True, +): + """ + Start the Prometheus metrics HTTP server and background monitoring threads. + + Uses ``MultiProcessCollector`` so that metrics from all Gunicorn + workers are correctly aggregated when Prometheus scrapes port *port*. + + Args: + store: The FeatureStore instance (used for freshness checks). + port: TCP port for the Prometheus HTTP endpoint. + metrics_config: Optional pre-built ``_MetricsFlags``. When + ``None`` every category defaults to **enabled**. + start_resource_monitoring: Whether to start the CPU/memory + monitoring thread. Set to ``False`` when Gunicorn will + fork workers — the ``post_worker_init`` hook starts + per-worker monitoring instead. + """ + global _config + + if metrics_config is not None: + _config = metrics_config + else: + _config = _MetricsFlags( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + ) + + from prometheus_client import CollectorRegistry, make_wsgi_app + from prometheus_client.multiprocess import MultiProcessCollector + + registry = CollectorRegistry() + MultiProcessCollector(registry) + + from wsgiref.simple_server import make_server + + httpd = make_server("", port, make_wsgi_app(registry)) + metrics_thread = threading.Thread(target=httpd.serve_forever, daemon=True) + metrics_thread.start() + logger.info( + "Prometheus metrics server started on port %d (multiprocess-safe)", port + ) + + if _config.resource and start_resource_monitoring: + resource_thread = threading.Thread( + target=monitor_resources, args=(5,), daemon=True + ) + resource_thread.start() + + if _config.freshness: + freshness_thread = threading.Thread( + target=monitor_freshness, args=(store, 30), daemon=True + ) + freshness_thread.start() diff --git a/sdk/python/tests/unit/test_metrics.py b/sdk/python/tests/unit/test_metrics.py new file mode 100644 index 00000000000..ba014064669 --- /dev/null +++ b/sdk/python/tests/unit/test_metrics.py @@ -0,0 +1,826 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock, patch + +import pytest + +from feast.metrics import ( + feature_freshness_seconds, + materialization_duration_seconds, + materialization_total, + online_features_entity_count, + online_features_request_count, + push_request_count, + request_count, + request_latency, + track_materialization, + track_online_features_entities, + track_push, + track_request_latency, + update_feature_freshness, +) + + +@pytest.fixture(autouse=True) +def _enable_metrics(): + """Enable all metric categories for each test, then restore.""" + import feast.metrics as m + + original = m._config + m._config = m._MetricsFlags( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + ) + yield + m._config = original + + +class TestTrackRequestLatency: + def test_success_increments_counter_and_records_latency(self): + before_count = request_count.labels( + endpoint="/test", status="success" + )._value.get() + + with track_request_latency("/test"): + pass + + after_count = request_count.labels( + endpoint="/test", status="success" + )._value.get() + assert after_count == before_count + 1 + + def test_error_increments_error_counter(self): + before_count = request_count.labels( + endpoint="/test-err", status="error" + )._value.get() + + with pytest.raises(ValueError): + with track_request_latency("/test-err"): + raise ValueError("boom") + + after_count = request_count.labels( + endpoint="/test-err", status="error" + )._value.get() + assert after_count == before_count + 1 + + def test_latency_is_recorded(self): + before_sum = request_latency.labels( + endpoint="/test-latency", feature_count="", feature_view_count="" + )._sum.get() + + with track_request_latency("/test-latency"): + import time + + time.sleep(0.01) + + after_sum = request_latency.labels( + endpoint="/test-latency", feature_count="", feature_view_count="" + )._sum.get() + assert after_sum > before_sum + + def test_feature_count_and_feature_view_count_labels(self): + """Latency histogram carries feature_count and feature_view_count labels.""" + label_set = dict( + endpoint="/get-online-features", + feature_count="5", + feature_view_count="2", + ) + before_sum = request_latency.labels(**label_set)._sum.get() + + with track_request_latency( + "/get-online-features", feature_count="5", feature_view_count="2" + ): + pass + + after_sum = request_latency.labels(**label_set)._sum.get() + assert after_sum > before_sum + + def test_default_labels_are_empty_string(self): + """Non-online-features endpoints get empty-string labels by default.""" + label_set = dict( + endpoint="/materialize", feature_count="", feature_view_count="" + ) + before_sum = request_latency.labels(**label_set)._sum.get() + + with track_request_latency("/materialize"): + pass + + after_sum = request_latency.labels(**label_set)._sum.get() + assert after_sum > before_sum + + def test_labels_updated_via_yielded_context(self): + """Labels set on the yielded context are used in the final metrics.""" + label_set = dict( + endpoint="/ctx-update", feature_count="3", feature_view_count="1" + ) + before_sum = request_latency.labels(**label_set)._sum.get() + + with track_request_latency("/ctx-update") as ctx: + ctx.feature_count = "3" + ctx.feature_view_count = "1" + + after_sum = request_latency.labels(**label_set)._sum.get() + assert after_sum > before_sum + + def test_error_before_labels_set_still_records(self): + """Errors before labels are updated still record with default labels.""" + before_count = request_count.labels( + endpoint="/early-fail", status="error" + )._value.get() + + with pytest.raises(RuntimeError): + with track_request_latency("/early-fail") as _ctx: + raise RuntimeError("auth failed") + + after_count = request_count.labels( + endpoint="/early-fail", status="error" + )._value.get() + assert after_count == before_count + 1 + + recorded_sum = request_latency.labels( + endpoint="/early-fail", feature_count="", feature_view_count="" + )._sum.get() + assert recorded_sum > 0 + + +class TestMetricsOptIn: + """Verify that when a category is disabled, its helpers are true no-ops.""" + + @staticmethod + def _all_off(): + import feast.metrics as m + + m._config = m._MetricsFlags() # everything False + + def test_track_request_latency_noop_when_disabled(self): + self._all_off() + label_set = dict( + endpoint="/disabled-test", feature_count="", feature_view_count="" + ) + before_sum = request_latency.labels(**label_set)._sum.get() + + with track_request_latency("/disabled-test"): + pass + + assert request_latency.labels(**label_set)._sum.get() == before_sum + + def test_track_online_features_entities_noop_when_disabled(self): + self._all_off() + before = online_features_request_count._value.get() + track_online_features_entities(100) + assert online_features_request_count._value.get() == before + + def test_track_push_noop_when_disabled(self): + self._all_off() + before = push_request_count.labels( + push_source="src", mode="online" + )._value.get() + track_push("src", "online") + assert ( + push_request_count.labels(push_source="src", mode="online")._value.get() + == before + ) + + def test_track_materialization_noop_when_disabled(self): + self._all_off() + before = materialization_total.labels( + feature_view="fv_disabled", status="success" + )._value.get() + track_materialization("fv_disabled", success=True, duration_seconds=1.0) + assert ( + materialization_total.labels( + feature_view="fv_disabled", status="success" + )._value.get() + == before + ) + + +class TestGranularCategoryControl: + """Verify individual category toggles work independently.""" + + def test_request_disabled_but_push_enabled(self): + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, + request=False, + push=True, + resource=True, + online_features=True, + materialization=True, + freshness=True, + ) + + # request should be no-op + label_set = dict( + endpoint="/granular-req", feature_count="", feature_view_count="" + ) + before_req = request_latency.labels(**label_set)._sum.get() + with track_request_latency("/granular-req"): + pass + assert request_latency.labels(**label_set)._sum.get() == before_req + + # push should still record + before_push = push_request_count.labels( + push_source="s", mode="online" + )._value.get() + track_push("s", "online") + assert ( + push_request_count.labels(push_source="s", mode="online")._value.get() + == before_push + 1 + ) + + def test_online_features_disabled_but_materialization_enabled(self): + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, + online_features=False, + materialization=True, + resource=True, + request=True, + push=True, + freshness=True, + ) + + # online_features should be no-op + before_of = online_features_request_count._value.get() + track_online_features_entities(50) + assert online_features_request_count._value.get() == before_of + + # materialization should still record + before_mat = materialization_total.labels( + feature_view="fv_gran", status="success" + )._value.get() + track_materialization("fv_gran", success=True, duration_seconds=1.0) + assert ( + materialization_total.labels( + feature_view="fv_gran", status="success" + )._value.get() + == before_mat + 1 + ) + + def test_only_resource_enabled(self): + """When only resource is on, all request-path helpers are no-ops.""" + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, + resource=True, + request=False, + online_features=False, + push=False, + materialization=False, + freshness=False, + ) + + label_set = dict(endpoint="/res-only", feature_count="", feature_view_count="") + before_req = request_latency.labels(**label_set)._sum.get() + before_of = online_features_request_count._value.get() + before_push = push_request_count.labels( + push_source="x", mode="offline" + )._value.get() + before_mat = materialization_total.labels( + feature_view="fv_res", status="success" + )._value.get() + + with track_request_latency("/res-only"): + pass + track_online_features_entities(10) + track_push("x", "offline") + track_materialization("fv_res", success=True, duration_seconds=1.0) + + assert request_latency.labels(**label_set)._sum.get() == before_req + assert online_features_request_count._value.get() == before_of + assert ( + push_request_count.labels(push_source="x", mode="offline")._value.get() + == before_push + ) + assert ( + materialization_total.labels( + feature_view="fv_res", status="success" + )._value.get() + == before_mat + ) + + +class TestMetricsYamlConfig: + """Verify metrics config in feature_store.yaml is respected. + + We mock out everything past the metrics-gate check in ``start_server`` + so these tests never actually launch a real HTTP server. + """ + + @staticmethod + def _call_start_server(mock_store, cli_metrics: bool): + """Call start_server with enough mocking to avoid side-effects.""" + from feast.feature_server import start_server + + with ( + patch("feast.feature_server.feast_metrics") as mock_fm, + patch("feast.feature_server.str_to_auth_manager_type"), + patch("feast.feature_server.init_security_manager"), + patch("feast.feature_server.init_auth_manager"), + patch( + "feast.feature_server.FeastServeApplication", + side_effect=RuntimeError("stop"), + ) + if hasattr(__import__("sys"), "platform") + and __import__("sys").platform != "win32" + else patch("uvicorn.run", side_effect=RuntimeError("stop")), + ): + try: + start_server( + store=mock_store, + host="127.0.0.1", + port=6566, + no_access_log=True, + workers=1, + worker_connections=1000, + max_requests=1000, + max_requests_jitter=50, + keep_alive_timeout=30, + registry_ttl_sec=60, + tls_key_path="", + tls_cert_path="", + metrics=cli_metrics, + ) + except (RuntimeError, Exception): + pass + return mock_fm + + def test_metrics_enabled_from_yaml_config(self): + """start_server enables metrics when config has metrics.enabled=True, + even though the CLI flag is False.""" + from types import SimpleNamespace + + metrics_cfg = SimpleNamespace(enabled=True) + fs_cfg = SimpleNamespace(metrics=metrics_cfg) + mock_store = MagicMock() + mock_store.config = SimpleNamespace(feature_server=fs_cfg) + + mock_fm = self._call_start_server(mock_store, cli_metrics=False) + mock_fm.build_metrics_flags.assert_called_once_with(metrics_cfg) + mock_fm.start_metrics_server.assert_called_once() + + def test_cli_flag_enables_metrics_without_yaml_config(self): + """start_server enables metrics when --metrics is passed even without + any feature_server config section.""" + from types import SimpleNamespace + + mock_store = MagicMock() + mock_store.config = SimpleNamespace(feature_server=None) + + mock_fm = self._call_start_server(mock_store, cli_metrics=True) + mock_fm.build_metrics_flags.assert_called_once_with(None) + mock_fm.start_metrics_server.assert_called_once() + + def test_metrics_not_started_when_both_disabled(self): + """start_server does NOT start metrics when neither CLI nor config enables it.""" + from types import SimpleNamespace + + mock_store = MagicMock() + mock_store.config = SimpleNamespace( + feature_server=SimpleNamespace(metrics=SimpleNamespace(enabled=False)), + ) + + mock_fm = self._call_start_server(mock_store, cli_metrics=False) + mock_fm.start_metrics_server.assert_not_called() + + def test_metrics_not_started_when_config_is_none(self): + """start_server does NOT start metrics when feature_server config is None + and CLI flag is also False.""" + from types import SimpleNamespace + + mock_store = MagicMock() + mock_store.config = SimpleNamespace(feature_server=None) + + mock_fm = self._call_start_server(mock_store, cli_metrics=False) + mock_fm.start_metrics_server.assert_not_called() + + +class TestTrackOnlineFeaturesEntities: + def test_increments_request_count(self): + before = online_features_request_count._value.get() + track_online_features_entities(10) + assert online_features_request_count._value.get() == before + 1 + + def test_records_entity_count(self): + before_count = online_features_entity_count._sum.get() + track_online_features_entities(42) + assert online_features_entity_count._sum.get() >= before_count + 42 + + +class TestTrackPush: + def test_increments_push_counter(self): + before = push_request_count.labels( + push_source="my_source", mode="online" + )._value.get() + track_push("my_source", "online") + assert ( + push_request_count.labels( + push_source="my_source", mode="online" + )._value.get() + == before + 1 + ) + + +class TestTrackMaterialization: + def test_success_counter(self): + before = materialization_total.labels( + feature_view="fv1", status="success" + )._value.get() + track_materialization("fv1", success=True, duration_seconds=1.5) + assert ( + materialization_total.labels( + feature_view="fv1", status="success" + )._value.get() + == before + 1 + ) + + def test_failure_counter(self): + before = materialization_total.labels( + feature_view="fv2", status="failure" + )._value.get() + track_materialization("fv2", success=False, duration_seconds=0.5) + assert ( + materialization_total.labels( + feature_view="fv2", status="failure" + )._value.get() + == before + 1 + ) + + def test_duration_histogram(self): + before_sum = materialization_duration_seconds.labels( + feature_view="fv3" + )._sum.get() + track_materialization("fv3", success=True, duration_seconds=3.7) + after_sum = materialization_duration_seconds.labels( + feature_view="fv3" + )._sum.get() + assert pytest.approx(after_sum - before_sum, abs=0.01) == 3.7 + + +class TestUpdateFeatureFreshness: + def test_sets_freshness_for_materialized_views(self): + mock_fv = MagicMock() + mock_fv.name = "test_fv" + mock_fv.most_recent_end_time = datetime.now(tz=timezone.utc) - timedelta( + minutes=5 + ) + + mock_store = MagicMock() + mock_store.project = "test_project" + mock_store.list_feature_views.return_value = [mock_fv] + + update_feature_freshness(mock_store) + + staleness = feature_freshness_seconds.labels( + feature_view="test_fv", project="test_project" + )._value.get() + assert 280 < staleness < 320 + + def test_skips_unmaterialized_views(self): + mock_fv = MagicMock() + mock_fv.name = "unmaterialized_fv" + mock_fv.most_recent_end_time = None + + mock_store = MagicMock() + mock_store.project = "test_project" + mock_store.list_feature_views.return_value = [mock_fv] + + update_feature_freshness(mock_store) + + def test_handles_naive_datetime(self): + mock_fv = MagicMock() + mock_fv.name = "naive_fv" + # Simulate a naive UTC datetime (no tzinfo), as Feast typically stores + naive_utc_now = datetime.now(tz=timezone.utc).replace(tzinfo=None) + mock_fv.most_recent_end_time = naive_utc_now - timedelta(hours=1) + + mock_store = MagicMock() + mock_store.project = "test_project" + mock_store.list_feature_views.return_value = [mock_fv] + + update_feature_freshness(mock_store) + + staleness = feature_freshness_seconds.labels( + feature_view="naive_fv", project="test_project" + )._value.get() + assert 3500 < staleness < 3700 + + def test_handles_registry_errors_gracefully(self): + mock_store = MagicMock() + mock_store.list_feature_views.side_effect = Exception("registry down") + + update_feature_freshness(mock_store) + + +class TestResolveFeatureCounts: + """Verify _resolve_feature_counts for both feature-ref lists and FeatureService.""" + + def test_feature_ref_list(self): + from feast.feature_server import _resolve_feature_counts + + refs = ["driver_fv:conv_rate", "driver_fv:acc_rate", "vehicle_fv:mileage"] + feat_count, fv_count = _resolve_feature_counts(refs) + assert feat_count == "3" + assert fv_count == "2" + + def test_single_feature_view(self): + from feast.feature_server import _resolve_feature_counts + + refs = ["fv1:a", "fv1:b", "fv1:c"] + feat_count, fv_count = _resolve_feature_counts(refs) + assert feat_count == "3" + assert fv_count == "1" + + def test_empty_list(self): + from feast.feature_server import _resolve_feature_counts + + feat_count, fv_count = _resolve_feature_counts([]) + assert feat_count == "0" + assert fv_count == "0" + + def test_feature_service(self): + from feast.feature_server import _resolve_feature_counts + + proj1 = MagicMock() + proj1.features = [MagicMock(), MagicMock()] + proj2 = MagicMock() + proj2.features = [MagicMock()] + + fs_svc = MagicMock() + fs_svc.feature_view_projections = [proj1, proj2] + + from feast.feature_service import FeatureService + + fs_svc.__class__ = FeatureService + + feat_count, fv_count = _resolve_feature_counts(fs_svc) + assert feat_count == "3" + assert fv_count == "2" + + +class TestFeatureServerMetricsIntegration: + """Test that feature server endpoints record metrics.""" + + @pytest.fixture + def mock_fs_factory(self): + from tests.foo_provider import FooProvider + + def builder(**async_support): + provider = FooProvider.with_async_support(**async_support) + fs = MagicMock() + fs._get_provider.return_value = provider + from feast.online_response import OnlineResponse + from feast.protos.feast.serving.ServingService_pb2 import ( + GetOnlineFeaturesResponse, + ) + + empty_response = OnlineResponse(GetOnlineFeaturesResponse(results=[])) + fs.get_online_features = MagicMock(return_value=empty_response) + fs.push = MagicMock() + fs.get_online_features_async = MagicMock(return_value=empty_response) + fs.push_async = MagicMock() + return fs + + return builder + + def test_get_online_features_records_metrics(self, mock_fs_factory): + from fastapi.testclient import TestClient + + from feast.feature_server import get_app + + fs = mock_fs_factory(online_read=False) + client = TestClient(get_app(fs)) + + before_req = request_count.labels( + endpoint="/get-online-features", status="success" + )._value.get() + before_entity = online_features_request_count._value.get() + + client.post( + "/get-online-features", + json={ + "features": ["fv:feat1"], + "entities": {"id": [1, 2, 3]}, + }, + ) + + assert ( + request_count.labels( + endpoint="/get-online-features", status="success" + )._value.get() + == before_req + 1 + ) + assert online_features_request_count._value.get() == before_entity + 1 + + @pytest.mark.parametrize( + "features,expected_feat_count,expected_fv_count", + [ + (["fv1:a"], "1", "1"), + (["fv1:a", "fv1:b", "fv2:c"], "3", "2"), + ( + ["fv1:a", "fv1:b", "fv2:c", "fv2:d", "fv3:e"], + "5", + "3", + ), + ], + ids=["1_feat_1_fv", "3_feats_2_fvs", "5_feats_3_fvs"], + ) + def test_latency_labels_with_varying_request_sizes( + self, mock_fs_factory, features, expected_feat_count, expected_fv_count + ): + """Verify feature_count and feature_view_count labels change with request size.""" + from fastapi.testclient import TestClient + + from feast.feature_server import get_app + + fs = mock_fs_factory(online_read=False) + client = TestClient(get_app(fs)) + + label_set = dict( + endpoint="/get-online-features", + feature_count=expected_feat_count, + feature_view_count=expected_fv_count, + ) + before_sum = request_latency.labels(**label_set)._sum.get() + + client.post( + "/get-online-features", + json={ + "features": features, + "entities": {"id": [1]}, + }, + ) + + after_sum = request_latency.labels(**label_set)._sum.get() + assert after_sum > before_sum + + def test_push_records_metrics(self, mock_fs_factory): + from fastapi.testclient import TestClient + + from feast.feature_server import get_app + from feast.utils import _utc_now + + fs = mock_fs_factory(online_write=False) + client = TestClient(get_app(fs)) + + before = push_request_count.labels( + push_source="driver_locations_push", mode="online" + )._value.get() + + client.post( + "/push", + json={ + "push_source_name": "driver_locations_push", + "df": { + "driver_lat": [42.0], + "driver_long": ["42.0"], + "driver_id": [123], + "event_timestamp": [str(_utc_now())], + "created_timestamp": [str(_utc_now())], + }, + "to": "online", + }, + ) + + assert ( + push_request_count.labels( + push_source="driver_locations_push", mode="online" + )._value.get() + == before + 1 + ) + + +class TestBuildMetricsFlags: + """Verify build_metrics_flags correctly maps MetricsConfig to _MetricsFlags.""" + + def test_no_config_enables_all(self): + from feast.metrics import build_metrics_flags + + flags = build_metrics_flags(None) + assert flags.enabled is True + assert flags.resource is True + assert flags.request is True + assert flags.online_features is True + assert flags.push is True + assert flags.materialization is True + assert flags.freshness is True + + def test_selective_disable(self): + from types import SimpleNamespace + + from feast.metrics import build_metrics_flags + + mc = SimpleNamespace( + enabled=True, + resource=True, + request=False, + online_features=True, + push=False, + materialization=True, + freshness=False, + ) + flags = build_metrics_flags(mc) + assert flags.enabled is True + assert flags.resource is True + assert flags.request is False + assert flags.online_features is True + assert flags.push is False + assert flags.materialization is True + assert flags.freshness is False + + def test_all_categories_disabled(self): + from types import SimpleNamespace + + from feast.metrics import build_metrics_flags + + mc = SimpleNamespace( + enabled=True, + resource=False, + request=False, + online_features=False, + push=False, + materialization=False, + freshness=False, + ) + flags = build_metrics_flags(mc) + assert flags.enabled is True + assert flags.resource is False + assert flags.request is False + + +class TestCleanupMultiprocessDir: + """Verify the atexit handler only deletes the temp dir in the owner process.""" + + def test_cleanup_skipped_in_forked_child(self, tmp_path): + """Simulate a forked worker: _owns_mp_dir=True but _owner_pid != current PID.""" + import feast.metrics as m + + original_dir = m._prometheus_mp_dir + original_owns = m._owns_mp_dir + original_pid = m._owner_pid + + fake_dir = tmp_path / "feast_metrics_test" + fake_dir.mkdir() + + m._prometheus_mp_dir = str(fake_dir) + m._owns_mp_dir = True + m._owner_pid = -1 # Different from os.getpid() + + try: + m._cleanup_multiprocess_dir() + assert fake_dir.exists(), ( + "Directory should NOT be deleted when _owner_pid != os.getpid()" + ) + finally: + m._prometheus_mp_dir = original_dir + m._owns_mp_dir = original_owns + m._owner_pid = original_pid + + def test_cleanup_runs_in_owner_process(self, tmp_path): + """The owner process (matching PID) should delete the directory.""" + import os + + import feast.metrics as m + + original_dir = m._prometheus_mp_dir + original_owns = m._owns_mp_dir + original_pid = m._owner_pid + + fake_dir = tmp_path / "feast_metrics_test" + fake_dir.mkdir() + + m._prometheus_mp_dir = str(fake_dir) + m._owns_mp_dir = True + m._owner_pid = os.getpid() + + try: + m._cleanup_multiprocess_dir() + assert not fake_dir.exists(), ( + "Directory SHOULD be deleted when _owner_pid == os.getpid()" + ) + finally: + m._prometheus_mp_dir = original_dir + m._owns_mp_dir = original_owns + m._owner_pid = original_pid