From d7620f07ad4c335ab61e6ec084525b984cff2515 Mon Sep 17 00:00:00 2001 From: Kurbanov Bulat Date: Sat, 8 Jan 2022 20:51:31 +0300 Subject: [PATCH] Make cache update background task --- src/app/services/cache_updater.py | 33 +++++++++++++++++++----------- src/app/services/library_client.py | 3 ++- src/app/views.py | 8 +++++--- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/app/services/cache_updater.py b/src/app/services/cache_updater.py index 43a911d..201e856 100644 --- a/src/app/services/cache_updater.py +++ b/src/app/services/cache_updater.py @@ -1,4 +1,5 @@ import asyncio +import logging from typing import Optional from app.models import CachedFile @@ -8,7 +9,15 @@ from app.services.files_uploader import upload_file from app.services.library_client import get_books, get_book, Book -PAGE_SIZE = 50 +logger = logging.getLogger("telegram_channel_files_manager") + + +PAGE_SIZE = 100 + + +class FileTypeNotAllowed(Exception): + def __init__(self, message: str) -> None: + super().__init__(message) class CacheUpdater: @@ -18,21 +27,17 @@ class CacheUpdater: async def _check_book(self, book: Book): for file_type in book.available_types: - cached_file = await CachedFile.objects.get_or_none( + exists = await CachedFile.objects.filter( object_id=book.id, object_type=file_type - ) + ).exists() - if cached_file is None: + if not exists: await self.queue.put((book, file_type)) async def _start_producer(self): - books_page = await get_books(1, 1) + books_page = await get_books(1, PAGE_SIZE) - page_count = books_page.total // PAGE_SIZE - if books_page.total % PAGE_SIZE != 0: - page_count += 1 - - for page_number in range(1, page_count + 1): + for page_number in range(1, books_page.total_pages + 1): page = await get_books(page_number, PAGE_SIZE) for book in page.items: @@ -42,6 +47,7 @@ class CacheUpdater: @classmethod async def _cache_file(cls, book: Book, file_type) -> Optional[CachedFile]: + logger.info(f"Cache {book.id} {file_type}...") data = await download(book.source.id, book.remote_id, file_type) if data is None: @@ -70,9 +76,12 @@ class CacheUpdater: await self._cache_file(book, file_type) async def _update(self): + logger.info("Start update...") await asyncio.gather( - self._start_producer(), self._start_worker(), self._start_worker() + self._start_producer(), + *[self._start_worker() for _ in range(2)], ) + logger.info("Update complete!") @classmethod async def update(cls): @@ -84,6 +93,6 @@ class CacheUpdater: book = await get_book(book_id) if file_type not in book.available_types: - return None # ToDO: raise HTTPException + raise FileTypeNotAllowed(f"{file_type} not in {book.available_types}!") return await cls._cache_file(book, file_type) diff --git a/src/app/services/library_client.py b/src/app/services/library_client.py index c2b07c9..d8d2cf6 100644 --- a/src/app/services/library_client.py +++ b/src/app/services/library_client.py @@ -15,6 +15,7 @@ class Page(BaseModel, Generic[T]): total: int page: int size: int + total_pages: int class BookAuthor(BaseModel): @@ -57,7 +58,7 @@ async def get_book(book_id: int) -> BookDetail: async def get_books(page: int, page_size: int) -> Page[Book]: - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(timeout=60) as client: response = await client.get( ( f"{env_config.LIBRARY_URL}/api/v1/books/" diff --git a/src/app/views.py b/src/app/views.py index 786cfda..bd78ecd 100644 --- a/src/app/views.py +++ b/src/app/views.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, status from asyncpg import exceptions @@ -58,5 +58,7 @@ async def create_or_update_cached_file(data: CreateCachedFile): @router.post("/update_cache") -async def update_cache(): - await CacheUpdater.update() +async def update_cache(background_tasks: BackgroundTasks): + background_tasks.add_task(CacheUpdater.update) + + return "Ok!"