This commit is contained in:
2022-01-19 21:54:45 +03:00
parent e21cbf17cc
commit abb6ddda42
11 changed files with 201 additions and 58 deletions

View File

@@ -1,11 +1,8 @@
from enum import Enum
from app.services.updaters.base import BaseUpdater
from app.services.updaters.fl_updater import FlUpdater
class UpdaterTypes(Enum):
FL = "fl"
UPDATERS: dict[UpdaterTypes, BaseUpdater] = {UpdaterTypes.FL: FlUpdater}
UPDATERS: dict[UpdaterTypes, str] = {UpdaterTypes.FL: "run_fl_update"}

View File

@@ -1,7 +0,0 @@
from typing import Protocol
class BaseUpdater(Protocol):
@classmethod
async def update(cls) -> bool:
...

View File

@@ -1,14 +1,17 @@
import asyncio
from logging import Logger
from typing import Optional
from aiologger import Logger
import aiomysql
from arq.connections import ArqRedis
import asyncpg
from app.services.updaters.base import BaseUpdater
from core.config import env_config
logger = Logger("fl_updater")
async def run(cmd) -> tuple[bytes, bytes, Optional[int]]:
proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
@@ -26,7 +29,7 @@ def remove_dots(s: str):
return s.replace(".", "")
class FlUpdater(BaseUpdater):
class FlUpdater:
SOURCE: int
FILES = [
@@ -52,15 +55,13 @@ class FlUpdater(BaseUpdater):
sequences_updated_event: asyncio.Event
genres_updated_event: asyncio.Event
logger: Logger
async def _import_dump(self, filename: str):
await run(
f"wget -O - {env_config.FL_BASE_URL}/sql/{filename}.gz | gunzip | "
f"mysql -h {env_config.MYSQL_HOST} -u {env_config.MYSQL_USER} "
f'-p"{env_config.MYSQL_PASSWORD}" {env_config.MYSQL_DB_NAME}'
)
await self.logger.info(f"Imported {filename}")
logger.info(f"Imported {filename}")
async def _prepare(self):
posgres_pool = await asyncpg.create_pool(
@@ -85,7 +86,7 @@ class FlUpdater(BaseUpdater):
self.postgres_pool = posgres_pool
async def _set_source(self):
await self.logger.info("Set source...")
logger.info("Set source...")
source_row = await self.postgres_pool.fetchrow(
"SELECT id FROM sources WHERE name = 'flibusta';"
@@ -102,7 +103,7 @@ class FlUpdater(BaseUpdater):
self.SOURCE = source_row["id"]
await self.logger.info("Source has set!")
logger.info("Source has set!")
async def _update_authors(self):
def prepare_author(row: list):
@@ -114,7 +115,7 @@ class FlUpdater(BaseUpdater):
remove_wrong_ch(row[3]),
]
await self.logger.info("Update authors...")
logger.info("Update authors...")
await self.postgres_pool.execute(
"""
@@ -157,7 +158,7 @@ class FlUpdater(BaseUpdater):
self.authors_updated_event.set()
await self.logger.info("Authors updated!")
logger.info("Authors updated!")
async def _update_books(self):
replace_dict = {"ru-": "ru", "ru~": "ru"}
@@ -178,7 +179,7 @@ class FlUpdater(BaseUpdater):
row[5] == "1",
]
await self.logger.info("Update books...")
logger.info("Update books...")
await self.postgres_pool.execute(
"""
@@ -223,13 +224,13 @@ class FlUpdater(BaseUpdater):
self.books_updated_event.set()
await self.logger.info("Books updated!")
logger.info("Books updated!")
async def _update_books_authors(self):
await self.books_updated_event.wait()
await self.authors_updated_event.wait()
await self.logger.info("Update books_authors...")
logger.info("Update books_authors...")
await self.postgres_pool.execute(
"""
@@ -271,13 +272,13 @@ class FlUpdater(BaseUpdater):
[(self.SOURCE, *row) for row in rows],
)
await self.logger.info("Books_authors updated!")
logger.info("Books_authors updated!")
async def _update_translations(self):
await self.books_updated_event.wait()
await self.authors_updated_event.wait()
await self.logger.info("Update translations...")
logger.info("Update translations...")
await self.postgres_pool.execute(
"""
@@ -324,7 +325,7 @@ class FlUpdater(BaseUpdater):
[(self.SOURCE, *row) for row in rows],
)
await self.logger.info("Translations updated!")
logger.info("Translations updated!")
async def _update_sequences(self):
def prepare_sequence(row: list):
@@ -334,7 +335,7 @@ class FlUpdater(BaseUpdater):
remove_wrong_ch(row[1]),
]
await self.logger.info("Update sequences...")
logger.info("Update sequences...")
await self.postgres_pool.execute(
"""
@@ -373,13 +374,13 @@ class FlUpdater(BaseUpdater):
self.sequences_updated_event.set()
await self.logger.info("Sequences updated!")
logger.info("Sequences updated!")
async def _update_sequences_info(self):
await self.sequences_updated_event.wait()
await self.books_updated_event.wait()
await self.logger.info("Update book_sequences...")
logger.info("Update book_sequences...")
await self.postgres_pool.execute(
"""
@@ -429,12 +430,12 @@ class FlUpdater(BaseUpdater):
[[self.SOURCE, *row] for row in rows],
)
await self.logger.info("Book_sequences updated!")
logger.info("Book_sequences updated!")
async def _update_book_annotations(self):
await self.books_updated_event.wait()
await self.logger.info("Update book_annotations...")
logger.info("Update book_annotations...")
await self.postgres_pool.execute(
"""
@@ -478,12 +479,12 @@ class FlUpdater(BaseUpdater):
[[self.SOURCE, *row] for row in rows],
)
await self.logger.info("Book_annotations updated!")
logger.info("Book_annotations updated!")
await self._update_book_annotations_pic()
async def _update_book_annotations_pic(self):
await self.logger.info("Update book_annotations_pic...")
logger.info("Update book_annotations_pic...")
async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor:
@@ -508,12 +509,12 @@ class FlUpdater(BaseUpdater):
[[self.SOURCE, *row] for row in rows],
)
await self.logger.info("Book_annotation_pics updated!")
logger.info("Book_annotation_pics updated!")
async def _update_author_annotations(self):
await self.authors_updated_event.wait()
await self.logger.info("Update author_annotations...")
logger.info("Update author_annotations...")
await self.postgres_pool.execute(
"""
@@ -554,12 +555,12 @@ class FlUpdater(BaseUpdater):
[[self.SOURCE, *row] for row in rows],
)
await self.logger.info("Author_annotation_updated!")
logger.info("Author_annotation_updated!")
await self._update_author_annotations_pics()
async def _update_author_annotations_pics(self):
await self.logger.info("Update author_annotations_pic...")
logger.info("Update author_annotations_pic...")
async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor:
@@ -584,10 +585,10 @@ class FlUpdater(BaseUpdater):
[[self.SOURCE, *row] for row in rows],
)
await self.logger.info("Author_annotatioins_pic updated!")
logger.info("Author_annotatioins_pic updated!")
async def _update_genres(self):
await self.logger.info("Update genres...")
logger.info("Update genres...")
await self.postgres_pool.execute(
"""
@@ -627,13 +628,13 @@ class FlUpdater(BaseUpdater):
[[self.SOURCE, *row] for row in rows],
)
await self.logger.info("Genres updated!")
logger.info("Genres updated!")
async def _update_books_genres(self):
await self.books_updated_event.wait()
await self.genres_updated_event.wait()
await self.logger.info("Update book_genres...")
logger.info("Update book_genres...")
await self.postgres_pool.execute(
"""
@@ -675,15 +676,19 @@ class FlUpdater(BaseUpdater):
[(self.SOURCE, *row) for row in rows],
)
await self.logger.info("Book_genres updated!")
async def _update(self) -> bool:
self.logger = Logger.with_default_handlers()
await self._prepare()
logger.info("Book_genres updated!")
async def _import(self, ctx) -> bool:
await asyncio.gather(*[self._import_dump(filename) for filename in self.FILES])
arq_pool: ArqRedis = ctx["arq_pool"]
await arq_pool.enqueue_job("run_fl_update2")
return True
async def _update(self, ctx) -> bool:
await self._prepare()
await self._set_source()
self.authors_updated_event = asyncio.Event()
@@ -706,7 +711,10 @@ class FlUpdater(BaseUpdater):
return True
@classmethod
async def update(cls) -> bool:
updater = cls()
return await updater._update()
async def run_fl_update(ctx) -> bool:
return await FlUpdater()._import(ctx)
async def run_fl_update2(ctx) -> bool:
return await FlUpdater()._update(ctx)

View File

@@ -1,4 +1,6 @@
from fastapi import APIRouter, BackgroundTasks, Depends
from fastapi import APIRouter, Depends, Request
from arq.connections import ArqRedis
from app.depends import check_token
from app.services.updaters import UpdaterTypes, UPDATERS
@@ -8,9 +10,8 @@ router = APIRouter(tags=["updater"], dependencies=[Depends(check_token)])
@router.post("/update/{updater}")
async def update(updater: UpdaterTypes, background_tasks: BackgroundTasks):
updater_ = UPDATERS[updater]
background_tasks.add_task(updater_.update)
async def update(request: Request, updater: UpdaterTypes):
arq_pool: ArqRedis = request.app.state.arq_pool
await arq_pool.enqueue_job(UPDATERS[updater])
return "Ok!"

View File

@@ -1,6 +1,7 @@
from fastapi import FastAPI
from app.views import router
from core.arq_pool import get_arq_pool
def start_app() -> FastAPI:
@@ -8,4 +9,8 @@ def start_app() -> FastAPI:
app.include_router(router)
@app.on_event("startup")
async def startup() -> None:
app.state.arq_pool = await get_arq_pool()
return app

15
src/core/arq_pool.py Normal file
View File

@@ -0,0 +1,15 @@
from arq.connections import create_pool, RedisSettings, ArqRedis
from core.config import env_config
def get_redis_settings() -> RedisSettings:
return RedisSettings(
host=env_config.REDIS_HOST,
port=env_config.REDIS_PORT,
database=env_config.REDIS_DB,
)
async def get_arq_pool() -> ArqRedis:
return await create_pool(get_redis_settings())

View File

@@ -24,6 +24,10 @@ class EnvConfig(BaseSettings):
MYSQL_USER: str
MYSQL_PASSWORD: str
REDIS_HOST: str
REDIS_PORT: int
REDIS_DB: int
FL_BASE_URL: str

14
src/core/setup_arq.py Normal file
View File

@@ -0,0 +1,14 @@
from app.services.updaters.fl_updater import run_fl_update, run_fl_update2
from core.arq_pool import get_redis_settings, get_arq_pool
async def startup(ctx):
ctx["arq_pool"] = await get_arq_pool()
class WorkerSettings:
functions = [run_fl_update, run_fl_update2]
on_startup = startup
redis_settings = get_redis_settings()
max_jobs = 1
job_timeout = 60 * 60