From 434793dcc9c1a947243bd2176cff2e18ed18db0b Mon Sep 17 00:00:00 2001 From: Pamela Toman Date: Mon, 7 Mar 2022 21:25:29 -0800 Subject: [PATCH] Closes threadpool resources upon datastore online_write_batch completion Signed-off-by: Pamela Toman --- sdk/python/feast/infra/online_stores/datastore.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index a29a8393e2e..e7621ab88f8 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -159,11 +159,13 @@ def online_write_batch( write_batch_size = online_config.write_batch_size feast_project = config.project - pool = ThreadPool(processes=write_concurrency) - pool.map( - lambda b: self._write_minibatch(client, feast_project, table, b, progress), - self._to_minibatches(data, batch_size=write_batch_size), - ) + with ThreadPool(processes=write_concurrency) as pool: + pool.map( + lambda b: self._write_minibatch( + client, feast_project, table, b, progress + ), + self._to_minibatches(data, batch_size=write_batch_size), + ) @staticmethod def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]: