Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 87 additions & 1 deletion sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
)
from fastapi.concurrency import run_in_threadpool
from fastapi.logger import logger
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, ORJSONResponse
from fastapi.staticfiles import StaticFiles
from google.protobuf.json_format import MessageToDict
from pydantic import BaseModel, field_validator
Expand All @@ -50,9 +50,11 @@
from feast.data_source import PushMode
from feast.errors import (
FeastError,
FeatureViewNotFoundException,
)
from feast.feast_object import FeastObject
from feast.feature_view_utils import get_feature_view_from_feature_store
from feast.filter_models import ComparisonFilter, CompoundFilter
from feast.permissions.action import WRITE, AuthzedAction
from feast.permissions.security_manager import assert_permissions
from feast.permissions.server.rest import inject_user_details
Expand Down Expand Up @@ -108,7 +110,42 @@ class GetOnlineDocumentsRequest(BaseModel):
top_k: Optional[int] = None
query: Optional[List[float]] = None
query_string: Optional[str] = None
distance_metric: Optional[str] = None
api_version: Optional[int] = 1
filters: Optional[Union[ComparisonFilter, CompoundFilter]] = None


class OpenAISearchMetadata(BaseModel):
    """Feast-specific extensions carried in an OpenAI-compatible search request.

    Not part of the OpenAI vector-store API itself; used to control what the
    underlying feature view returns.
    """

    # Subset of feature names to return; None means all features of the view.
    features_to_retrieve: Optional[List[str]] = None
    # NOTE(review): accepted but not referenced by the visible search handler —
    # confirm intended use before documenting as supported.
    content_field: Optional[str] = None


class OpenAIComparisonFilter(BaseModel):
    """Single comparison filter in OpenAI's vector-store search format."""

    # Metadata field name to compare against.
    key: str
    # Comparison operator; kept as a plain str here and validated downstream
    # when converted to feast.filter_models.ComparisonFilter (Literal-typed).
    type: str
    # Value the metadata field is compared with.
    value: Union[str, int, float, bool, List[Union[str, int]]]


class OpenAICompoundFilter(BaseModel):
    """Logical combination of filters in OpenAI's vector-store search format."""

    # Logical operator; validated downstream during conversion to the
    # typed filter models ("and"/"or").
    type: str
    # Child filters; may recursively contain further compound filters.
    filters: List[Union[OpenAIComparisonFilter, "OpenAICompoundFilter"]]


# Resolve the self-referential "OpenAICompoundFilter" forward reference so
# the model is usable immediately after definition.
OpenAICompoundFilter.model_rebuild()


class OpenAIRankingOptions(BaseModel):
    """Ranking options in OpenAI's vector-store search format.

    NOTE(review): accepted by the endpoint but not applied by the visible
    search path — confirm before documenting as supported.
    """

    # Name of the ranker to use.
    ranker: Optional[str] = None
    # Presumably a minimum-score cutoff for returned results — verify once applied.
    score_threshold: Optional[float] = None


class OpenAISearchRequest(BaseModel):
    """Request body for the OpenAI-compatible vector store search endpoint."""

    # Natural-language query; a list of strings is also accepted.
    query: Union[str, List[str]]
    # Optional metadata filters in OpenAI's comparison/compound format.
    filters: Optional[Union[OpenAIComparisonFilter, OpenAICompoundFilter]] = None
    # Maximum number of results to return.
    max_num_results: Optional[int] = 10
    # Ranking options (accepted; application depends on the store implementation).
    ranking_options: Optional[OpenAIRankingOptions] = None
    # Whether the server may rewrite the query (accepted; not applied here).
    rewrite_query: Optional[bool] = None
    # Feast-specific extensions (e.g. which features to retrieve).
    metadata: Optional[OpenAISearchMetadata] = None


class FeatureVectorResponse(BaseModel):
Expand Down Expand Up @@ -418,6 +455,10 @@ async def retrieve_online_documents(
)
if request.api_version == 2 and request.query_string is not None:
read_params["query_string"] = request.query_string
if request.api_version == 2 and request.distance_metric is not None:
read_params["distance_metric"] = request.distance_metric
if request.api_version == 2 and request.filters is not None:
read_params["filters"] = request.filters

