Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ snowflake = [
]
sqlite_vec = ["sqlite-vec==v0.1.6"]
mcp = ["fastapi_mcp"]
mlflow = ["mlflow>=2.10.0"]

dbt = ["dbt-artifacts-parser"]

Expand Down
137 changes: 129 additions & 8 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@
_track_materialization = None # Lazy-loaded on first materialization call
_track_materialization_loaded = False

_mlflow_log_fn = None # Lazy-loaded on first feature retrieval
_mlflow_log_fn_loaded = False


def _get_mlflow_log_fn():
"""Lazy-import mlflow logger only when MLflow integration is configured."""
global _mlflow_log_fn, _mlflow_log_fn_loaded
if not _mlflow_log_fn_loaded:
_mlflow_log_fn_loaded = True
try:
from feast.mlflow_integration.logger import (
log_feature_retrieval_to_mlflow,
)

_mlflow_log_fn = log_feature_retrieval_to_mlflow
except Exception:
_mlflow_log_fn = None
return _mlflow_log_fn


def _get_track_materialization():
"""Lazy-import feast.metrics only when materialization tracking is needed.
Expand Down Expand Up @@ -194,6 +213,54 @@ def __init__(
# Initialize feature service cache for performance optimization
self._feature_service_cache = {}

# Configure MLflow tracking URI globally from config
self._init_mlflow_tracking()

def _init_mlflow_tracking(self):
"""Configure MLflow globally from feature_store.yaml.

Sets the tracking URI and experiment name so the user never needs
to call mlflow.set_tracking_uri() or mlflow.set_experiment() in
their scripts. The experiment is named after the Feast project.

When no tracking_uri is specified, defaults to http://127.0.0.1:5000
(a local MLflow tracking server). This ensures that train.py,
predict.py, feast ui, and the MLflow UI all share the same backend.
"""
try:
mlflow_cfg = self.config.mlflow
if mlflow_cfg is None or not mlflow_cfg.enabled:
return

import mlflow

tracking_uri = mlflow_cfg.tracking_uri or "http://127.0.0.1:5000"
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(self.config.project)
except ImportError:
pass
except Exception as e:
warnings.warn(f"Failed to configure MLflow tracking: {e}")

def _resolve_feature_service_name(
self, feature_refs: List[str]
) -> Optional[str]:
"""Try to find a feature service that covers the given feature refs."""
try:
ref_set = set(feature_refs)
for fs in self.registry.list_feature_services(
self.project, allow_cache=True
):
fs_refs = set()
for proj in fs.feature_view_projections:
for feat in proj.features:
fs_refs.add(f"{proj.name}:{feat.name}")
if ref_set == fs_refs or ref_set.issubset(fs_refs):
return fs.name
except Exception:
pass
return None

def _init_openlineage_emitter(self) -> Optional[Any]:
"""Initialize OpenLineage emitter if configured and enabled."""
try:
Expand Down Expand Up @@ -1483,6 +1550,8 @@ def get_historical_features(
if end_date is not None:
kwargs["end_date"] = end_date

_retrieval_start = time.monotonic()

job = provider.get_historical_features(
self.config,
feature_views,
Expand All @@ -1494,6 +1563,31 @@ def get_historical_features(
**kwargs,
)

# Auto-log to MLflow if configured
if (
self.config.mlflow is not None
and self.config.mlflow.enabled
and self.config.mlflow.auto_log
):
_log_fn = _get_mlflow_log_fn()
if _log_fn is not None:
_duration = time.monotonic() - _retrieval_start
_entity_count = (
len(entity_df) if isinstance(entity_df, pd.DataFrame) else 0
)
_fs = features if isinstance(features, FeatureService) else None
_fs_name = features.name if isinstance(features, FeatureService) else self._resolve_feature_service_name(_feature_refs)
_log_fn(
feature_refs=_feature_refs,
entity_count=_entity_count,
duration_seconds=_duration,
retrieval_type="historical",
feature_service=_fs,
feature_service_name=_fs_name,
project=self.project,
tracking_uri=self.config.mlflow.tracking_uri,
)

return job

def create_saved_dataset(
Expand Down Expand Up @@ -2621,6 +2715,8 @@ def get_online_features(
"""
provider = self._get_provider()

_retrieval_start = time.monotonic()

