From dea21b44c6e61f92267c7a1e431fc7e94dad1a52 Mon Sep 17 00:00:00 2001 From: Kurbanov Bulat Date: Sat, 20 Nov 2021 19:48:20 +0300 Subject: [PATCH] Fix tasks cancelation --- src/app/services/fl_downloader.py | 65 +++++++++++++++---------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/src/app/services/fl_downloader.py b/src/app/services/fl_downloader.py index 6ffa8b2..e0008b1 100644 --- a/src/app/services/fl_downloader.py +++ b/src/app/services/fl_downloader.py @@ -1,5 +1,5 @@ -from typing import Optional - +from asyncio.exceptions import CancelledError +from typing import Optional, cast import asyncio import httpx @@ -82,6 +82,27 @@ class FLDownloader(BaseDownloader): return response.content, False + async def _wait_until_some_done(self, tasks: set[asyncio.Task]) -> Optional[tuple[bytes, bool]]: + tasks_ = tasks + + while tasks_: + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + for task in done: + try: + data = cast(tuple[bytes, bool], task.result()) + + for p_task in pending: + p_task.cancel() + + return data + except (NotSuccess, ReceivedHTML): + continue + + tasks_ = pending + + return None + async def _download_with_converting(self) -> tuple[bytes, bool]: tasks = set() @@ -92,24 +113,13 @@ class FLDownloader(BaseDownloader): ) ) - content: Optional[bytes] = None - is_zip: Optional[bool] = None + data = await self._wait_until_some_done(tasks) - while tasks: - done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - - for task in done: - try: - content, is_zip = task.result() - break - except (NotSuccess, ReceivedHTML): - continue - - tasks = pending - - if content is None or is_zip is None: + if data is None: raise ValueError + content, is_zip = data + if is_zip: content = await asyncio.get_event_loop().run_in_executor( process_pool_executor, unzip, content, 'fb2' @@ -147,25 +157,12 @@ class FLDownloader(BaseDownloader): ) ) - content: Optional[bytes] = None - is_zip: Optional[bool] = None + data = await self._wait_until_some_done(tasks) - while tasks: - done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - - for task in done: - try: - content, is_zip = task.result() - - for p_task in pending: - p_task.cancel() - - break - except (NotSuccess, ReceivedHTML, ValueError): - continue - - tasks = pending + if data is None: + raise ValueError + content, is_zip = data if content is None or is_zip is None: raise ValueError