diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index fbbebcf4ed8f92..5f17124c6a61cf 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -132,6 +132,36 @@ provide the public methods described below. will not block. Similarly, if full() returns ``False`` it doesn't guarantee that a subsequent call to put() will not block. +.. method:: Queue.__iter__() + + Return an :term:`iterator` which iterates over the queue of items until + :meth:`shutdown` is called and continues iteration until the queue is + empty. + + Example:: + + import concurrent.futures + import queue + import time + + def worker(name, q): + for item in q: + time.sleep(.01) + print(f'{name} finished {item}') + + q = queue.Queue() + for item in range(30): + q.put(item) + + q.shutdown() + with concurrent.futures.ThreadPoolExecutor() as tp: + for i in range(3): + tp.submit(worker, f'worker-{i}', q) + + print('All work completed') + + .. versionadded:: 3.14 + .. method:: Queue.put(item, block=True, timeout=None) @@ -212,8 +242,7 @@ Example of how to wait for enqueued tasks to be completed:: q = queue.Queue() def worker(): - while True: - item = q.get() + for item in q: print(f'Working on {item}') print(f'Finished {item}') q.task_done() diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 804d39ab64646d..e8cd8b86a92206 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -107,6 +107,12 @@ pathlib another, like :func:`shutil.copyfile`. (Contributed by Barney Gale in :gh:`73991`.) +queue +----- + +Made :class:`queue.Queue` an :term:`iterable`. +(Contributed by Wannes Boeykens in :gh:`120503`.) + symtable -------- diff --git a/Lib/queue.py b/Lib/queue.py index 25beb46e30d6bd..e392bc66e07783 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -275,6 +275,13 @@ def _put(self, item): def _get(self): return self.queue.popleft() + def __iter__(self): + try: + while True: + yield self.get() + except ShutDown: + return + __class_getitem__ = classmethod(types.GenericAlias) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 6dced7df0064d7..6fa8daffb8bc92 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -1,5 +1,6 @@ # Some simple queue module tests, plus some failure conditions # to ensure the Queue locks remain stable. +import concurrent.futures import itertools import random import threading @@ -150,34 +151,52 @@ def basic_queue_test(self, q): self.do_blocking_test(q.get, (), q.put, ('empty',)) self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) + def test_iter(self): + q = self.type2test() + for i in range(100): + q.put(i) - def worker(self, q): - while True: - x = q.get() - if x < 0: - q.task_done() - return - with self.cumlock: - self.cum += x - q.task_done() + q.shutdown() + self.cum = 0 + + def worker(): + for x in q: + with self.cumlock: + self.cum += x + + with concurrent.futures.ThreadPoolExecutor() as tp: + tp.submit(worker) + tp.submit(worker) + + self.assertEqual(self.cum, sum(range(100))) def queue_join_test(self, q): self.cum = 0 - threads = [] - for i in (0,1): - thread = threading.Thread(target=self.worker, args=(q,)) - thread.start() - threads.append(thread) - for i in range(100): - q.put(i) - q.join() - self.assertEqual(self.cum, sum(range(100)), - "q.join() did not block until all tasks were done") - for i in (0,1): - q.put(-1) # instruct the threads to close - q.join() # verify that you can join twice - for thread in threads: - thread.join() + + def worker(): + for x in q: + with self.cumlock: + self.cum += x + q.task_done() + + with concurrent.futures.ThreadPoolExecutor() as tp: + tp.submit(worker) + tp.submit(worker) + for i in range(100): + q.put(i) + + q.join() + self.assertEqual(self.cum, sum(range(100)), + "q.join() didn't block until all tasks were done") + for i in range(100, 200): + q.put(i) + + q.join() # verify that you can join twice + self.assertEqual(self.cum, sum(range(200)), + "q.join() didn't block until all tasks were done") + + # instruct the threads to close + q.shutdown() def test_queue_task_done(self): # Test to make sure a queue task completed successfully. @@ -192,15 +211,15 @@ def test_queue_task_done(self): def test_queue_join(self): # Test that a queue join()s successfully, and before anything else # (done twice for insurance). - q = self.type2test() - self.queue_join_test(q) - self.queue_join_test(q) - try: - q.task_done() - except ValueError: - pass - else: - self.fail("Did not detect task count going negative") + for _ in range(2): + q = self.type2test() + self.queue_join_test(q) + try: + q.task_done() + except ValueError: + pass + else: + self.fail("Did not detect task count going negative") def test_basic(self): # Do it a couple of times on the same queue. diff --git a/Misc/NEWS.d/next/Library/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst b/Misc/NEWS.d/next/Library/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst new file mode 100644 index 00000000000000..e7cf8f084e6073 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst @@ -0,0 +1,2 @@ +Made :class:`queue.Queue` an :term:`iterable`. +(Contributed by Wannes Boeykens in :gh:`120503`.)