response = provider.get_online_features(
config=self.config,
features=features,
Expand All @@ -2631,6 +2727,36 @@ def get_online_features(
include_feature_view_version_metadata=include_feature_view_version_metadata,
)

# Auto-log to MLflow if configured
if (
self.config.mlflow is not None
and self.config.mlflow.enabled
and self.config.mlflow.auto_log
):
_log_fn = _get_mlflow_log_fn()
if _log_fn is not None:
_duration = time.monotonic() - _retrieval_start
_feature_refs = utils._get_features(
self.registry, self.project, features, allow_cache=True
)
_entity_count = (
len(entity_rows)
if isinstance(entity_rows, list)
else 0
)
_fs = features if isinstance(features, FeatureService) else None
_fs_name = features.name if isinstance(features, FeatureService) else self._resolve_feature_service_name(_feature_refs)
_log_fn(
feature_refs=_feature_refs,
entity_count=_entity_count,
duration_seconds=_duration,
retrieval_type="online",
feature_service=_fs,
feature_service_name=_fs_name,
project=self.project,
tracking_uri=self.config.mlflow.tracking_uri,
)

return response

async def get_online_features_async(
Expand Down Expand Up @@ -2781,10 +2907,7 @@ def _doc_feature(x):
online_features_response=online_features_response,
data=requested_features_data,
)
feature_types = {
f.name: f.dtype.to_value_type() for f in requested_feature_view.features
}
return OnlineResponse(online_features_response, feature_types=feature_types)
return OnlineResponse(online_features_response)

def retrieve_online_documents_v2(
self,
Expand Down Expand Up @@ -3074,8 +3197,7 @@ def _retrieve_from_online_store_v2(
online_features_response.metadata.feature_names.val.extend(
features_to_request
)
feature_types = {f.name: f.dtype.to_value_type() for f in table.features}
return OnlineResponse(online_features_response, feature_types=feature_types)
return OnlineResponse(online_features_response)

table_entity_values, idxs, output_len = utils._get_unique_entities_from_values(
entity_key_dict,
Expand All @@ -3098,8 +3220,7 @@ def _retrieve_from_online_store_v2(
data=entity_key_dict,
)

feature_types = {f.name: f.dtype.to_value_type() for f in table.features}
return OnlineResponse(online_features_response, feature_types=feature_types)
return OnlineResponse(online_features_response)

def serve(
self,
Expand Down
52 changes: 52 additions & 0 deletions sdk/python/feast/mlflow_integration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""
MLflow integration for Feast Feature Store.

This module provides seamless integration between Feast and MLflow for
automatic experiment tracking of feature retrieval operations. When enabled
in feature_store.yaml, feature metadata is logged automatically to MLflow
during get_historical_features and get_online_features calls.

Usage:
Configure MLflow in your feature_store.yaml:

project: my_project
# ... other config ...

mlflow:
enabled: true
tracking_uri: http://localhost:5000
auto_log: true

Then use Feast normally - feature retrieval metadata is logged automatically
to any active MLflow run.

For advanced use cases, the module also provides:
- resolve_feature_service_from_model_uri: Map an MLflow model to its Feast
feature service.
- get_entity_df_from_mlflow_run: Reproduce training by pulling entity data
from a previous MLflow run's artifacts.
"""

from feast.mlflow_integration.config import MlflowConfig
from feast.mlflow_integration.entity_df_builder import (
FeastMlflowEntityDfError,
get_entity_df_from_mlflow_run,
)
from feast.mlflow_integration.logger import (
log_feature_retrieval_to_mlflow,
log_training_dataset_to_mlflow,
)
from feast.mlflow_integration.model_resolver import (
FeastMlflowModelResolutionError,
resolve_feature_service_from_model_uri,
)

__all__ = [
"MlflowConfig",
"log_feature_retrieval_to_mlflow",
"log_training_dataset_to_mlflow",
"resolve_feature_service_from_model_uri",
"FeastMlflowModelResolutionError",
"get_entity_df_from_mlflow_run",
"FeastMlflowEntityDfError",
]
37 changes: 37 additions & 0 deletions sdk/python/feast/mlflow_integration/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Optional

from pydantic import StrictBool, StrictStr

from feast.repo_config import FeastBaseModel


class MlflowConfig(FeastBaseModel):
    """Configuration for MLflow integration.

    This enables automatic logging of feature retrieval metadata to MLflow
    during get_historical_features and get_online_features calls.

    All fields default to a safe, low-overhead setup: the integration is
    off unless ``enabled`` is set, and dataset logging is opt-in because
    training DataFrames can be large.

    Example configuration in feature_store.yaml:
        mlflow:
            enabled: true
            tracking_uri: http://localhost:5000
            auto_log: true
    """

    enabled: StrictBool = False
    """ bool: Whether MLflow integration is enabled. Defaults to False. """

    tracking_uri: Optional[StrictStr] = None
    """ str: MLflow tracking URI. If not set, FeatureStore falls back to
    http://127.0.0.1:5000 (a local MLflow tracking server) when configuring
    tracking -- see FeatureStore._init_mlflow_tracking. """

    auto_log: StrictBool = True
    """ bool: Automatically log feature retrieval metadata to the active
    MLflow run when get_historical_features or get_online_features is
    called. Defaults to True. """

    auto_log_dataset: StrictBool = False
    """ bool: When True, the training DataFrame produced by
    get_historical_features().to_df() is logged as an MLflow dataset
    input on the active run. Defaults to False because the DataFrame
    can be large. """
Loading
Loading