From e826a01248a840f05182c97536f85e089ec584cf Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Fri, 14 Jun 2024 15:28:10 +0200 Subject: [PATCH 01/26] Add `queue.Queue.__iter__()` --- Lib/queue.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Lib/queue.py b/Lib/queue.py index 25beb46e30d6bd..a52c3984f404f0 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -275,6 +275,13 @@ def _put(self, item): def _get(self): return self.queue.popleft() + def __iter__(self): + while True: + try: + yield self.get_nowait() + except (Empty, ShutDown): + return + __class_getitem__ = classmethod(types.GenericAlias) From c8cbe4d0b1a48d31bfaa2dba3f3874c679a02dbe Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Fri, 14 Jun 2024 15:34:19 +0200 Subject: [PATCH 02/26] Add `asyncio.Queue.__iter__()` --- Lib/asyncio/queues.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 2f3865114a84f9..bc1bc114252656 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -76,6 +76,13 @@ def _wakeup_next(self, waiters): waiter.set_result(None) break + def __iter__(self): + while True: + try: + yield self.get_nowait() + except (QueueEmpty, QueueShutDown): + return + def __repr__(self): return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' From 940e3c0ee41835d62771bfaf00dfc3168ea1206c Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Fri, 14 Jun 2024 13:37:47 +0000 Subject: [PATCH 03/26] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst b/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst new file mode 100644 index 00000000000000..bda2559146f42a --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst @@ -0,0 +1 @@ +Make :class:`queue.Queue` and `asyncio.Queue` iterables. From 8fca2888da1ae26bf442723b922617be76b2f1e2 Mon Sep 17 00:00:00 2001 From: Nice Zombies Date: Fri, 14 Jun 2024 17:59:19 +0200 Subject: [PATCH 04/26] Fix lint --- .../2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst b/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst index bda2559146f42a..779814d180fbe4 100644 --- a/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst +++ b/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst @@ -1 +1 @@ -Make :class:`queue.Queue` and `asyncio.Queue` iterables. +Make :class:`queue.Queue` and :class:`asyncio.Queue` iterables. From 499457c1993c69f6cf7ff33ac94dcf082fc6b039 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Fri, 14 Jun 2024 19:36:33 +0200 Subject: [PATCH 05/26] Wait --- Lib/asyncio/queues.py | 7 ------- Lib/queue.py | 4 ++-- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index bc1bc114252656..2f3865114a84f9 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -76,13 +76,6 @@ def _wakeup_next(self, waiters): waiter.set_result(None) break - def __iter__(self): - while True: - try: - yield self.get_nowait() - except (QueueEmpty, QueueShutDown): - return - def __repr__(self): return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' diff --git a/Lib/queue.py b/Lib/queue.py index a52c3984f404f0..a01ccea6cb3b8c 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -278,8 +278,8 @@ def _get(self): def __iter__(self): while True: try: - yield self.get_nowait() - except (Empty, ShutDown): + yield self.get() + except ShutDown: return __class_getitem__ = classmethod(types.GenericAlias) From 7f206d98a7928a4531a84082d5874e8b6296d0f6 Mon Sep 17 00:00:00 2001 From: Nice Zombies Date: Fri, 14 Jun 2024 19:37:02 +0200 Subject: [PATCH 06/26] Update 2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst --- .../2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst b/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst index 779814d180fbe4..112317b8ac4b91 100644 --- a/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst +++ b/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst @@ -1 +1 @@ -Make :class:`queue.Queue` and :class:`asyncio.Queue` iterables. +Make :class:`queue.Queue` an iterable. From 3cad1810cbe41fd88c1e8ce5244d152b5156e028 Mon Sep 17 00:00:00 2001 From: Nice Zombies Date: Tue, 18 Jun 2024 07:49:15 +0200 Subject: [PATCH 07/26] Move while loop inside try/except block --- Lib/queue.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index a01ccea6cb3b8c..e392bc66e07783 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -276,11 +276,11 @@ def _get(self): return self.queue.popleft() def __iter__(self): - while True: - try: + try: + while True: yield self.get() - except ShutDown: - return + except ShutDown: + return __class_getitem__ = classmethod(types.GenericAlias) From cc9f7ef91dd4d89165ba8410a6ad144e222915b2 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 08:24:51 +0200 Subject: [PATCH 08/26] test iteration --- Lib/test/test_queue.py | 42 +++++++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index d5927fbf39142b..511861ea5dfd3e 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -152,21 +152,34 @@ def basic_queue_test(self, q): self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) - def worker(self, q): - while True: - x = q.get() - if x < 0: - q.task_done() - return + def worker(self, q, task_done): + for x in q: with self.cumlock: self.cum += x - q.task_done() + if task_done: + q.task_done() + + def queue_iter_test(self, _): + q = self.type2test() + self.cum = 0 + threads = [] + for i in range(2): + thread = threading.Thread(target=self.worker, args=(q, False)) + thread.start() + threads.append(thread) + for i in range(100): + q.put(i) + q.shutdown() + for thread in threads: + thread.join() + self.assertEqual(self.cum, sum(range(100))) - def queue_join_test(self, q): + def queue_join_test(self, _): + q = self.type2test() self.cum = 0 threads = [] - for i in (0,1): - thread = threading.Thread(target=self.worker, args=(q,)) + for i in range(2): + thread = threading.Thread(target=self.worker, args=(q, True)) thread.start() threads.append(thread) for i in range(100): @@ -174,9 +187,12 @@ def queue_join_test(self, q): q.join() self.assertEqual(self.cum, sum(range(100)), "q.join() did not block until all tasks were done") - for i in (0,1): - q.put(-1) # instruct the threads to close - q.join() # verify that you can join twice + for i in range(100, 200): + q.put(i) + q.shutdown() + q.join() # verify that you can join twice + self.assertEqual(self.cum, sum(range(200)), + "q.join() did not block until all tasks were done") for thread in threads: thread.join() From 92d7dc025519c526caa92c4bf8a851db5ffcd884 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 08:26:06 +0200 Subject: [PATCH 09/26] Reduce diff --- Lib/test/test_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 511861ea5dfd3e..0d9913ced5e30b 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -163,7 +163,7 @@ def queue_iter_test(self, _): q = self.type2test() self.cum = 0 threads = [] - for i in range(2): + for i in (0,1): thread = threading.Thread(target=self.worker, args=(q, False)) thread.start() threads.append(thread) @@ -178,7 +178,7 @@ def queue_join_test(self, _): q = self.type2test() self.cum = 0 threads = [] - for i in range(2): + for i in (0,1): thread = threading.Thread(target=self.worker, args=(q, True)) thread.start() threads.append(thread) From a92e0318f86d017b831aabf58def4b1feb3fd861 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 08:36:04 +0200 Subject: [PATCH 10/26] Run test --- Lib/test/test_queue.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 0d9913ced5e30b..b23c3c12d5a19d 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -159,7 +159,7 @@ def worker(self, q, task_done): if task_done: q.task_done() - def queue_iter_test(self, _): + def test_iter(self): q = self.type2test() self.cum = 0 threads = [] @@ -174,8 +174,7 @@ def queue_iter_test(self, _): thread.join() self.assertEqual(self.cum, sum(range(100))) - def queue_join_test(self, _): - q = self.type2test() + def queue_join_test(self, q): self.cum = 0 threads = [] for i in (0,1): @@ -209,15 +208,15 @@ def test_queue_task_done(self): def test_queue_join(self): # Test that a queue join()s successfully, and before anything else # (done twice for insurance). - q = self.type2test() - self.queue_join_test(q) - self.queue_join_test(q) - try: - q.task_done() - except ValueError: - pass - else: - self.fail("Did not detect task count going negative") + for _ in range(2): + q = self.type2test() + self.queue_join_test(q) + try: + q.task_done() + except ValueError: + pass + else: + self.fail("Did not detect task count going negative") def test_basic(self): # Do it a couple of times on the same queue. From 3621cc169d06928f11870944d72089dceb533638 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 08:38:36 +0200 Subject: [PATCH 11/26] Reduce diff --- Lib/test/test_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index b23c3c12d5a19d..6ddd0278afcfc0 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -189,7 +189,7 @@ def queue_join_test(self, q): for i in range(100, 200): q.put(i) q.shutdown() - q.join() # verify that you can join twice + q.join() # verify that you can join twice self.assertEqual(self.cum, sum(range(200)), "q.join() did not block until all tasks were done") for thread in threads: From 98252c6e925d85fc988cd19b51621d3d478b0970 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 09:18:03 +0200 Subject: [PATCH 12/26] Adapt example from asyncio --- Doc/library/queue.rst | 71 +++++++++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index fbbebcf4ed8f92..b9871e2bc91cd6 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -132,6 +132,51 @@ provide the public methods described below. will not block. Similarly, if full() returns ``False`` it doesn't guarantee that a subsequent call to put() will not block. +.. method:: Queue.__iter__() + + Return an :term:`iterator` which iterates over the items in this queue + until :meth:`Queue.shutdown` is called and the queue is empty. + + .. versionadded:: 3.14 + + +Example of how to wait for enqueued tasks to be completed:: + + import concurrent.futures, queue, random, time + + def worker(name, queue): + # Get a "work item" out of the queue. + for sleep_for in queue: + # Sleep for the "sleep_for" seconds. + time.sleep(sleep_for) + + print(f'{name} has slept for {sleep_for:.2f} seconds') + + # Create a queue that we will use to store our "workload". + q = queue.Queue() + + # Generate random timings and put them into the queue. + total_sleep_time = 0 + for _ in range(20): + sleep_for = random.uniform(0.05, 1.0) + total_sleep_time += sleep_for + q.put_nowait(sleep_for) + + # All tasks have been queued + q.shutdown() + + # Create three worker tasks to process the queue concurrently. + started_at = time.monotonic() + with concurrent.futures.ThreadPoolExecutor() as tp: + for i in range(3): + tp.submit(worker, f'worker-{i}', q) + + total_slept_for = time.monotonic() - started_at + + print('====') + print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') + print(f'total expected sleep time: {total_sleep_time:.2f} seconds') + .. method:: Queue.put(item, block=True, timeout=None) @@ -204,32 +249,6 @@ fully processed by daemon consumer threads. count of unfinished tasks drops to zero, :meth:`join` unblocks. -Example of how to wait for enqueued tasks to be completed:: - - import threading - import queue - - q = queue.Queue() - - def worker(): - while True: - item = q.get() - print(f'Working on {item}') - print(f'Finished {item}') - q.task_done() - - # Turn-on the worker thread. - threading.Thread(target=worker, daemon=True).start() - - # Send thirty task requests to the worker. - for item in range(30): - q.put(item) - - # Block until all tasks are done. - q.join() - print('All work completed') - - Terminating queues ^^^^^^^^^^^^^^^^^^ From 4b1ff4ba47578fdb68e91b6a05325a00ddbd78c3 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 09:23:18 +0200 Subject: [PATCH 13/26] Simpler example --- Doc/library/queue.rst | 39 ++++++++++++++------------------------- 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index b9871e2bc91cd6..667696fb48ab8a 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -142,40 +142,29 @@ provide the public methods described below. Example of how to wait for enqueued tasks to be completed:: - import concurrent.futures, queue, random, time + import concurrent.futures + import queue - def worker(name, queue): - # Get a "work item" out of the queue. - for sleep_for in queue: - # Sleep for the "sleep_for" seconds. - time.sleep(sleep_for) - - print(f'{name} has slept for {sleep_for:.2f} seconds') - - # Create a queue that we will use to store our "workload". q = queue.Queue() - # Generate random timings and put them into the queue. - total_sleep_time = 0 - for _ in range(20): - sleep_for = random.uniform(0.05, 1.0) - total_sleep_time += sleep_for - q.put_nowait(sleep_for) + # Queue thirty tasks. + for item in range(30): + q.put(item) # All tasks have been queued q.shutdown() - # Create three worker tasks to process the queue concurrently. - started_at = time.monotonic() - with concurrent.futures.ThreadPoolExecutor() as tp: - for i in range(3): - tp.submit(worker, f'worker-{i}', q) + def worker(): + for item in q: + print(f'Working on {item}') + print(f'Finished {item}') - total_slept_for = time.monotonic() - started_at + # Create 2 worker threads. + with concurrent.futures.ThreadPoolExecutor() as tp: + tp.submit(worker) + tp.submit(worker) - print('====') - print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') - print(f'total expected sleep time: {total_sleep_time:.2f} seconds') + print('All work completed') .. method:: Queue.put(item, block=True, timeout=None) From 5e9f48b76d5d652447761601833b5ad61ed61760 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 09:40:32 +0200 Subject: [PATCH 14/26] Put definition first --- Doc/library/queue.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index 667696fb48ab8a..2a1aa44d43c39f 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -145,6 +145,11 @@ Example of how to wait for enqueued tasks to be completed:: import concurrent.futures import queue + def worker(): + for item in q: + print(f'Working on {item}') + print(f'Finished {item}') + q = queue.Queue() # Queue thirty tasks. @@ -154,11 +159,6 @@ Example of how to wait for enqueued tasks to be completed:: # All tasks have been queued q.shutdown() - def worker(): - for item in q: - print(f'Working on {item}') - print(f'Finished {item}') - # Create 2 worker threads. with concurrent.futures.ThreadPoolExecutor() as tp: tp.submit(worker) From 4b382ec300702fe85a60f49ab5e78e98df8bd360 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 09:47:40 +0200 Subject: [PATCH 15/26] Print worker name --- Doc/library/queue.rst | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index 2a1aa44d43c39f..3f5b61ac838280 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -144,11 +144,12 @@ Example of how to wait for enqueued tasks to be completed:: import concurrent.futures import queue + import time - def worker(): + def worker(name, q): for item in q: - print(f'Working on {item}') - print(f'Finished {item}') + time.sleep(.01) + print(f'{name} finished {item}') q = queue.Queue() @@ -159,10 +160,10 @@ Example of how to wait for enqueued tasks to be completed:: # All tasks have been queued q.shutdown() - # Create 2 worker threads. + # Create 3 worker threads. with concurrent.futures.ThreadPoolExecutor() as tp: - tp.submit(worker) - tp.submit(worker) + for i in range(3): + tp.submit(worker, f'worker-{i}', q) print('All work completed') From 076a7a617f5c62d558f364453003a95e51c05bf5 Mon Sep 17 00:00:00 2001 From: Nice Zombies Date: Tue, 18 Jun 2024 19:44:14 +0200 Subject: [PATCH 16/26] Restore example --- Doc/library/queue.rst | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index 3f5b61ac838280..39a1ced798c5b9 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -239,6 +239,31 @@ fully processed by daemon consumer threads. count of unfinished tasks drops to zero, :meth:`join` unblocks. +Example of how to wait for enqueued tasks to be completed:: + + import threading + import queue + + q = queue.Queue() + + def worker(): + for item in q: + print(f'Working on {item}') + print(f'Finished {item}') + q.task_done() + + # Turn-on the worker thread. + threading.Thread(target=worker, daemon=True).start() + + # Send thirty task requests to the worker. + for item in range(30): + q.put(item) + + # Block until all tasks are done. + q.join() + print('All work completed') + + Terminating queues ^^^^^^^^^^^^^^^^^^ From 6132f5a854eb3caa771f6fc66aa1d0f9262fd59b Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 19:56:27 +0200 Subject: [PATCH 17/26] Indent example --- Doc/library/queue.rst | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index 39a1ced798c5b9..3c35f1a1c19a5c 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -137,35 +137,29 @@ provide the public methods described below. Return an :term:`iterator` which iterates over the items in this queue until :meth:`Queue.shutdown` is called and the queue is empty. - .. versionadded:: 3.14 - - -Example of how to wait for enqueued tasks to be completed:: - - import concurrent.futures - import queue - import time + Example:: - def worker(name, q): - for item in q: - time.sleep(.01) - print(f'{name} finished {item}') + import concurrent.futures + import queue + import time - q = queue.Queue() + def worker(name, q): + for item in q: + time.sleep(.01) + print(f'{name} finished {item}') - # Queue thirty tasks. - for item in range(30): - q.put(item) + q = queue.Queue() + for item in range(30): + q.put(item) - # All tasks have been queued - q.shutdown() + q.shutdown() + with concurrent.futures.ThreadPoolExecutor() as tp: + for i in range(3): + tp.submit(worker, f'worker-{i}', q) - # Create 3 worker threads. - with concurrent.futures.ThreadPoolExecutor() as tp: - for i in range(3): - tp.submit(worker, f'worker-{i}', q) + print('All work completed') - print('All work completed') + .. versionadded:: 3.14 .. method:: Queue.put(item, block=True, timeout=None) From 9a28bc508f5afb28946c40b7581ff3c54afc9102 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 20:46:47 +0200 Subject: [PATCH 18/26] Cleanup tests --- Lib/test/test_queue.py | 69 +++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 6ddd0278afcfc0..81ece265c46338 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -1,5 +1,6 @@ # Some simple queue module tests, plus some failure conditions # to ensure the Queue locks remain stable. +import concurrent.futures import itertools import random import sys @@ -151,49 +152,49 @@ def basic_queue_test(self, q): self.do_blocking_test(q.get, (), q.put, ('empty',)) self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) - - def worker(self, q, task_done): - for x in q: - with self.cumlock: - self.cum += x - if task_done: - q.task_done() - def test_iter(self): q = self.type2test() - self.cum = 0 - threads = [] - for i in (0,1): - thread = threading.Thread(target=self.worker, args=(q, False)) - thread.start() - threads.append(thread) for i in range(100): q.put(i) + q.shutdown() - for thread in threads: - thread.join() + self.cum = 0 + def worker(): + for x in q: + with self.cumlock: + self.cum += x + + with concurrent.futures.ThreadPoolExecutor as tp: + tp.submit(worker) + tp.submit(worker) + self.assertEqual(self.cum, sum(range(100))) def queue_join_test(self, q): self.cum = 0 - threads = [] - for i in (0,1): - thread = threading.Thread(target=self.worker, args=(q, True)) - thread.start() - threads.append(thread) - for i in range(100): - q.put(i) - q.join() - self.assertEqual(self.cum, sum(range(100)), - "q.join() did not block until all tasks were done") - for i in range(100, 200): - q.put(i) - q.shutdown() - q.join() # verify that you can join twice - self.assertEqual(self.cum, sum(range(200)), - "q.join() did not block until all tasks were done") - for thread in threads: - thread.join() + def worker(): + for x in q: + with self.cumlock: + self.cum += x + + q.task_done() + + with concurrent.futures.ThreadPoolExecutor as tp: + tp.submit(worker) + tp.submit(worker) + for i in range(100): + q.put(i) + + q.join() + self.assertEqual(self.cum, sum(range(100)), + "q.join() didn't block until all tasks were done") + for i in range(100, 200): + q.put(i) + + q.join() # verify that you can join twice + self.assertEqual(self.cum, sum(range(200)), + "q.join() didn't block until all tasks were done") + q.shutdown() def test_queue_task_done(self): # Test to make sure a queue task completed successfully. From 0a69fd9752bc02aa11ac6a65b6cd8a22cd486b1c Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 20:48:41 +0200 Subject: [PATCH 19/26] Instantiate --- Lib/test/test_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 81ece265c46338..0a0e05f9d18017 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -164,7 +164,7 @@ def worker(): with self.cumlock: self.cum += x - with concurrent.futures.ThreadPoolExecutor as tp: + with concurrent.futures.ThreadPoolExecutor() as tp: tp.submit(worker) tp.submit(worker) @@ -179,7 +179,7 @@ def worker(): q.task_done() - with concurrent.futures.ThreadPoolExecutor as tp: + with concurrent.futures.ThreadPoolExecutor() as tp: tp.submit(worker) tp.submit(worker) for i in range(100): From 4407eedd34f008d08b64377594a24f1d871422bc Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 21:00:23 +0200 Subject: [PATCH 20/26] Move inside with block --- Lib/test/test_queue.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 0a0e05f9d18017..695cc6070062be 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -176,8 +176,7 @@ def worker(): for x in q: with self.cumlock: self.cum += x - - q.task_done() + q.task_done() with concurrent.futures.ThreadPoolExecutor() as tp: tp.submit(worker) From 1e500abcd7aa2e4c2636d67846c4550eba75fd6a Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 21:14:49 +0200 Subject: [PATCH 21/26] Add comment --- Lib/test/test_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 695cc6070062be..be39b47735502a 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -193,6 +193,8 @@ def worker(): q.join() # verify that you can join twice self.assertEqual(self.cum, sum(range(200)), "q.join() didn't block until all tasks were done") + + # instruct the threads to close q.shutdown() def test_queue_task_done(self): From 101409f316e6fd93ccfa139336eaa0a0857b6ef8 Mon Sep 17 00:00:00 2001 From: Nineteendo Date: Tue, 18 Jun 2024 21:19:56 +0200 Subject: [PATCH 22/26] Separate function definition --- Lib/test/test_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index be39b47735502a..bbe3c934d0c317 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -159,6 +159,7 @@ def test_iter(self): q.shutdown() self.cum = 0 + def worker(): for x in q: with self.cumlock: @@ -172,6 +173,7 @@ def worker(): def queue_join_test(self, q): self.cum = 0 + def worker(): for x in q: with self.cumlock: From bcdb21ff04df988175b9d4edc53ae22c2a00d4c3 Mon Sep 17 00:00:00 2001 From: Nice Zombies Date: Wed, 19 Jun 2024 12:09:31 +0200 Subject: [PATCH 23/26] Update whatsnew --- Doc/whatsnew/3.14.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index b357553735e8bb..6219dad1ec3c04 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -100,6 +100,12 @@ os by :func:`os.unsetenv`, or made outside Python in the same process. (Contributed by Victor Stinner in :gh:`120057`.) +queue +----- + +Made :class:`queue.Queue` an :term:`iterable`. +(Contributed by Wannes Boeykens in :gh:`120503`.) + symtable -------- From 0743257f2dba19d6b163124361da3ff9f0a1ac52 Mon Sep 17 00:00:00 2001 From: Nice Zombies Date: Wed, 19 Jun 2024 12:10:13 +0200 Subject: [PATCH 24/26] Update blurb --- .../2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst b/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst index 112317b8ac4b91..e7cf8f084e6073 100644 --- a/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst +++ b/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst @@ -1 +1,2 @@ -Make :class:`queue.Queue` an iterable. +Made :class:`queue.Queue` an :term:`iterable`. +(Contributed by Wannes Boeykens in :gh:`120503`.) From 5e37cd5ea45504c870e192e77312bf3b17c446d9 Mon Sep 17 00:00:00 2001 From: Nice Zombies Date: Wed, 19 Jun 2024 12:34:30 +0200 Subject: [PATCH 25/26] Rename 2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst to 2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst --- .../2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename Misc/NEWS.d/next/{Core and Builtins => Library}/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst (100%) diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst b/Misc/NEWS.d/next/Library/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst similarity index 100% rename from Misc/NEWS.d/next/Core and Builtins/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst rename to Misc/NEWS.d/next/Library/2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst From b23a1b326791edd0b4310867c12bce008e7b2656 Mon Sep 17 00:00:00 2001 From: Nice Zombies Date: Fri, 21 Jun 2024 09:06:56 +0200 Subject: [PATCH 26/26] Clarify documentation --- Doc/library/queue.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index 3c35f1a1c19a5c..5f17124c6a61cf 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -134,8 +134,9 @@ provide the public methods described below. .. method:: Queue.__iter__() - Return an :term:`iterator` which iterates over the items in this queue - until :meth:`Queue.shutdown` is called and the queue is empty. + Return an :term:`iterator` which iterates over the queue of items until + :meth:`shutdown` is called and continues iteration until the queue is + empty. Example::