Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,25 +159,26 @@ def get_historical_features(
else:
start_date = make_tzaware(start_date)

entity_df = pd.DataFrame(
{
"event_timestamp": pd.date_range(
start=start_date, end=end_date, freq="1s", tz=timezone.utc
)[:1] # Just one row
}
)
entity_df = pd.DataFrame({"event_timestamp": [end_date]})

entity_schema = _get_entity_schema(entity_df, config)

entity_df_event_timestamp_col = (
offline_utils.infer_event_timestamp_from_entity_df(entity_schema)
)

entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
entity_df,
entity_df_event_timestamp_col,
config,
)
# In non-entity mode, use the actual requested range so that
# min_event_timestamp (= range[0] - TTL) doesn't clip the window.
# The synthetic entity_df only has end_date, which would wrongly
# set min_event_timestamp to end_date - TTL instead of start_date - TTL.
if start_date is not None and end_date is not None:
entity_df_event_timestamp_range = (start_date, end_date)
else:
entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
entity_df,
entity_df_event_timestamp_col,
config,
)

@contextlib.contextmanager
def query_generator() -> Iterator[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,3 +728,115 @@ def test_historical_features_field_mapping(
actual_df,
sort_by=["driver_id"],
)


@pytest.mark.integration
@pytest.mark.universal_offline_stores(only=["file"])
def test_historical_features_non_entity_retrieval(environment):
    """Test get_historical_features with entity_df=None using start_date/end_date.

    This exercises the non-entity retrieval path where a synthetic entity_df is
    generated internally. Regression test for the bug where start_date was used
    instead of end_date for min_event_timestamp in the synthetic entity_df.
    """
    store = environment.feature_store

    # Three hourly-truncated reference times spanning a two-day window.
    now = datetime.now().replace(microsecond=0, second=0, minute=0)
    one_day_ago = now - timedelta(days=1)
    two_days_ago = now - timedelta(days=2)

    # One row per (driver, timestamp); trips grow over time so the newest
    # value per driver (30 / 300) is unambiguous in the assertions below.
    rows = []
    for driver_id, base_trips in ((1001, 10), (1002, 100)):
        for step, ts in enumerate([two_days_ago, one_day_ago, now], start=1):
            rows.append(
                {
                    "driver_id": driver_id,
                    "avg_daily_trips": base_trips * step,
                    "event_timestamp": ts,
                    "created": ts,
                }
            )
    driver_stats_df = pd.DataFrame(data=rows)

    # Requested retrieval window comfortably encloses every event above.
    start_date = now - timedelta(days=3)
    end_date = now + timedelta(hours=1)

    driver_stats_data_source = environment.data_source_creator.create_data_source(
        df=driver_stats_df,
        destination_name=f"test_driver_stats_{int(time.time_ns())}_{random.randint(1000, 9999)}",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    )

    driver_entity = Entity(name="driver", join_keys=["driver_id"])
    driver_fv = FeatureView(
        name="driver_stats",
        entities=[driver_entity],
        schema=[Field(name="avg_daily_trips", dtype=Int32)],
        source=driver_stats_data_source,
    )
    store.apply([driver_entity, driver_fv])

    # Retrieve without an entity_df: the store must synthesize one covering
    # the [start_date, end_date] window.
    actual_df = store.get_historical_features(
        entity_df=None,
        features=["driver_stats:avg_daily_trips"],
        full_feature_names=False,
        start_date=start_date,
        end_date=end_date,
    ).to_df()

    assert not actual_df.empty, "Result should not be empty"
    assert "avg_daily_trips" in actual_df.columns

    actual_driver_ids = set(actual_df["driver_id"].tolist())
    assert 1001 in actual_driver_ids, "driver 1001 should be in results"
    assert 1002 in actual_driver_ids, "driver 1002 should be in results"

    # Verify timestamps fall within the requested range.
    # Strip tz info to avoid tz-naive vs tz-aware comparison issues.
    ts_start = pd.Timestamp(start_date).tz_localize(None)
    ts_end = pd.Timestamp(end_date).tz_localize(None)
    for ts in actual_df["event_timestamp"]:
        ts_val = pd.Timestamp(ts).tz_localize(None)
        assert ts_val >= ts_start, f"Timestamp {ts_val} before start_date"
        assert ts_val <= ts_end, f"Timestamp {ts_val} after end_date"

    # The latest features must be present -- this is the critical regression check.
    # With the old bug (using start_date instead of end_date), the synthetic entity_df
    # had wrong max_event_timestamp causing the latest rows to be missed.
    actual_trips = set(actual_df["avg_daily_trips"].tolist())
    assert 30 in actual_trips, "Latest trip value 30 for driver 1001 should be present"
    assert 300 in actual_trips, (
        "Latest trip value 300 for driver 1002 should be present"
    )
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,71 @@ def test_non_entity_mode_with_both_dates(self):
assert "start_date" not in str(e)
assert "end_date" not in str(e)

def test_non_entity_entity_df_uses_end_date(self):
"""Test that the synthetic entity_df uses end_date, not start_date.

Regression test: the old code used pd.date_range(start=start_date, ...)[:1]
which put start_date in the entity_df. Since PIT joins use
MAX(entity_timestamp) as the upper bound, start_date made end_date
unreachable. The fix uses [end_date] directly.
"""
test_repo_config = RepoConfig(
project="test_project",
registry="test_registry",
provider="local",
offline_store=_mock_offline_store_config(),
)

feature_view = _mock_feature_view("test_fv", ttl=timedelta(days=1))
start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
end_date = datetime(2023, 1, 7, tzinfo=timezone.utc)

mock_get_entity_schema = MagicMock(
return_value={"event_timestamp": "timestamp"}
)

with (
patch.multiple(
"feast.infra.offline_stores.contrib.postgres_offline_store.postgres",
_get_conn=MagicMock(),
_upload_entity_df=MagicMock(),
_get_entity_schema=mock_get_entity_schema,
_get_entity_df_event_timestamp_range=MagicMock(
return_value=(start_date, end_date)
),
),
patch(
"feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_expected_join_keys",
return_value=[],
),
patch(
"feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df",
),
patch(
"feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context",
return_value=[],
),
):
PostgreSQLOfflineStore.get_historical_features(
config=test_repo_config,
feature_views=[feature_view],
feature_refs=["test_fv:feature1"],
entity_df=None,
registry=MagicMock(),
project="test_project",
start_date=start_date,
end_date=end_date,
)

# _get_entity_schema is called with the synthetic entity_df
df = mock_get_entity_schema.call_args[0][0]
assert len(df) == 1
ts = df["event_timestamp"].iloc[0]
# The entity_df must use end_date, not start_date
assert ts == end_date, (
f"entity_df timestamp should be end_date ({end_date}), got {ts}"
)

def test_non_entity_mode_with_end_date_only(self):
"""Test non-entity retrieval calculates start_date from TTL"""
test_repo_config = RepoConfig(
Expand Down
Loading