mirror of
https://github.com/flibusta-apps/batch_downloader.git
synced 2025-12-06 14:25:36 +01:00
Update deps
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
from fastapi import HTTPException, Request, Security, status
|
||||
|
||||
from redis.asyncio import Redis
|
||||
from taskiq import TaskiqDepends
|
||||
from taskiq import Context, TaskiqDepends
|
||||
|
||||
from core.auth import default_security
|
||||
from core.config import env_config
|
||||
@@ -14,5 +14,9 @@ async def check_token(api_key: str = Security(default_security)):
|
||||
)
|
||||
|
||||
|
||||
def get_redis(request: Request = TaskiqDepends()) -> Redis:
|
||||
def get_redis(request: Request) -> Redis:
|
||||
return request.app.state.redis
|
||||
|
||||
|
||||
def get_redis_taskiq(context: Context = TaskiqDepends()) -> Redis:
|
||||
return context.state.redis
|
||||
|
||||
@@ -13,7 +13,7 @@ from taskiq import TaskiqDepends
|
||||
from taskiq.task import AsyncTaskiqTask
|
||||
from transliterate import translit
|
||||
|
||||
from app.depends import get_redis
|
||||
from app.depends import get_redis_taskiq
|
||||
from app.services.library_client import LibraryClient
|
||||
from app.services.task_manager import ObjectType, TaskManager, TaskStatusEnum
|
||||
from core.config import env_config
|
||||
@@ -81,7 +81,7 @@ async def download_file_to_file(link: str, output: BytesIO) -> bool:
|
||||
|
||||
|
||||
@broker.task()
|
||||
async def download(task_id: uuid.UUID, book_id: int, file_type: str) -> str | None:
|
||||
async def download(task_id: str, book_id: int, file_type: str) -> str | None:
|
||||
try:
|
||||
with tempfile.SpooledTemporaryFile() as temp_file:
|
||||
data = await _download_to_tmpfile(book_id, file_type, temp_file)
|
||||
@@ -127,8 +127,8 @@ async def _check_subtasks(subtasks: list[str]) -> bool:
|
||||
|
||||
|
||||
@broker.task()
|
||||
async def check_subtasks(task_id: uuid.UUID, redis: Redis = TaskiqDepends(get_redis)):
|
||||
task = await TaskManager.get_task(redis, task_id)
|
||||
async def check_subtasks(task_id: str, redis: Redis = TaskiqDepends(get_redis_taskiq)):
|
||||
task = await TaskManager.get_task(redis, uuid.UUID(task_id))
|
||||
|
||||
if task is None:
|
||||
return False
|
||||
@@ -141,8 +141,8 @@ async def check_subtasks(task_id: uuid.UUID, redis: Redis = TaskiqDepends(get_re
|
||||
|
||||
|
||||
@broker.task()
|
||||
async def create_archive(task_id: uuid.UUID, redis: Redis = TaskiqDepends(get_redis)):
|
||||
task = await TaskManager.get_task(redis, task_id)
|
||||
async def create_archive(task_id: str, redis: Redis = TaskiqDepends(get_redis_taskiq)):
|
||||
task = await TaskManager.get_task(redis, uuid.UUID(task_id))
|
||||
assert task
|
||||
|
||||
match task.object_type:
|
||||
|
||||
@@ -2,7 +2,6 @@ from typing import Generic, TypeVar
|
||||
|
||||
import httpx
|
||||
from pydantic import BaseModel
|
||||
from pydantic.generics import GenericModel
|
||||
|
||||
from core.config import env_config
|
||||
|
||||
@@ -25,7 +24,7 @@ class TranslatorBook(BaseModel):
|
||||
Item = TypeVar("Item", bound=BaseModel)
|
||||
|
||||
|
||||
class Page(GenericModel, Generic[Item]):
|
||||
class Page(BaseModel, Generic[Item]):
|
||||
items: list[Item]
|
||||
total: int
|
||||
page: int
|
||||
@@ -64,7 +63,7 @@ class LibraryClient:
|
||||
if response.status_code != 200:
|
||||
return None
|
||||
|
||||
return Page[SequenceBook].parse_raw(response.text)
|
||||
return Page[SequenceBook].model_validate_json(response.text)
|
||||
|
||||
@staticmethod
|
||||
async def get_author_books(
|
||||
@@ -84,7 +83,7 @@ class LibraryClient:
|
||||
if response.status_code != 200:
|
||||
return None
|
||||
|
||||
return Page[AuthorBook].parse_raw(response.text)
|
||||
return Page[AuthorBook].model_validate_json(response.text)
|
||||
|
||||
@staticmethod
|
||||
async def get_translator_books(
|
||||
@@ -104,7 +103,7 @@ class LibraryClient:
|
||||
if response.status_code != 200:
|
||||
return None
|
||||
|
||||
return Page[TranslatorBook].parse_raw(response.text)
|
||||
return Page[TranslatorBook].model_validate_json(response.text)
|
||||
|
||||
@staticmethod
|
||||
async def get_sequence(sequence_id: int) -> Sequence | None:
|
||||
@@ -117,7 +116,7 @@ class LibraryClient:
|
||||
if response.status_code != 200:
|
||||
return None
|
||||
|
||||
return Sequence.parse_raw(response.text)
|
||||
return Sequence.model_validate_json(response.text)
|
||||
|
||||
@staticmethod
|
||||
async def get_author(author_id: int) -> Author | None:
|
||||
@@ -130,4 +129,4 @@ class LibraryClient:
|
||||
if response.status_code != 200:
|
||||
return None
|
||||
|
||||
return Author.parse_raw(response.text)
|
||||
return Author.model_validate_json(response.text)
|
||||
|
||||
@@ -64,7 +64,7 @@ class TaskCreator:
|
||||
if file_format not in book.available_types:
|
||||
continue
|
||||
|
||||
task = await download.kiq(task_id, book.id, file_format)
|
||||
task = await download.kiq(str(task_id), book.id, file_format)
|
||||
task_ids.append(task.task_id)
|
||||
|
||||
if len(task_ids) == 0:
|
||||
|
||||
@@ -37,7 +37,7 @@ class TaskManager:
|
||||
key = cls._get_key(task.id)
|
||||
|
||||
try:
|
||||
data = task.json()
|
||||
data = task.model_dump_json()
|
||||
await redis.set(key, data, ex=60 * 60)
|
||||
|
||||
return True
|
||||
@@ -53,6 +53,6 @@ class TaskManager:
|
||||
if data is None:
|
||||
return None
|
||||
|
||||
return Task.parse_raw(data)
|
||||
return Task.model_validate_json(data)
|
||||
except RedisError:
|
||||
return None
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from pydantic import BaseSettings
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class Config(BaseSettings):
|
||||
@@ -7,7 +7,7 @@ class Config(BaseSettings):
|
||||
REDIS_HOST: str
|
||||
REDIS_PORT: int
|
||||
REDIS_DB: int
|
||||
REDIS_PASSWORD: str | None
|
||||
REDIS_PASSWORD: str | None = None
|
||||
|
||||
MINIO_HOST: str
|
||||
MINIO_BUCKET: str
|
||||
@@ -20,10 +20,10 @@ class Config(BaseSettings):
|
||||
CACHE_API_KEY: str
|
||||
CACHE_URL: str
|
||||
|
||||
SENTRY_DSN: str | None
|
||||
SENTRY_DSN: str | None = None
|
||||
|
||||
|
||||
env_config = Config()
|
||||
env_config = Config() # type: ignore
|
||||
|
||||
REDIS_URL = (
|
||||
f"redis://{env_config.REDIS_HOST}:{env_config.REDIS_PORT}/{env_config.REDIS_DB}"
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import taskiq_fastapi
|
||||
from redis.asyncio import Redis
|
||||
from taskiq import TaskiqEvents, TaskiqState
|
||||
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
|
||||
|
||||
from core.config import REDIS_URL
|
||||
@@ -8,4 +9,12 @@ result_backend = RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=5 *
|
||||
|
||||
broker = ListQueueBroker(url=REDIS_URL).with_result_backend(result_backend)
|
||||
|
||||
taskiq_fastapi.init(broker, "main:app")
|
||||
|
||||
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
|
||||
async def startup(state: TaskiqState) -> None:
|
||||
state.redis = Redis.from_url(REDIS_URL)
|
||||
|
||||
|
||||
@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
|
||||
async def shutdown(state: TaskiqState) -> None:
|
||||
await state.redis.close()
|
||||
|
||||
Reference in New Issue
Block a user