Mirror of https://github.com/flibusta-apps/telegram_files_cache_server.git (synced 2025-12-08 09:30:40 +01:00)
Commit: Init
src/app/services/cache_updater.py (Normal file, 76 lines)
@@ -0,0 +1,76 @@
import asyncio

from app.services.library_client import get_books, Book
from app.services.downloader import download
from app.services.files_uploader import upload_file
from app.models import CachedFile


PAGE_SIZE = 50


class CacheUpdater:
    def __init__(self):
        # Bounded queue: the producer blocks on put() once 10 tasks are
        # pending, which keeps it from running far ahead of the workers.
        self.queue = asyncio.Queue(maxsize=10)
        self.all_books_checked = False

    async def _check_book(self, book: Book):
        # Queue a (book, file_type) task for every format that is not cached yet.
        for file_type in book.available_types:
            cached_file = await CachedFile.objects.get_or_none(
                object_id=book.id, object_type=file_type
            )

            if cached_file is None:
                await self.queue.put((book, file_type))

    async def _start_producer(self):
        # Fetch a single item first, only to learn the total book count.
        books_page = await get_books(1, 1)

        page_count = books_page.total // PAGE_SIZE
        if books_page.total % PAGE_SIZE != 0:
            page_count += 1

        for page_number in range(1, page_count + 1):
            page = await get_books(page_number, PAGE_SIZE)

            for book in page.items:
                await self._check_book(book)

        self.all_books_checked = True

    async def _start_worker(self):
        while not self.all_books_checked or not self.queue.empty():
            try:
                task = self.queue.get_nowait()
                book: Book = task[0]
                file_type: str = task[1]
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.1)
                continue

            data = await download(book.source.id, book.remote_id, file_type)

            if data is None:
                # Skip the task instead of returning, so one failed
                # download does not shut the worker down.
                continue

            content, filename = data

            upload_data = await upload_file(content, filename, 'Test')

            await CachedFile.objects.create(
                object_id=book.id,
                object_type=file_type,
                data=upload_data.data
            )

    async def _update(self):
        # One producer and two workers run concurrently until the whole
        # catalogue has been checked and the queue is drained.
        await asyncio.gather(
            self._start_producer(),
            self._start_worker(),
            self._start_worker()
        )

    @classmethod
    async def update(cls):
        updater = cls()
        return await updater._update()
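CacheUpdater.update() pairs a single producer, which walks the library catalogue page by page, with two workers that drain the bounded queue, download each missing file, and upload it to the files server. Below is a minimal sketch of how the updater might be triggered from an asyncio entrypoint; the run_cache_update wrapper and the __main__ guard are illustrative and not part of this commit.

```python
import asyncio

from app.services.cache_updater import CacheUpdater


async def run_cache_update() -> None:
    # One producer fills the queue while two workers drain it;
    # CacheUpdater.update() returns once both sides have finished.
    await CacheUpdater.update()


if __name__ == "__main__":
    asyncio.run(run_cache_update())
```

Because the queue is created with maxsize=10, the producer pauses on put() whenever the workers fall behind, so memory use stays bounded even for a large catalogue.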
src/app/services/downloader.py (Normal file, 21 lines)
@@ -0,0 +1,21 @@
from typing import Optional

import httpx

from core.config import env_config


async def download(source_id: int, remote_id: int, file_type: str) -> Optional[tuple[bytes, str]]:
    headers = {"Authorization": env_config.DOWNLOADER_API_KEY}

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{env_config.DOWNLOADER_URL}/download/{source_id}/{remote_id}/{file_type}", headers=headers, timeout=5 * 60)

        if response.status_code != 200:
            return None

        content_disposition = response.headers['Content-Disposition']

        name = content_disposition.replace('attachment; filename=', '')

        return response.content, name
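download returns the raw response bytes plus a filename recovered by stripping the 'attachment; filename=' prefix from the Content-Disposition header, and None on any non-200 response. A hedged usage sketch follows; the source_id, remote_id, and file type are made-up values, and env_config.DOWNLOADER_URL / DOWNLOADER_API_KEY must point at a reachable downloader service for this to return anything.

```python
import asyncio

from app.services.downloader import download


async def fetch_example() -> None:
    result = await download(source_id=1, remote_id=12345, file_type="fb2")
    if result is None:
        print("non-200 response from the downloader")
        return

    content, filename = result
    # filename is the Content-Disposition value with the
    # 'attachment; filename=' prefix stripped.
    with open(filename, "wb") as out:
        out.write(content)


if __name__ == "__main__":
    asyncio.run(fetch_example())
```

Note that the header parsing is deliberately naive: if the server ever quotes the filename or uses the RFC 5987 filename* form, those decorations will remain in the returned name.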
src/app/services/files_uploader.py (Normal file, 25 lines)
@@ -0,0 +1,25 @@
from datetime import datetime

import httpx
from pydantic import BaseModel

from core.config import env_config


class UploadedFile(BaseModel):
    id: int
    backend: str
    data: dict
    upload_time: datetime


async def upload_file(content: bytes, filename: str, caption: str) -> UploadedFile:
    headers = {"Authorization": env_config.FILES_SERVER_API_KEY}

    async with httpx.AsyncClient() as client:
        form = {'caption': caption}
        files = {'file': (filename, content)}

        response = await client.post(f"{env_config.FILES_SERVER_URL}/api/v1/files/upload/", data=form, files=files, headers=headers, timeout=5 * 60)

        return UploadedFile.parse_obj(response.json())
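upload_file posts the content as a multipart form (the caption as a regular field, the file bytes under the 'file' key) and parses the JSON response into UploadedFile. A minimal sketch of calling it directly is below; the payload, filename, and caption are placeholders, and FILES_SERVER_URL / FILES_SERVER_API_KEY must be configured for the request to succeed.

```python
import asyncio

from app.services.files_uploader import upload_file


async def upload_example() -> None:
    uploaded = await upload_file(b"hello world", "example.txt", "example caption")
    # UploadedFile carries the files-server record: its id, the backend
    # that stored it, and the backend-specific data dict.
    print(uploaded.id, uploaded.backend, uploaded.data)


if __name__ == "__main__":
    asyncio.run(upload_example())
```

The data dict on the response is what CacheUpdater later persists as CachedFile.data, which suggests the cache stores the backend reference returned by the files server rather than the file bytes themselves.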
src/app/services/library_client.py (Normal file, 54 lines)
@@ -0,0 +1,54 @@
from typing import Generic, TypeVar
from pydantic import BaseModel
from datetime import date

from core.config import env_config

import httpx


T = TypeVar('T')


class Page(BaseModel, Generic[T]):
    items: list[T]
    total: int
    page: int
    size: int


class BookAuthor(BaseModel):
    id: int
    first_name: str
    last_name: str
    middle_name: str


class BookSource(BaseModel):
    id: int
    name: str


class Book(BaseModel):
    id: int
    title: str
    file_type: str
    available_types: list[str]
    source: BookSource
    remote_id: int
    uploaded: date
    authors: list[BookAuthor]


async def get_books(page: int, page_size: int) -> Page[Book]:
    headers = {"Authorization": env_config.LIBRARY_API_KEY}

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{env_config.LIBRARY_URL}/api/v1/books/?page={page}&size={page_size}&is_deleted=false", headers=headers)

        data = response.json()

        page_data = Page[Book].parse_obj(data)
        page_data.items = [Book.parse_obj(item) for item in page_data.items]

        return page_data
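get_books calls the library's paginated books endpoint and returns a Page[Book]; the items are then re-parsed into Book explicitly, presumably because the plain BaseModel generic does not resolve T during validation. Below is a short sketch of walking the catalogue the same way CacheUpdater._start_producer does; PAGE_SIZE and the printed fields are arbitrary example choices, and LIBRARY_URL / LIBRARY_API_KEY must point at a live library service.

```python
import asyncio

from app.services.library_client import get_books

PAGE_SIZE = 50  # example value; the cache updater uses the same size


async def list_titles() -> None:
    # Ask for a single item first, only to learn the total count.
    first_page = await get_books(1, 1)

    page_count = first_page.total // PAGE_SIZE
    if first_page.total % PAGE_SIZE:
        page_count += 1

    for page_number in range(1, page_count + 1):
        page = await get_books(page_number, PAGE_SIZE)
        for book in page.items:
            print(book.id, book.title, book.available_types)


if __name__ == "__main__":
    asyncio.run(list_titles())
```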