This commit is contained in:
2022-01-29 18:49:45 +03:00
commit 87108fb1bd
12 changed files with 625 additions and 0 deletions

11
src/app/depends.py Normal file
View File

@@ -0,0 +1,11 @@
from fastapi import Security, HTTPException, status
from core.auth import default_security
from core.config import env_config
async def check_token(api_key: str = Security(default_security)):
if api_key != env_config.API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Wrong api key!"
)

95
src/app/services.py Normal file
View File

@@ -0,0 +1,95 @@
import asyncio
import concurrent.futures
from arq.connections import ArqRedis
import asyncpg
from meilisearch import Client
from core.config import env_config
def get_meilisearch_client() -> Client:
return Client(url=env_config.MEILI_HOST, api_key=env_config.MEILI_MASTER_KEY)
async def get_postgres_pool() -> asyncpg.Pool:
return await asyncpg.create_pool( # type: ignore
database=env_config.POSTGRES_DB_NAME,
host=env_config.POSTGRES_HOST,
port=env_config.POSTGRES_PORT,
user=env_config.POSTGRES_USER,
password=env_config.POSTGRES_PASSWORD,
)
async def update_books(ctx) -> bool:
loop = asyncio.get_event_loop()
meili = get_meilisearch_client()
index = meili.index("books")
postgres_pool = await get_postgres_pool()
with concurrent.futures.ThreadPoolExecutor() as pool:
count = await postgres_pool.fetchval("SELECT count(*) FROM books;")
for offset in range(0, count, 4096):
rows = await postgres_pool.fetch(f"SELECT id, title, is_deleted FROM books ORDER BY id LIMIT 4096 OFFSET {offset}")
documents = [dict(row) for row in rows]
await loop.run_in_executor(pool, index.add_documents, documents)
return True
async def update_authors(ctx) -> bool:
loop = asyncio.get_event_loop()
meili = get_meilisearch_client()
index = meili.index("authors")
postgres_pool = await get_postgres_pool()
with concurrent.futures.ThreadPoolExecutor() as pool:
count = await postgres_pool.fetchval("SELECT count(*) FROM authors;")
for offset in range(0, count, 4096):
rows = await postgres_pool.fetch(f"SELECT id, first_name, last_name, middle_name FROM authors ORDER BY id LIMIT 4096 OFFSET {offset}")
documents = [dict(row) for row in rows]
await loop.run_in_executor(pool, index.add_documents, documents)
return True
async def update_sequences(ctx) -> bool:
loop = asyncio.get_event_loop()
meili = get_meilisearch_client()
index = meili.index("sequences")
postgres_pool = await get_postgres_pool()
with concurrent.futures.ThreadPoolExecutor() as pool:
count = await postgres_pool.fetchval("SELECT count(*) FROM sequences;")
for offset in range(0, count, 4096):
rows = await postgres_pool.fetch(f"SELECT id, name FROM sequences ORDER BY id LIMIT 4096 OFFSET {offset}")
documents = [dict(row) for row in rows]
await loop.run_in_executor(pool, index.add_documents, documents)
return True
async def update(ctx) -> bool:
arq_pool: ArqRedis = ctx["arc_pool"]
await arq_pool.enqueue_job("update_books")
await arq_pool.enqueue_job("update_authors")
await arq_pool.enqueue_job("update_sequences")
return True

19
src/app/views.py Normal file
View File

@@ -0,0 +1,19 @@
from fastapi import APIRouter, Depends, Request
from arq.connections import ArqRedis
from app.depends import check_token
router = APIRouter(
prefix="/api/v1",
dependencies=[Depends(check_token)]
)
@router.post("/update")
async def update(request: Request):
arq_pool: ArqRedis = request.app.state.arq_pool
await arq_pool.enqueue_job("update")
return "Ok!"

16
src/core/app.py Normal file
View File

@@ -0,0 +1,16 @@
from fastapi import FastAPI
from app.views import router
from core.arq_pool import get_arq_pool
def start_app() -> FastAPI:
app = FastAPI()
app.include_router(router)
@app.on_event("startup")
async def startup() -> None:
app.state.arq_pool = await get_arq_pool()
return app

15
src/core/arq_pool.py Normal file
View File

@@ -0,0 +1,15 @@
from arq.connections import create_pool, RedisSettings, ArqRedis
from core.config import env_config
def get_redis_settings() -> RedisSettings:
return RedisSettings(
host=env_config.REDIS_HOST,
port=env_config.REDIS_PORT,
database=env_config.REDIS_DB,
)
async def get_arq_pool() -> ArqRedis:
return await create_pool(get_redis_settings())

4
src/core/auth.py Normal file
View File

@@ -0,0 +1,4 @@
from fastapi.security import APIKeyHeader
default_security = APIKeyHeader(name="Authorization")

21
src/core/config.py Normal file
View File

@@ -0,0 +1,21 @@
from pydantic import BaseSettings
class EnvConfig(BaseSettings):
API_KEY: str
POSTGRES_DB_NAME: str
POSTGRES_HOST: str
POSTGRES_PORT: int
POSTGRES_USER: str
POSTGRES_PASSWORD: str
REDIS_HOST: str
REDIS_PORT: int
REDIS_DB: int
MEILI_HOST: str
MEILI_MASTER_KEY: str
env_config = EnvConfig()

18
src/core/setup_arq.py Normal file
View File

@@ -0,0 +1,18 @@
from app.services import (
update,
update_books,
update_authors,
update_sequences,
)
from core.arq_pool import get_redis_settings, get_arq_pool
async def startup(ctx):
ctx["arc_pool"] = await get_arq_pool()
class WorkerSettings:
functions = [update, update_books, update_authors, update_sequences]
on_startup = startup
redis_settings = get_redis_settings()
max_jobs = 2

4
src/main.py Normal file
View File

@@ -0,0 +1,4 @@
from core.app import start_app
app = start_app()