if request.api_version == 2:
response = await run_in_threadpool(
Expand All @@ -436,6 +477,51 @@ async def retrieve_online_documents(
)
return response_dict

@app.post(
    "/v1/vector_stores/{vector_store_id}/search",
    dependencies=[Depends(inject_user_details)],
)
async def openai_vector_store_search(
    vector_store_id: str,
    request: OpenAISearchRequest,
) -> ORJSONResponse:
    """OpenAI-compatible vector store search endpoint.

    Maps the OpenAI ``vector_store_id`` path parameter to a Feast feature
    view and delegates to ``store.retrieve_online_documents_openai``,
    returning results in OpenAI's ``vector_store.search_results.page``
    format. Unknown feature views yield a 404 and invalid requests or
    server misconfiguration (e.g. no embedding model configured) yield a
    400, both in OpenAI's error envelope, instead of an opaque 500.
    """
    with feast_metrics.track_request_latency(
        "/v1/vector_stores/{vector_store_id}/search"
    ):
        try:
            # The store call is synchronous (and may perform network I/O to
            # embed the query), so run it off the event loop.
            result = await run_in_threadpool(
                lambda: store.retrieve_online_documents_openai(
                    vector_store_id=vector_store_id,
                    query=request.query,
                    # `or 10` also guards an explicit null in the JSON body.
                    max_num_results=request.max_num_results or 10,
                    filters=(
                        request.filters.model_dump() if request.filters else None
                    ),
                    ranking_options=(
                        request.ranking_options.model_dump()
                        if request.ranking_options
                        else None
                    ),
                    rewrite_query=request.rewrite_query,
                    features_to_retrieve=(
                        request.metadata.features_to_retrieve
                        if request.metadata
                        else None
                    ),
                )
            )
        except FeatureViewNotFoundException:
            return ORJSONResponse(
                status_code=404,
                content={
                    "error": {
                        "message": f"No vector store found with id '{vector_store_id}'",
                        "type": "not_found_error",
                    }
                },
            )
        except ValueError as e:
            # retrieve_online_documents_openai raises ValueError when the
            # embedding model is not configured (and pydantic validation
            # errors are ValueErrors too); surface these as a 400 in
            # OpenAI's error format rather than a generic 500.
            return ORJSONResponse(
                status_code=400,
                content={
                    "error": {
                        "message": str(e),
                        "type": "invalid_request_error",
                    }
                },
            )
        return ORJSONResponse(content=result)

@app.post("/push", dependencies=[Depends(inject_user_details)])
async def push(request: PushFeaturesRequest) -> Response:
with feast_metrics.track_request_latency("/push"):
Expand Down
160 changes: 160 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
from feast.feast_object import FeastObject
from feast.feature_service import FeatureService
from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView
from feast.filter_models import ComparisonFilter, CompoundFilter, convert_dict_to_filter
from feast.inference import (
update_data_sources_with_inferred_event_timestamp_col,
update_feature_views_with_inferred_features_and_entities,
Expand Down Expand Up @@ -2677,6 +2678,7 @@ def retrieve_online_documents_v2(
text_weight: float = 0.5,
image_weight: float = 0.5,
combine_strategy: str = "weighted_sum",
filters: Optional[Union[ComparisonFilter, CompoundFilter]] = None,
) -> OnlineResponse:
"""
Retrieves the top k closest document features. Note, embeddings are a subset of features.
Expand Down Expand Up @@ -2829,8 +2831,164 @@ def retrieve_online_documents_v2(
top_k,
distance_metric,
query_string,
filters,
)

def retrieve_online_documents_openai(
    self,
    vector_store_id: str,
    query: Union[str, List[str]],
    max_num_results: int = 10,
    filters: Optional[Dict[str, Any]] = None,
    ranking_options: Optional[Dict[str, Any]] = None,
    rewrite_query: Optional[bool] = None,
    features_to_retrieve: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """OpenAI-compatible vector store search.

    Embeds the raw query via LiteLLM using the ``embedding_model``
    section of ``feature_store.yaml``, runs a vector search against the
    feature view named by ``vector_store_id``, and returns results in
    OpenAI's ``vector_store.search_results.page`` format. An embedding
    model is required; there is no keyword-only fallback in this path.

    Args:
        vector_store_id: Feature view name (maps to the OpenAI
            ``vector_store_id`` path parameter).
        query: Natural language query string, or list of strings
            (a list is joined with spaces before embedding).
        max_num_results: Maximum number of results to return.
        filters: OpenAI-compatible filter dict; converted to typed
            filter models and forwarded to
            ``retrieve_online_documents_v2``.
        ranking_options: OpenAI-compatible ranking options (accepted
            but not yet applied).
        rewrite_query: Whether to rewrite the query (accepted but
            not yet applied).
        features_to_retrieve: Specific feature names to return.
            If None, all features from the feature view are used.

    Returns:
        Dict matching the OpenAI ``vector_store.search_results.page``
        schema.

    Raises:
        ValueError: If no ``embedding_model`` section is configured in
            ``feature_store.yaml``.
        ImportError: If litellm is not installed.

    Example::

        # feature_store.yaml has:
        #   embedding_model:
        #     model: text-embedding-3-small
        result = store.retrieve_online_documents_openai(
            vector_store_id="product_embeddings",
            query="wireless audio device",
            max_num_results=3,
            features_to_retrieve=["name", "description"],
        )
    """
    feature_view = self.get_feature_view(vector_store_id)

    # Default to every feature on the view when no explicit subset is given.
    if features_to_retrieve:
        feature_names = features_to_retrieve
    else:
        feature_names = [f.name for f in feature_view.features]

    features = [f"{feature_view.name}:{name}" for name in feature_names]
    # A list query is flattened into one string for embedding.
    query_text = query if isinstance(query, str) else " ".join(query)

    embed_cfg = self.config.embedding_model
    if embed_cfg is None:
        raise ValueError(
            "embedding_model is not configured in feature_store.yaml. "
            "Add an 'embedding_model' section with at least a 'model' "
            "field to use retrieve_online_documents_openai.\n"
            "Example:\n"
            "  embedding_model:\n"
            "    model: text-embedding-3-small\n"
            "    api_key: sk-..."
        )

    # litellm is an optional dependency; import lazily so the rest of the
    # SDK works without it installed.
    try:
        from litellm import embedding as litellm_embedding
    except ImportError:
        raise ImportError(
            "litellm is required for query embedding. "
            "Install with: pip install litellm"
        )

    # Only forward optional connection settings that are actually set.
    litellm_kwargs: Dict[str, Any] = {
        "model": embed_cfg.model,
        "input": [query_text],
    }
    if embed_cfg.api_key:
        litellm_kwargs["api_key"] = embed_cfg.api_key
    if embed_cfg.api_base:
        litellm_kwargs["api_base"] = embed_cfg.api_base
    if embed_cfg.api_version:
        litellm_kwargs["api_version"] = embed_cfg.api_version
    if embed_cfg.dimensions:
        litellm_kwargs["dimensions"] = embed_cfg.dimensions

    embed_response = litellm_embedding(**litellm_kwargs)
    query_embedding = embed_response.data[0]["embedding"]

    typed_filters: Optional[Union[ComparisonFilter, CompoundFilter]] = None
    if filters is not None:
        typed_filters = convert_dict_to_filter(filters)

    response = self.retrieve_online_documents_v2(
        features=features,
        query=query_embedding,
        top_k=max_num_results,
        filters=typed_filters,
    )

    response_dict = response.to_dict()

    # Convert the columnar response into OpenAI's row-oriented result list.
    result_data = []
    if response_dict:
        first_key = next(iter(response_dict))
        num_rows = len(response_dict.get(first_key, []))
        for i in range(num_rows):
            score = 0.0
            attributes: Dict[str, Any] = {}
            content_parts: List[Dict[str, str]] = []

            for key, values in response_dict.items():
                # Columns may be ragged; missing cells become None.
                val = values[i] if i < len(values) else None
                if key == "distance":
                    # The "distance" column is surfaced as the result score.
                    score = float(val) if val is not None else 0.0
                else:
                    attributes[key] = val
                    # String-valued features double as text content parts.
                    if isinstance(val, str):
                        content_parts.append({"type": "text", "text": val})

            result_data.append(
                {
                    # Synthetic id: no real file backs a feature row.
                    "file_id": f"{vector_store_id}_{i}",
                    "filename": vector_store_id,
                    "score": score,
                    "attributes": attributes,
                    "content": content_parts
                    if content_parts
                    else [{"type": "text", "text": str(attributes)}],
                }
            )

    search_query = query if isinstance(query, list) else [query]
    return {
        "object": "vector_store.search_results.page",
        "search_query": search_query,
        "data": result_data,
        "has_more": False,
        "next_page": None,
    }

def _retrieve_from_online_store(
self,
provider: Provider,
Expand Down Expand Up @@ -2893,6 +3051,7 @@ def _retrieve_from_online_store_v2(
top_k: int,
distance_metric: Optional[str],
query_string: Optional[str],
filters: Optional[Union[ComparisonFilter, CompoundFilter]] = None,
) -> OnlineResponse:
"""
Search and return document features from the online document store.
Expand All @@ -2909,6 +3068,7 @@ def _retrieve_from_online_store_v2(
top_k=top_k,
distance_metric=distance_metric,
query_string=query_string,
filters=filters,
)

entity_key_dict: Dict[str, List[ValueProto]] = {}
Expand Down
50 changes: 50 additions & 0 deletions sdk/python/feast/filter_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Any, Dict, List, Literal, Optional, Union

from pydantic import BaseModel


class ComparisonFilter(BaseModel):
    """A filter that compares a metadata field against a value.

    :param type: The comparison operator to apply ("eq", "ne", "gt",
        "gte", "lt", "lte", "in", "nin")
    :param key: The metadata field name to filter on
    :param value: The value to compare against; for "in"/"nin" this is
        presumably a collection of candidates — not enforced here
    """

    type: Literal["eq", "ne", "gt", "gte", "lt", "lte", "in", "nin"]
    key: str
    # Deliberately untyped: the operator determines what shapes make sense.
    value: Any


class CompoundFilter(BaseModel):
    """A filter that combines multiple filters with a logical operator.

    :param type: The logical operator ("and" requires all filters match,
        "or" requires any filter matches)
    :param filters: The list of filters to combine; entries may be
        comparison filters or nested compound filters
    """

    type: Literal["and", "or"]
    filters: List[Union[ComparisonFilter, "CompoundFilter"]]


# Resolve the self-referential "CompoundFilter" forward reference so the
# model is usable immediately after import.
CompoundFilter.model_rebuild()

# Convenience alias for APIs that accept an optional filter of either kind.
FilterType = Optional[Union[ComparisonFilter, CompoundFilter]]


def convert_dict_to_filter(
    filter_dict: Dict[str, Any],
) -> Union[ComparisonFilter, CompoundFilter]:
    """Build a typed filter from a raw dict (e.g. OpenAI-compatible JSON).

    Dicts whose ``type`` is ``"and"`` or ``"or"`` become ``CompoundFilter``
    instances with their children converted recursively; any other dict is
    treated as a ``ComparisonFilter``. Validation of operators and required
    keys is left to the pydantic models.
    """
    if filter_dict.get("type") in ("and", "or"):
        children = [
            convert_dict_to_filter(child) for child in filter_dict["filters"]
        ]
        return CompoundFilter(type=filter_dict["type"], filters=children)

    return ComparisonFilter(
        type=filter_dict["type"],
        key=filter_dict["key"],
        value=filter_dict["value"],
    )
Loading
Loading