diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 7c65b649046..57db2c471d7 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -159,13 +159,7 @@ def get_historical_features( else: start_date = make_tzaware(start_date) - entity_df = pd.DataFrame( - { - "event_timestamp": pd.date_range( - start=start_date, end=end_date, freq="1s", tz=timezone.utc - )[:1] # Just one row - } - ) + entity_df = pd.DataFrame({"event_timestamp": [end_date]}) entity_schema = _get_entity_schema(entity_df, config) @@ -173,11 +167,18 @@ def get_historical_features( offline_utils.infer_event_timestamp_from_entity_df(entity_schema) ) - entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( - entity_df, - entity_df_event_timestamp_col, - config, - ) + # In non-entity mode, use the actual requested range so that + # min_event_timestamp (= range[0] - TTL) doesn't clip the window. + # The synthetic entity_df only has end_date, which would wrongly + # set min_event_timestamp to end_date - TTL instead of start_date - TTL. 
+ if start_date is not None and end_date is not None: + entity_df_event_timestamp_range = (start_date, end_date) + else: + entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( + entity_df, + entity_df_event_timestamp_col, + config, + ) @contextlib.contextmanager def query_generator() -> Iterator[str]: diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 757e0d72a6d..a2c7381ddc6 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -728,3 +728,115 @@ def test_historical_features_field_mapping( actual_df, sort_by=["driver_id"], ) + + +@pytest.mark.integration +@pytest.mark.universal_offline_stores(only=["file"]) +def test_historical_features_non_entity_retrieval(environment): + """Test get_historical_features with entity_df=None using start_date/end_date. + + This exercises the non-entity retrieval path where a synthetic entity_df is + generated internally. Regression test for the bug where start_date was used + instead of end_date for max_event_timestamp in the synthetic entity_df. 
+ """ + store = environment.feature_store + + now = datetime.now().replace(microsecond=0, second=0, minute=0) + two_days_ago = now - timedelta(days=2) + one_day_ago = now - timedelta(days=1) + + driver_stats_df = pd.DataFrame( + data=[ + { + "driver_id": 1001, + "avg_daily_trips": 10, + "event_timestamp": two_days_ago, + "created": two_days_ago, + }, + { + "driver_id": 1001, + "avg_daily_trips": 20, + "event_timestamp": one_day_ago, + "created": one_day_ago, + }, + { + "driver_id": 1001, + "avg_daily_trips": 30, + "event_timestamp": now, + "created": now, + }, + { + "driver_id": 1002, + "avg_daily_trips": 100, + "event_timestamp": two_days_ago, + "created": two_days_ago, + }, + { + "driver_id": 1002, + "avg_daily_trips": 200, + "event_timestamp": one_day_ago, + "created": one_day_ago, + }, + { + "driver_id": 1002, + "avg_daily_trips": 300, + "event_timestamp": now, + "created": now, + }, + ] + ) + + start_date = now - timedelta(days=3) + end_date = now + timedelta(hours=1) + + driver_stats_data_source = environment.data_source_creator.create_data_source( + df=driver_stats_df, + destination_name=f"test_driver_stats_{int(time.time_ns())}_{random.randint(1000, 9999)}", + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + + driver_entity = Entity(name="driver", join_keys=["driver_id"]) + driver_fv = FeatureView( + name="driver_stats", + entities=[driver_entity], + schema=[Field(name="avg_daily_trips", dtype=Int32)], + source=driver_stats_data_source, + ) + + store.apply([driver_entity, driver_fv]) + + offline_job = store.get_historical_features( + entity_df=None, + features=["driver_stats:avg_daily_trips"], + full_feature_names=False, + start_date=start_date, + end_date=end_date, + ) + + actual_df = offline_job.to_df() + + assert not actual_df.empty, "Result should not be empty" + assert "avg_daily_trips" in actual_df.columns + + actual_driver_ids = set(actual_df["driver_id"].tolist()) + assert 1001 in actual_driver_ids, "driver 1001 should 
be in results" + assert 1002 in actual_driver_ids, "driver 1002 should be in results" + + # Verify timestamps fall within the requested range. + # Strip tz info to avoid tz-naive vs tz-aware comparison issues. + ts_start = pd.Timestamp(start_date).tz_localize(None) + ts_end = pd.Timestamp(end_date).tz_localize(None) + for ts in actual_df["event_timestamp"]: + ts_val = pd.Timestamp(ts).tz_localize(None) + assert ts_val >= ts_start, f"Timestamp {ts_val} before start_date" + assert ts_val <= ts_end, f"Timestamp {ts_val} after end_date" + + # The latest features must be present -- this is the critical regression check. + # With the old bug (using start_date instead of end_date), the synthetic entity_df + # had wrong max_event_timestamp causing the latest rows to be missed. + actual_trips = set(actual_df["avg_daily_trips"].tolist()) + assert 30 in actual_trips, "Latest trip value 30 for driver 1001 should be present" + assert 300 in actual_trips, ( + "Latest trip value 300 for driver 1002 should be present" + ) diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py index fad837e4c16..0cb0d98eeae 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/postgres_offline_store/test_postgres.py @@ -615,6 +615,71 @@ def test_non_entity_mode_with_both_dates(self): assert "start_date" not in str(e) assert "end_date" not in str(e) + def test_non_entity_entity_df_uses_end_date(self): + """Test that the synthetic entity_df uses end_date, not start_date. + + Regression test: the old code used pd.date_range(start=start_date, ...)[:1] + which put start_date in the entity_df. Since PIT joins use + MAX(entity_timestamp) as the upper bound, start_date made end_date + unreachable. The fix uses [end_date] directly. 
+ """ + test_repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_offline_store_config(), + ) + + feature_view = _mock_feature_view("test_fv", ttl=timedelta(days=1)) + start_date = datetime(2023, 1, 1, tzinfo=timezone.utc) + end_date = datetime(2023, 1, 7, tzinfo=timezone.utc) + + mock_get_entity_schema = MagicMock( + return_value={"event_timestamp": "timestamp"} + ) + + with ( + patch.multiple( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres", + _get_conn=MagicMock(), + _upload_entity_df=MagicMock(), + _get_entity_schema=mock_get_entity_schema, + _get_entity_df_event_timestamp_range=MagicMock( + return_value=(start_date, end_date) + ), + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_expected_join_keys", + return_value=[], + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.assert_expected_columns_in_entity_df", + ), + patch( + "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.offline_utils.get_feature_view_query_context", + return_value=[], + ), + ): + PostgreSQLOfflineStore.get_historical_features( + config=test_repo_config, + feature_views=[feature_view], + feature_refs=["test_fv:feature1"], + entity_df=None, + registry=MagicMock(), + project="test_project", + start_date=start_date, + end_date=end_date, + ) + + # _get_entity_schema is called with the synthetic entity_df + df = mock_get_entity_schema.call_args[0][0] + assert len(df) == 1 + ts = df["event_timestamp"].iloc[0] + # The entity_df must use end_date, not start_date + assert ts == end_date, ( + f"entity_df timestamp should be end_date ({end_date}), got {ts}" + ) + def test_non_entity_mode_with_end_date_only(self): """Test non-entity retrieval calculates start_date from TTL""" test_repo_config = RepoConfig(