diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py
index b557fddc687..d32740adf32 100644
--- a/sdk/python/feast/infra/online_stores/redis.py
+++ b/sdk/python/feast/infra/online_stores/redis.py
@@ -68,6 +68,9 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
     """Connection string containing the host, port, and configuration parameters for Redis
      format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """
 
+    key_ttl_seconds: Optional[int] = None
+    """(Optional) redis key bin ttl (in seconds) for expiring entities"""
+
 
 class RedisOnlineStore(OnlineStore):
     _client: Optional[Union[Redis, RedisCluster]] = None
@@ -227,9 +230,14 @@ def online_write_batch(
                     entity_hset[f_key] = val.SerializeToString()
 
                 pipe.hset(redis_key_bin, mapping=entity_hset)
-                # TODO: support expiring the entity / features in Redis
-                # otherwise entity features remain in redis until cleaned up in separate process
-                # client.expire redis_key_bin based a ttl setting
+
+                if online_store_config.key_ttl_seconds:
+                    # NOTE(review): EXPIRE is queued on the same pipeline as HSET, so
+                    # `results` presumably holds two replies per entity when a ttl is
+                    # set; confirm `progress(len(results))` below is still correct.
+                    pipe.expire(
+                        name=redis_key_bin, time=online_store_config.key_ttl_seconds
+                    )
             results = pipe.execute()
             if progress:
                 progress(len(results))
diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py
index 7d6296baa51..70fc7d2866f 100644
--- a/sdk/python/tests/integration/online_store/test_universal_online.py
+++ b/sdk/python/tests/integration/online_store/test_universal_online.py
@@ -35,6 +35,69 @@
 from tests.utils.data_source_utils import prep_file_source
 
 
+@pytest.mark.integration
+def test_entity_ttl_online_store(local_redis_environment, universal_data_sources):
+    """Entity features written to Redis must expire after `key_ttl_seconds`."""
+    if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
+        return
+    fs = local_redis_environment.feature_store
+    # expire every entity key one second after it is written
+    ttl_seconds = 1
+    fs.config.online_store.key_ttl_seconds = ttl_seconds
+    _, _, data_sources = universal_data_sources
+    driver_hourly_stats = create_driver_hourly_stats_feature_view(
+        data_sources["driver"]
+    )
+    driver_entity = driver()
+
+    # Register Feature View and Entity
+    fs.apply([driver_hourly_stats, driver_entity])
+
+    # fake data to ingest into Online Store
+    data = {
+        "driver_id": [1],
+        "conv_rate": [0.5],
+        "acc_rate": [0.6],
+        "avg_daily_trips": [4],
+        "event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
+        "created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
+    }
+    df_ingest = pd.DataFrame(data)
+
+    # directly ingest data into the Online Store
+    fs.write_to_online_store("driver_stats", df_ingest)
+
+    # assert the right data is in the Online Store
+    df = fs.get_online_features(
+        features=[
+            "driver_stats:avg_daily_trips",
+            "driver_stats:acc_rate",
+            "driver_stats:conv_rate",
+        ],
+        entity_rows=[{"driver": 1}],
+    ).to_df()
+    assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_equal_to(4)
+    assertpy.assert_that(df["acc_rate"].iloc[0]).is_close_to(0.6, 1e-6)
+    assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.5, 1e-6)
+
+    # wait past the ttl (with a margin) so the read is not on the expiry boundary
+    time.sleep(ttl_seconds + 0.5)
+
+    # retrieve the same entity again
+    df = fs.get_online_features(
+        features=[
+            "driver_stats:avg_daily_trips",
+            "driver_stats:acc_rate",
+            "driver_stats:conv_rate",
+        ],
+        entity_rows=[{"driver": 1}],
+    ).to_df()
+    # assert that the entity features expired in the online store
+    assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_none()
+    assertpy.assert_that(df["acc_rate"].iloc[0]).is_none()
+    assertpy.assert_that(df["conv_rate"].iloc[0]).is_none()
+
+
 # TODO: make this work with all universal (all online store types)
 @pytest.mark.integration
 def test_write_to_online_store_event_check(local_redis_environment):