From a21788181dc23f4655f0c2db77085cf3c0069fec Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Sun, 24 Apr 2022 21:46:22 +0300 Subject: [PATCH] Fix bugs --- poetry.lock | 29 ++++++++- pyproject.toml | 1 + src/app/services/exceptions.py | 10 +++ src/app/services/fl_downloader.py | 102 +++++++++++------------------- src/app/services/utils.py | 16 ++++- 5 files changed, 89 insertions(+), 69 deletions(-) create mode 100644 src/app/services/exceptions.py diff --git a/poetry.lock b/poetry.lock index 295d7c7..aa6272a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,3 +1,11 @@ +[[package]] +name = "aiofiles" +version = "0.8.0" +description = "File support for asyncio." +category = "main" +optional = false +python-versions = ">=3.6,<4.0" + [[package]] name = "anyio" version = "3.5.0" @@ -26,6 +34,17 @@ python-versions = ">=3.7" [package.extras] tests = ["pytest", "pytest-asyncio", "mypy (>=0.800)"] +[[package]] +name = "asynctempfile" +version = "0.5.0" +description = "Async version of tempfile" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +aiofiles = ">=0.6.0" + [[package]] name = "certifi" version = "2021.10.8" @@ -326,9 +345,13 @@ test = ["aiohttp", "flake8 (>=3.9.2,<3.10.0)", "psutil", "pycodestyle (>=2.7.0,< [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "a4d999cff0a76d5061b5232d43b0f2c97db1489605017522b6a5ec6aefd850a8" +content-hash = "2361449346e203acb78f61fb55439c60af4e847c17045a56cb3101c6119b78a8" [metadata.files] +aiofiles = [ + {file = "aiofiles-0.8.0-py3-none-any.whl", hash = "sha256:7a973fc22b29e9962d0897805ace5856e6a566ab1f0c8e5c91ff6c866519c937"}, + {file = "aiofiles-0.8.0.tar.gz", hash = "sha256:8334f23235248a3b2e83b2c3a78a22674f39969b96397126cc93664d9a901e59"}, +] anyio = [ {file = "anyio-3.5.0-py3-none-any.whl", hash = "sha256:b5fa16c5ff93fa1046f2eeb5bbff2dad4d3514d6cda61d02816dba34fa8c3c2e"}, {file = "anyio-3.5.0.tar.gz", hash = "sha256:a0aeffe2fb1fdf374a8e4b471444f0f3ac4fb9f5a5b542b48824475e0042a5a6"}, @@ -337,6 +360,10 @@ asgiref = [ {file = "asgiref-3.5.0-py3-none-any.whl", hash = "sha256:88d59c13d634dcffe0510be048210188edd79aeccb6a6c9028cdad6f31d730a9"}, {file = "asgiref-3.5.0.tar.gz", hash = "sha256:2f8abc20f7248433085eda803936d98992f1343ddb022065779f37c5da0181d0"}, ] +asynctempfile = [ + {file = "asynctempfile-0.5.0-py3-none-any.whl", hash = "sha256:cec59bdb71c850e3de9bb4415f88998165c364709696240eea9ec5204a7439af"}, + {file = "asynctempfile-0.5.0.tar.gz", hash = "sha256:4a647c747357e8827397baadbdfe87f3095d30923fa789e797111eb02160884a"}, +] certifi = [ {file = "certifi-2021.10.8-py2.py3-none-any.whl", hash = "sha256:d62a0163eb4c2344ac042ab2bdf75399a71a2d8c7d47eac2e2ee91b9d6339569"}, {file = "certifi-2021.10.8.tar.gz", hash = "sha256:78884e7c1d4b00ce3cea67b44566851c4343c120abd683433ce934a68ea58872"}, diff --git a/pyproject.toml b/pyproject.toml index 5402b1f..937112b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ prometheus-fastapi-instrumentator = "^5.7.1" uvloop = "^0.16.0" gunicorn = "^20.1.0" sentry-sdk = "^1.5.10" +asynctempfile = "^0.5.0" [tool.poetry.dev-dependencies] diff --git a/src/app/services/exceptions.py b/src/app/services/exceptions.py new file mode 100644 index 0000000..661f90a --- /dev/null +++ b/src/app/services/exceptions.py @@ -0,0 +1,10 @@ +class NotSuccess(Exception): + pass + + +class ReceivedHTML(Exception): + pass + + +class ConvertationError(Exception): + pass diff --git a/src/app/services/fl_downloader.py b/src/app/services/fl_downloader.py index 78da8d7..ab9e9bb 100644 --- a/src/app/services/fl_downloader.py +++ b/src/app/services/fl_downloader.py @@ -1,30 +1,18 @@ import asyncio -import os -import tempfile -from typing import IO, Optional, AsyncIterator, cast - -from fastapi import UploadFile +from typing import Optional, AsyncIterator, cast +import aiofiles +import aiofiles.os +import asynctempfile import httpx from app.services.base import BaseDownloader from app.services.book_library import BookLibraryClient +from app.services.exceptions import NotSuccess, ReceivedHTML, ConvertationError from app.services.utils import zip, unzip, get_filename, process_pool_executor from core.config import env_config, SourceConfig -class NotSuccess(Exception): - pass - - -class ReceivedHTML(Exception): - pass - - -class ConvertationError(Exception): - pass - - class FLDownloader(BaseDownloader): EXCLUDE_UNZIP = ["html"] @@ -120,7 +108,7 @@ class FLDownloader(BaseDownloader): await data[0].aclose() await data[1].aclose() - except (NotSuccess, ReceivedHTML, ConvertationError): + except (NotSuccess, ReceivedHTML, ConvertationError, FileNotFoundError): continue async def _wait_until_some_done( @@ -145,35 +133,28 @@ class FLDownloader(BaseDownloader): ) return data - except (NotSuccess, ReceivedHTML, ConvertationError): + except (NotSuccess, ReceivedHTML, ConvertationError, FileNotFoundError): continue tasks_ = pending return None - async def _write_response_content_to_ntf(self, ntf, response: httpx.Response): - temp_file = UploadFile(await self.get_filename(), ntf) - + async def _write_response_content_to_ntf(self, temp_file, response: httpx.Response): async for chunk in response.aiter_bytes(2048): await temp_file.write(chunk) - temp_file.file.flush() - + await temp_file.flush() await temp_file.seek(0) - return temp_file.file + async def _unzip(self, response: httpx.Response) -> Optional[str]: + async with asynctempfile.NamedTemporaryFile(delete=False) as temp_file: + await self._write_response_content_to_ntf(temp_file, response) - async def _unzip(self, response: httpx.Response): - with tempfile.NamedTemporaryFile() as ntf: - await self._write_response_content_to_ntf(ntf, response) - - internal_tempfile_name = await asyncio.get_event_loop().run_in_executor( - process_pool_executor, unzip, ntf.name, "fb2" + return await asyncio.get_event_loop().run_in_executor( + process_pool_executor, unzip, temp_file.name, "fb2" ) - return internal_tempfile_name - async def _download_with_converting( self, ) -> tuple[httpx.AsyncClient, httpx.Response, bool]: @@ -191,26 +172,25 @@ class FLDownloader(BaseDownloader): client, response, is_zip = data - is_temp_file = False try: if is_zip: - file_to_convert_name = await self._unzip(response) + filename_to_convert = await self._unzip(response) else: - file_to_convert = tempfile.NamedTemporaryFile() - await self._write_response_content_to_ntf(file_to_convert, response) - file_to_convert_name = file_to_convert.name - is_temp_file = True + async with asynctempfile.NamedTemporaryFile(delete=False) as temp_file: + await self._write_response_content_to_ntf(temp_file, response) + filename_to_convert = temp_file.name finally: await response.aclose() await client.aclose() form = {"format": self.file_type} - files = {"file": open(file_to_convert_name, "rb")} + files = {"file": open(filename_to_convert, "rb")} converter_client = httpx.AsyncClient(timeout=2 * 60) converter_request = converter_client.build_request( "POST", env_config.CONVERTER_URL, data=form, files=files ) + try: converter_response = await converter_client.send( converter_request, stream=True @@ -219,19 +199,17 @@ class FLDownloader(BaseDownloader): await converter_client.aclose() raise finally: - if is_temp_file: - await asyncio.get_event_loop().run_in_executor( - process_pool_executor, os.remove, file_to_convert_name - ) - - if response.status_code != 200: - raise ConvertationError + await aiofiles.os.remove(filename_to_convert) try: + if response.status_code != 200: + raise ConvertationError + return converter_client, converter_response, False except asyncio.CancelledError: await converter_response.aclose() await converter_client.aclose() + await aiofiles.os.remove(filename_to_convert) raise async def _get_content(self) -> Optional[tuple[AsyncIterator[bytes], str]]: @@ -252,40 +230,32 @@ class FLDownloader(BaseDownloader): try: if is_zip and self.file_type.lower() not in self.EXCLUDE_UNZIP: - temp_file_name = await self._unzip(response) + temp_filename = await self._unzip(response) else: - - temp_file = tempfile.NamedTemporaryFile() - await self._write_response_content_to_ntf(temp_file, response) - temp_file_name = temp_file.name + async with asynctempfile.NamedTemporaryFile() as temp_file: + temp_filename = temp_file.name + await self._write_response_content_to_ntf(temp_file, response) finally: await response.aclose() await client.aclose() - is_unziped_temp_file = False if self.need_zip: content_filename = await asyncio.get_event_loop().run_in_executor( - process_pool_executor, zip, await self.get_filename(), temp_file_name + process_pool_executor, zip, await self.get_filename(), temp_filename ) - is_unziped_temp_file = True + await aiofiles.os.remove(temp_filename) else: - content_filename = temp_file_name - - content = cast(IO, open(content_filename, "rb")) + content_filename = temp_filename force_zip = is_zip and self.file_type.lower() in self.EXCLUDE_UNZIP async def _content_iterator() -> AsyncIterator[bytes]: - t_file = UploadFile(await self.get_final_filename(force_zip), content) try: - while chunk := await t_file.read(2048): - yield cast(bytes, chunk) + async with aiofiles.open(content_filename) as temp_file: + while chunk := await temp_file.read(2048): + yield cast(bytes, chunk) finally: - await t_file.close() - if is_unziped_temp_file: - await asyncio.get_event_loop().run_in_executor( - process_pool_executor, os.remove, content_filename - ) + await aiofiles.os.remove(content_filename) return _content_iterator(), await self.get_final_filename(force_zip) diff --git a/src/app/services/utils.py b/src/app/services/utils.py index b81fa2f..01f236b 100644 --- a/src/app/services/utils.py +++ b/src/app/services/utils.py @@ -1,6 +1,8 @@ from concurrent.futures.process import ProcessPoolExecutor +import os import re import tempfile +from typing import Optional import zipfile import transliterate @@ -11,7 +13,15 @@ from app.services.book_library import Book, BookAuthor process_pool_executor = ProcessPoolExecutor(2) -def unzip(temp_zipfile, file_type: str): +def remove_temp_file(filename: str) -> bool: + try: + os.remove(filename) + return True + except OSError: + return False + + +def unzip(temp_zipfile: str, file_type: str) -> Optional[str]: result = tempfile.NamedTemporaryFile(delete=False) zip_file = zipfile.ZipFile(temp_zipfile) @@ -24,6 +34,9 @@ def unzip(temp_zipfile, file_type: str): result.seek(0) return result.name + result.close() + remove_temp_file(result.name) + raise FileNotFoundError @@ -50,7 +63,6 @@ def zip( zfile.create_system = 0 zip_file.close() - result.close() return result.name