|
17 | 17 | # along with Pyrogram. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
|
19 | 19 | import asyncio |
20 | | -import base64 |
21 | 20 | import inspect |
22 | | -import json |
23 | 21 | import logging |
24 | 22 | import math |
25 | 23 | import mimetypes |
@@ -325,7 +323,12 @@ async def start(self): |
325 | 323 | await self.session.stop() |
326 | 324 | raise e |
327 | 325 |
|
328 | | - self.updates_worker_task = asyncio.ensure_future(self.updates_worker()) |
| 326 | + for _ in range(Client.UPDATES_WORKERS): |
| 327 | + self.updates_worker_tasks.append( |
| 328 | + asyncio.ensure_future(self.updates_worker()) |
| 329 | + ) |
| 330 | + |
| 331 | + log.info("Started {} UpdatesWorkerTasks".format(Client.UPDATES_WORKERS)) |
329 | 332 |
|
330 | 333 | for _ in range(Client.DOWNLOAD_WORKERS): |
331 | 334 | self.download_worker_tasks.append( |
@@ -367,8 +370,15 @@ async def stop(self): |
367 | 370 |
|
368 | 371 | log.info("Stopped {} DownloadWorkerTasks".format(Client.DOWNLOAD_WORKERS)) |
369 | 372 |
|
370 | | - self.updates_queue.put_nowait(None) |
371 | | - await self.updates_worker_task |
| 373 | + for _ in range(Client.UPDATES_WORKERS): |
| 374 | + self.updates_queue.put_nowait(None) |
| 375 | + |
| 376 | + for task in self.updates_worker_tasks: |
| 377 | + await task |
| 378 | + |
| 379 | + self.updates_worker_tasks.clear() |
| 380 | + |
| 381 | + log.info("Stopped {} UpdatesWorkerTasks".format(Client.UPDATES_WORKERS)) |
372 | 382 |
|
373 | 383 | for media_session in self.media_sessions.values(): |
374 | 384 | await media_session.stop() |
@@ -862,8 +872,6 @@ async def download_worker(self): |
862 | 872 | done.set() |
863 | 873 |
|
864 | 874 | async def updates_worker(self): |
865 | | - log.info("UpdatesWorkerTask started") |
866 | | - |
867 | 875 | while True: |
868 | 876 | updates = await self.updates_queue.get() |
869 | 877 |
|
@@ -946,8 +954,6 @@ async def updates_worker(self): |
946 | 954 | except Exception as e: |
947 | 955 | log.error(e, exc_info=True) |
948 | 956 |
|
949 | | - log.info("UpdatesWorkerTask stopped") |
950 | | - |
951 | 957 | async def send(self, |
952 | 958 | data: TLObject, |
953 | 959 | retries: int = Session.MAX_RETRIES, |
|
0 commit comments