Add arq

Mirror of https://github.com/flibusta-apps/telegram_files_cache_server.git

This commit replaces the in-process CacheUpdater (an asyncio.Queue drained by four worker coroutines inside the web app) with arq tasks executed by a separate, Redis-backed worker process.
src/app/services/cache_updater.py (path inferred from the app.services.cache_updater imports below):

@@ -1,7 +1,8 @@
-import asyncio
 import logging
 from typing import Optional
 
+from arq.connections import ArqRedis
+
 from app.models import CachedFile
 from app.services.caption_getter import get_caption
 from app.services.downloader import download
@@ -20,79 +21,57 @@ class FileTypeNotAllowed(Exception):
         super().__init__(message)
 
 
-class CacheUpdater:
-    def __init__(self):
-        self.queue = asyncio.Queue(maxsize=10)
-        self.all_books_checked = False
-
-    async def _check_book(self, book: Book):
-        for file_type in book.available_types:
-            exists = await CachedFile.objects.filter(
-                object_id=book.id, object_type=file_type
-            ).exists()
-
-            if not exists:
-                await self.queue.put((book, file_type))
-
-    async def _start_producer(self):
-        books_page = await get_books(1, PAGE_SIZE)
-
-        for page_number in range(1, books_page.total_pages + 1):
-            page = await get_books(page_number, PAGE_SIZE)
-
-            for book in page.items:
-                await self._check_book(book)
-
-        self.all_books_checked = True
-
-    @classmethod
-    async def _cache_file(cls, book: Book, file_type) -> Optional[CachedFile]:
-        logger.info(f"Cache {book.id} {file_type}...")
-        data = await download(book.source.id, book.remote_id, file_type)
-
-        if data is None:
-            return None
-
-        content, filename = data
-
-        caption = get_caption(book)
-
-        upload_data = await upload_file(content, filename, caption)
-
-        return await CachedFile.objects.create(
-            object_id=book.id, object_type=file_type, data=upload_data.data
-        )
-
-    async def _start_worker(self):
-        while not self.all_books_checked or not self.queue.empty():
-            try:
-                task = self.queue.get_nowait()
-                book: Book = task[0]
-                file_type: str = task[1]
-            except asyncio.QueueEmpty:
-                await asyncio.sleep(0.1)
-                continue
-
-            await self._cache_file(book, file_type)
-
-    async def _update(self):
-        logger.info("Start update...")
-        await asyncio.gather(
-            self._start_producer(),
-            *[self._start_worker() for _ in range(4)],
-        )
-        logger.info("Update complete!")
-
-    @classmethod
-    async def update(cls):
-        updater = cls()
-        return await updater._update()
-
-    @classmethod
-    async def cache_file(cls, book_id: int, file_type: str) -> Optional[CachedFile]:
-        book = await get_book(book_id)
-
-        if file_type not in book.available_types:
-            raise FileTypeNotAllowed(f"{file_type} not in {book.available_types}!")
-
-        return await cls._cache_file(book, file_type)
+async def check_book(book: Book, arq_pool: ArqRedis) -> None:
+    for file_type in book.available_types:
+        exists = await CachedFile.objects.filter(
+            object_id=book.id, object_type=file_type
+        ).exists()
+
+        if not exists:
+            await arq_pool.enqueue_job("cache_file_by_book_id", book.id, file_type)
+
+
+async def check_books_page(ctx, page_number: int) -> None:
+    arq_pool: ArqRedis = ctx["arc_pool"]
+
+    page = await get_books(page_number, PAGE_SIZE)
+
+    for book in page.items:
+        await check_book(book, arq_pool)
+
+
+async def check_books(ctx) -> None:
+    arq_pool: ArqRedis = ctx["arc_pool"]
+    books_page = await get_books(1, PAGE_SIZE)
+
+    for page_number in range(1, books_page.total_pages + 1):
+        await arq_pool.enqueue_job("check_books_page", page_number)
+
+
+async def cache_file(book: Book, file_type) -> Optional[CachedFile]:
+    logger.info(f"Cache {book.id} {file_type}...")
+    data = await download(book.source.id, book.remote_id, file_type)
+
+    if data is None:
+        return None
+
+    content, filename = data
+
+    caption = get_caption(book)
+
+    upload_data = await upload_file(content, filename, caption)
+
+    return await CachedFile.objects.create(
+        object_id=book.id, object_type=file_type, data=upload_data.data
+    )
+
+
+async def cache_file_by_book_id(
+    ctx, book_id: int, file_type: str
+) -> Optional[CachedFile]:
+    book = await get_book(book_id)
+
+    if file_type not in book.available_types:
+        raise FileTypeNotAllowed(f"{file_type} not in {book.available_types}!")
+
+    return await cache_file(book, file_type)
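
This is the heart of the change: the CacheUpdater class, with its bounded queue, producer coroutine, and four in-process workers, becomes three flat arq task functions. check_books enqueues one check_books_page job per catalog page, each page job enqueues one cache_file_by_book_id job per missing (book, file_type) pair, and the arq worker (max_jobs = 4 in setup_arq.py below) supplies the concurrency the four coroutines used to. Note also that arq hands every job its pool as ctx["redis"], so the hand-rolled ctx["arc_pool"] entry added in setup_arq.py (spelled "arc", consistently with the reads above) duplicates something the worker already provides. A minimal, self-contained sketch of the same fan-out pattern, with illustrative names that are not from this repo:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def parent(ctx):
    # Fan out: one child job per unit of work, instead of draining
    # an in-process queue with worker coroutines.
    for page_number in range(1, 4):
        await ctx["redis"].enqueue_job("child", page_number)


async def child(ctx, page_number: int):
    print(f"processing page {page_number}")


class WorkerSettings:
    # Started with the arq CLI: arq <module>.WorkerSettings
    functions = [parent, child]
    redis_settings = RedisSettings()


async def main():
    # Any process with access to Redis can kick off the pipeline.
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job("parent")


if __name__ == "__main__":
    asyncio.run(main())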

In the book library client (file path not preserved in this capture), the only change is raising the HTTP timeout for catalog pages from 60 seconds to 5 minutes:

@@ -58,7 +58,7 @@ async def get_book(book_id: int) -> BookDetail:
 
 
 async def get_books(page: int, page_size: int) -> Page[Book]:
-    async with httpx.AsyncClient(timeout=60) as client:
+    async with httpx.AsyncClient(timeout=5 * 60) as client:
         response = await client.get(
             (
                 f"{env_config.LIBRARY_URL}/api/v1/books/"

src/app/views.py (path inferred from the app.views import in the entrypoint below):

@@ -1,15 +1,16 @@
 import base64
 
-from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, status
+from fastapi import APIRouter, Depends, HTTPException, status, Request
 
 from starlette.responses import Response
 
+from arq.connections import ArqRedis
 from asyncpg import exceptions
 
 from app.depends import check_token
 from app.models import CachedFile as CachedFileDB
 from app.serializers import CachedFile, CreateCachedFile
-from app.services.cache_updater import CacheUpdater
+from app.services.cache_updater import cache_file_by_book_id
 from app.services.caption_getter import get_caption
 from app.services.downloader import get_filename
 from app.services.files_client import download_file as download_file_from_cache
@@ -28,7 +29,7 @@ async def get_cached_file(object_id: int, object_type: str):
     )
 
     if not cached_file:
-        cached_file = await CacheUpdater.cache_file(object_id, object_type)
+        cached_file = await cache_file_by_book_id(object_id, object_type)
 
     if not cached_file:
         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@@ -43,7 +44,7 @@ async def download_cached_file(object_id: int, object_type: str):
     )
 
     if not cached_file:
-        cached_file = await CacheUpdater.cache_file(object_id, object_type)
+        cached_file = await cache_file_by_book_id(object_id, object_type)
 
     if not cached_file:
         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
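
A caveat for both read paths above: the new cache_file_by_book_id signature is (ctx, book_id, file_type), but these calls still pass only (object_id, object_type), which binds object_id to ctx and leaves file_type missing. Since the task body never reads ctx, a direct call from the request path presumably wants an explicit placeholder; a hedged sketch of that adjustment (not what the commit does):

if not cached_file:
    # ctx is only meaningful for arq-enqueued invocations; pass a
    # placeholder when calling the task function directly.
    cached_file = await cache_file_by_book_id(None, object_id, object_type)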
@@ -102,7 +103,8 @@ async def create_or_update_cached_file(data: CreateCachedFile):
 
 
 @router.post("/update_cache")
-async def update_cache(background_tasks: BackgroundTasks):
-    background_tasks.add_task(CacheUpdater.update)
+async def update_cache(request: Request):
+    arq_pool: ArqRedis = request.app.state.arq_pool
+    await arq_pool.enqueue_job("check_books")
 
     return "Ok!"
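
enqueue_job returns an arq Job handle, or None when a job with the same _job_id is already queued, so a whole-catalog rescan can be made idempotent against repeated POSTs. A hedged variation on the endpoint above (the fixed _job_id is my addition, not the repo's):

@router.post("/update_cache")
async def update_cache(request: Request):
    arq_pool: ArqRedis = request.app.state.arq_pool

    # With a fixed _job_id, arq refuses the duplicate and returns None
    # while a previous scan is still queued or running.
    job = await arq_pool.enqueue_job("check_books", _job_id="check_books")

    return "Ok!" if job else "Already running!"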

In the FastAPI entrypoint defining start_app (file path not preserved in this capture):

@@ -1,6 +1,7 @@
 from fastapi import FastAPI
 
 from app.views import router
+from core.arq_pool import get_arq_pool
 from core.db import database
 
 
@@ -17,6 +18,8 @@ def start_app() -> FastAPI:
         if not database_.is_connected:
             await database_.connect()
 
+        app.state.arq_pool = await get_arq_pool()
+
     @app.on_event("shutdown")
     async def shutdown() -> None:
         database_ = app.state.database
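
The startup hook builds the pool once and parks it on app.state, which is how update_cache reaches it via request.app.state.arq_pool. The shutdown hook shown here only tears down the database; if you also want to release the Redis connections explicitly rather than at process exit, a hedged sketch:

@app.on_event("shutdown")
async def shutdown() -> None:
    # ... existing database teardown ...
    # close() is awaitable on recent arq/redis versions; adjust for older ones.
    await app.state.arq_pool.close()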

src/core/arq_pool.py (new file):

@@ -0,0 +1,15 @@
+from arq.connections import create_pool, RedisSettings, ArqRedis
+
+from core.config import env_config
+
+
+def get_redis_settings() -> RedisSettings:
+    return RedisSettings(
+        host=env_config.REDIS_HOST,
+        port=env_config.REDIS_PORT,
+        database=env_config.REDIS_DB,
+    )
+
+
+async def get_arq_pool() -> ArqRedis:
+    return await create_pool(get_redis_settings())
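
Because pool construction lives in its own module, any process that can import core.arq_pool can enqueue work without going through the HTTP API. A sketch of a one-off script (illustrative, not part of the commit):

import asyncio

from core.arq_pool import get_arq_pool


async def main():
    # Same Redis-backed pool the web app and worker use; jobs are
    # addressed by the function names registered in WorkerSettings.
    arq_pool = await get_arq_pool()
    await arq_pool.enqueue_job("check_books")
    await arq_pool.close()


if __name__ == "__main__":
    asyncio.run(main())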

src/core/config.py (path inferred from the core.config import in src/core/arq_pool.py):

@@ -19,5 +19,9 @@ class EnvConfig(BaseSettings):
     FILES_SERVER_API_KEY: str
     FILES_SERVER_URL: str
 
+    REDIS_HOST: str
+    REDIS_PORT: int
+    REDIS_DB: int
+
 
 env_config = EnvConfig()
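
EnvConfig is a pydantic BaseSettings class, so the three new fields arrive from the environment. The corresponding variables would look like this (placeholder values, assuming a local Redis):

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0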

src/core/setup_arq.py (new file):

@@ -0,0 +1,27 @@
+from app.services.cache_updater import (
+    check_books,
+    cache_file_by_book_id,
+    check_books_page,
+)
+from core.arq_pool import get_redis_settings, get_arq_pool
+from core.db import database
+
+
+async def startup(ctx):
+    if not database.is_connected:
+        await database.connect()
+
+    ctx["arc_pool"] = await get_arq_pool()
+
+
+async def shutdown(ctx):
+    if database.is_connected:
+        await database.disconnect()
+
+
+class WorkerSettings:
+    functions = [check_books, cache_file_by_book_id, check_books_page]
+    on_startup = startup
+    on_shutdown = shutdown
+    redis_settings = get_redis_settings()
+    max_jobs = 4
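
WorkerSettings is consumed by arq's CLI, and the functions list is what makes the string job names used above ("check_books", "check_books_page", "cache_file_by_book_id") resolvable. Assuming the worker runs from src/ so that the app and core packages are importable (the working directory or Docker entrypoint is not shown in this commit), the standard invocation is:

arq core.setup_arq.WorkerSettings

max_jobs = 4 caps the jobs the worker runs concurrently, mirroring the four worker coroutines the old CacheUpdater spawned.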