Rewrite to rust

This commit is contained in:
2023-01-16 09:37:28 +01:00
parent 5f7970fbe5
commit 84e38d3613
26 changed files with 2918 additions and 1496 deletions

View File

@@ -1,11 +0,0 @@
from fastapi import Security, HTTPException, status
from core.auth import default_security
from core.config import env_config
async def check_token(api_key: str = Security(default_security)):
if api_key != env_config.API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Wrong api key!"
)

View File

@@ -1,194 +0,0 @@
import asyncio
import concurrent.futures
from arq.connections import ArqRedis
import asyncpg
from meilisearch import Client
from core.config import env_config
thread_pool = concurrent.futures.ThreadPoolExecutor()
def get_meilisearch_client() -> Client:
return Client(url=env_config.MEILI_HOST, api_key=env_config.MEILI_MASTER_KEY)
async def get_postgres_connection() -> asyncpg.Connection:
return await asyncpg.connect(
database=env_config.POSTGRES_DB_NAME,
host=env_config.POSTGRES_HOST,
port=env_config.POSTGRES_PORT,
user=env_config.POSTGRES_USER,
password=env_config.POSTGRES_PASSWORD,
)
DEFAULT_RANKING_RULES = [
"words",
"typo",
"proximity",
"attribute",
"sort",
"exactness",
]
async def update_books(ctx) -> bool: # NOSONAR
loop = asyncio.get_event_loop()
meili = get_meilisearch_client()
index = meili.index("books")
postgres = await get_postgres_connection()
async with postgres.transaction():
cursor = await postgres.cursor(
"SELECT id, title, lang FROM books WHERE is_deleted = 'f';"
)
while rows := await cursor.fetch(1024):
await loop.run_in_executor(
thread_pool, index.add_documents, [dict(row) for row in rows]
)
index.update_searchable_attributes(["title"])
index.update_filterable_attributes(["lang"])
await postgres.close()
return True
async def update_authors(ctx) -> bool: # NOSONAR
loop = asyncio.get_event_loop()
meili = get_meilisearch_client()
index = meili.index("authors")
postgres = await get_postgres_connection()
async with postgres.transaction():
cursor = await postgres.cursor(
"SELECT id, first_name, last_name, middle_name, "
" array("
" SELECT DISTINCT lang FROM book_authors "
" LEFT JOIN books ON book = books.id "
" WHERE authors.id = book_authors.author "
" AND books.is_deleted = 'f' "
" ) as author_langs, "
" array("
" SELECT DISTINCT lang FROM translations "
" LEFT JOIN books ON book = books.id "
" WHERE authors.id = translations.author "
" AND books.is_deleted = 'f' "
" ) as translator_langs, "
" (SELECT count(books.id) FROM book_authors "
" LEFT JOIN books ON book = books.id "
" WHERE authors.id = book_authors.author "
" AND books.is_deleted = 'f') as books_count "
"FROM authors;"
)
while rows := await cursor.fetch(1024):
await loop.run_in_executor(
thread_pool, index.add_documents, [dict(row) for row in rows]
)
index.update_searchable_attributes(["first_name", "last_name", "middle_name"])
index.update_filterable_attributes(["author_langs", "translator_langs"])
index.update_ranking_rules([*DEFAULT_RANKING_RULES, "books_count:desc"]) # NOSONAR
await postgres.close()
return True
async def update_sequences(ctx) -> bool: # NOSONAR
loop = asyncio.get_event_loop()
meili = get_meilisearch_client()
index = meili.index("sequences")
postgres = await get_postgres_connection()
async with postgres.transaction():
cursor = await postgres.cursor(
"SELECT id, name, "
" array("
" SELECT DISTINCT lang FROM book_sequences "
" LEFT JOIN books ON book = books.id "
" WHERE sequences.id = book_sequences.sequence "
" AND books.is_deleted = 'f' "
" ) as langs, "
" (SELECT count(books.id) FROM book_sequences "
" LEFT JOIN books ON book = books.id "
" WHERE sequences.id = book_sequences.sequence "
" AND books.is_deleted = 'f') as books_count "
"FROM sequences;"
)
while rows := await cursor.fetch(1024):
await loop.run_in_executor(
thread_pool, index.add_documents, [dict(row) for row in rows]
)
index.update_searchable_attributes(["name"])
index.update_filterable_attributes(["langs"])
index.update_ranking_rules([*DEFAULT_RANKING_RULES, "books_count:desc"])
await postgres.close()
return True
async def update_genres(ctx) -> bool: # NOSONAR
loop = asyncio.get_event_loop()
meili = get_meilisearch_client()
index = meili.index("genres")
postgres = await get_postgres_connection()
async with postgres.transaction():
cursor = await postgres.cursor(
"SELECT id, description, meta, "
" array( "
" SELECT DISTINCT lang FROM book_genres "
" LEFT JOIN books ON book = books.id "
" WHERE genres.id = book_genres.genre "
" AND books.is_deleted = 'f' "
" ) as langs, "
" ( "
" SELECT count(*) FROM book_genres "
" LEFT JOIN books ON book = books.id "
" WHERE genres.id = book_genres.genre "
" AND books.is_deleted = 'f' "
" ) as books_count "
"FROM genres;"
)
while rows := await cursor.fetch(1024):
await loop.run_in_executor(
thread_pool, index.add_documents, [dict(row) for row in rows]
)
index.update_searchable_attributes(["description"])
index.update_filterable_attributes(["langs"])
index.update_ranking_rules([*DEFAULT_RANKING_RULES, "books_count:desc"])
await postgres.close()
return True
async def update(ctx: dict, *args, **kwargs) -> bool: # NOSONAR
arq_pool: ArqRedis = ctx["arc_pool"]
await arq_pool.enqueue_job("update_books")
await arq_pool.enqueue_job("update_authors")
await arq_pool.enqueue_job("update_sequences")
await arq_pool.enqueue_job("update_genres")
return True

View File

@@ -1,21 +0,0 @@
from fastapi import APIRouter, Depends, Request
from arq.connections import ArqRedis
from app.depends import check_token
router = APIRouter(prefix="/api/v1", dependencies=[Depends(check_token)])
@router.get("/healthcheck")
async def healthcheck():
return "Ok!"
@router.post("/update")
async def update(request: Request):
arq_pool: ArqRedis = request.app.state.arq_pool
await arq_pool.enqueue_job("update")
return "Ok!"

41
src/config.rs Normal file
View File

@@ -0,0 +1,41 @@
pub struct Config {
pub api_key: String,
pub sentry_sdn: String,
pub postgres_db_name: String,
pub postgres_host: String,
pub postgres_port: u16,
pub postgres_user: String,
pub postgres_password: String,
pub meili_host: String,
pub meili_master_key: String
}
fn get_env(env: &'static str) -> String {
std::env::var(env).unwrap_or_else(|_| panic!("Cannot get the {} env variable", env))
}
impl Config {
pub fn load() -> Config {
Config {
api_key: get_env("API_KEY"),
sentry_sdn: get_env("SENTRY_SDN"),
postgres_db_name: get_env("POSTGRES_DB_NAME"),
postgres_host: get_env("POSTGRES_HOST"),
postgres_port: get_env("POSTGRES_PORT").parse().unwrap(),
postgres_user: get_env("POSTGRES_USER"),
postgres_password: get_env("POSTGRES_PASSWORD"),
meili_host: get_env("MEILI_HOST"),
meili_master_key: get_env("MEILI_MASTER_KEY")
}
}
}
lazy_static! {
pub static ref CONFIG: Config = Config::load();
}

View File

@@ -1,17 +0,0 @@
from fastapi import FastAPI
from app.views import router
from core.arq_pool import get_arq_pool
import core.sentry # noqa: F401
def start_app() -> FastAPI:
app = FastAPI()
app.include_router(router)
@app.on_event("startup")
async def startup() -> None:
app.state.arq_pool = await get_arq_pool()
return app

View File

@@ -1,15 +0,0 @@
from arq.connections import create_pool, RedisSettings, ArqRedis
from core.config import env_config
def get_redis_settings() -> RedisSettings:
return RedisSettings(
host=env_config.REDIS_HOST,
port=env_config.REDIS_PORT,
database=env_config.REDIS_DB,
)
async def get_arq_pool() -> ArqRedis:
return await create_pool(get_redis_settings())

View File

@@ -1,4 +0,0 @@
from fastapi.security import APIKeyHeader
default_security = APIKeyHeader(name="Authorization")

View File

@@ -1,23 +0,0 @@
from pydantic import BaseSettings
class EnvConfig(BaseSettings):
API_KEY: str
POSTGRES_DB_NAME: str
POSTGRES_HOST: str
POSTGRES_PORT: int
POSTGRES_USER: str
POSTGRES_PASSWORD: str
REDIS_HOST: str
REDIS_PORT: int
REDIS_DB: int
MEILI_HOST: str
MEILI_MASTER_KEY: str
SENTRY_SDN: str
env_config = EnvConfig()

View File

@@ -1,8 +0,0 @@
import sentry_sdk
from core.config import env_config
sentry_sdk.init(
env_config.SENTRY_SDN,
)

View File

@@ -1,21 +0,0 @@
from app.services import (
update,
update_books,
update_authors,
update_sequences,
update_genres,
)
from core.arq_pool import get_redis_settings, get_arq_pool
import core.sentry # noqa: F401
async def startup(ctx):
ctx["arc_pool"] = await get_arq_pool()
class WorkerSettings:
functions = [update, update_books, update_authors, update_sequences, update_genres]
on_startup = startup
redis_settings = get_redis_settings()
max_jobs = 1
job_timeout = 15 * 60

View File

@@ -1,4 +0,0 @@
from core.app import start_app
app = start_app()

48
src/main.rs Normal file
View File

@@ -0,0 +1,48 @@
#[macro_use]
extern crate lazy_static;
pub mod config;
pub mod updater;
pub mod models;
use axum::{http::HeaderMap, routing::post, Router};
use std::net::SocketAddr;
async fn update(headers: HeaderMap) -> &'static str {
let config_api_key = config::CONFIG.api_key.clone();
let api_key = match headers.get("Authorization") {
Some(v) => v,
None => return "No api-key!",
};
if config_api_key != api_key.to_str().unwrap() {
return "Wrong api-key!";
}
tokio::spawn(async {
match updater::update().await {
Ok(_) => log::info!("Updated!"),
Err(err) => log::info!("Updater err: {:?}", err),
};
});
"Update started"
}
#[tokio::main]
async fn main() {
let _guard = sentry::init(config::CONFIG.sentry_sdn.clone());
env_logger::init();
let app = Router::new().route("/update", post(update));
let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
log::info!("Start webserver...");
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
log::info!("Webserver shutdown...")
}

261
src/models.rs Normal file
View File

@@ -0,0 +1,261 @@
use serde::{Deserialize, Serialize};
use tokio_postgres::Row;
pub trait UpdateModel {
fn get_index() -> String;
fn get_query() -> String;
fn from_row(row: Row) -> Self;
fn get_searchanble_attributes() -> Vec<String>;
fn get_filterable_attributes() -> Vec<String>;
fn get_ranking_rules() -> Vec<String>;
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Book {
pub id: i32,
pub title: String,
pub lang: String,
}
impl UpdateModel for Book {
fn get_index() -> String {
"books".to_string()
}
fn get_query() -> String {
"SELECT id, title, lang FROM books WHERE is_deleted = 'f';".to_string()
}
fn from_row(row: Row) -> Self {
Self {
id: row.get(0),
title: row.get(1),
lang: row.get(2),
}
}
fn get_searchanble_attributes() -> Vec<String> {
vec!["title".to_string()]
}
fn get_filterable_attributes() -> Vec<String> {
vec!["lang".to_string()]
}
fn get_ranking_rules() -> Vec<String> {
vec![
"words".to_string(),
"typo".to_string(),
"proximity".to_string(),
"attribute".to_string(),
"sort".to_string(),
"exactness".to_string(),
]
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Author {
pub id: i32,
pub first_name: String,
pub last_name: String,
pub middle_name: String,
pub author_langs: Vec<String>,
pub translator_langs: Vec<String>,
pub books_count: i32,
}
impl UpdateModel for Author {
fn get_index() -> String {
"authors".to_string()
}
fn get_query() -> String {
"
SELECT id, first_name, last_name, middle_name,
array(
SELECT DISTINCT lang FROM book_authors
LEFT JOIN books ON book = books.id
WHERE authors.id = book_authors.author
AND books.is_deleted = 'f'
) AS author_langs,
array(
SELECT DISTINCT lang FROM translations
LEFT JOIN books ON book = books.id
WHERE authors.id = translations.author
AND books.is_deleted = 'f'
) AS translator_langs,
(
SELECT count(books.id) FROM book_authors
LEFT JOIN books ON book = books.id
WHERE authors.id = book_authors.author
AND books.is_deleted = 'f'
) AS books_count
FROM authors;
"
.to_string()
}
fn from_row(row: Row) -> Self {
Self {
id: row.get(0),
first_name: row.get(1),
last_name: row.get(2),
middle_name: row.get(3),
author_langs: row.get(4),
translator_langs: row.get(5),
books_count: row.get(6),
}
}
fn get_searchanble_attributes() -> Vec<String> {
vec![
"first_name".to_string(),
"last_name".to_string(),
"middle_name".to_string(),
]
}
fn get_filterable_attributes() -> Vec<String> {
vec!["author_langs".to_string(), "translator_langs".to_string()]
}
fn get_ranking_rules() -> Vec<String> {
vec![
"words".to_string(),
"typo".to_string(),
"proximity".to_string(),
"attribute".to_string(),
"sort".to_string(),
"exactness".to_string(),
"books_count:desc".to_string()
]
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Sequence {
pub id: i32,
pub name: String,
pub langs: Vec<String>,
pub books_count: i32
}
impl UpdateModel for Sequence {
fn get_index() -> String {
"sequences".to_string()
}
fn get_query() -> String {
"
SELECT id, name,
array(
SELECT DISTINCT lang FROM book_sequences
LEFT JOIN books ON book = books.id
WHERE sequences.id = book_sequences.sequence
AND books.is_deleted = 'f'
) as langs,
(SELECT count(books.id) FROM book_sequences
LEFT JOIN books ON book = books.id
WHERE sequences.id = book_sequences.sequence
AND books.is_deleted = 'f') as books_count
FROM sequences;
".to_string()
}
fn from_row(row: Row) -> Self {
Self {
id: row.get(0),
name: row.get(1),
langs: row.get(2),
books_count: row.get(3)
}
}
fn get_searchanble_attributes() -> Vec<String> {
vec!["name".to_string()]
}
fn get_filterable_attributes() -> Vec<String> {
vec!["langs".to_string()]
}
fn get_ranking_rules() -> Vec<String> {
vec![
"words".to_string(),
"typo".to_string(),
"proximity".to_string(),
"attribute".to_string(),
"sort".to_string(),
"exactness".to_string(),
"books_count:desc".to_string()
]
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Genre {
pub id: i32,
pub description: String,
pub meta: String,
pub langs: String,
pub books_count: String,
}
impl UpdateModel for Genre {
fn get_index() -> String {
"genres".to_string()
}
fn get_query() -> String {
"
SELECT id, description, meta,
array(
SELECT DISTINCT lang FROM book_genres
LEFT JOIN books ON book = books.id
WHERE genres.id = book_genres.genre
AND books.is_deleted = 'f'
) as langs,
(
SELECT count(*) FROM book_genres
LEFT JOIN books ON book = books.id
WHERE genres.id = book_genres.genre
AND books.is_deleted = 'f'
) as books_count
FROM genres;
".to_string()
}
fn from_row(row: Row) -> Self {
Self {
id: row.get(0),
description: row.get(1),
meta: row.get(2),
langs: row.get(3),
books_count: row.get(4)
}
}
fn get_searchanble_attributes() -> Vec<String> {
vec!["description".to_string()]
}
fn get_filterable_attributes() -> Vec<String> {
vec!["langs".to_string()]
}
fn get_ranking_rules() -> Vec<String> {
vec![
"words".to_string(),
"typo".to_string(),
"proximity".to_string(),
"attribute".to_string(),
"sort".to_string(),
"exactness".to_string(),
"books_count:desc".to_string()
]
}
}

139
src/updater.rs Normal file
View File

@@ -0,0 +1,139 @@
use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, Pool, RecyclingMethod, Runtime};
use tokio_postgres::NoTls;
use futures::{StreamExt, pin_mut};
use meilisearch_sdk::client::*;
use serde::Serialize;
use crate::{config, models::{Book, UpdateModel, Author, Sequence, Genre}};
async fn get_postgres_pool() -> Result<Pool, CreatePoolError> {
let mut config = Config::new();
config.host = Some(config::CONFIG.postgres_host.clone());
config.port = Some(config::CONFIG.postgres_port);
config.dbname = Some(config::CONFIG.postgres_db_name.clone());
config.user = Some(config::CONFIG.postgres_user.clone());
config.password = Some(config::CONFIG.postgres_password.clone());
config.connect_timeout = Some(std::time::Duration::from_secs(5));
config.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Verified,
});
match config.create_pool(Some(Runtime::Tokio1), NoTls) {
Ok(pool) => Ok(pool),
Err(err) => Err(err),
}
}
fn get_meili_client() -> Client {
Client::new(config::CONFIG.meili_host.clone(), config::CONFIG.meili_master_key.clone())
}
async fn update_model<T>(
pool: Pool,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
T: UpdateModel + Serialize
{
let client = pool.get().await.unwrap();
let meili_client = get_meili_client();
let index = meili_client.index(T::get_index());
let params: Vec<String> = vec![];
let stream = match client.query_raw(
&T::get_query(), params
).await {
Ok(stream) => stream,
Err(err) => return Err(Box::new(err)),
};
pin_mut!(stream);
let mut chunks = stream.chunks(1024);
while let Some(chunk) = chunks.next().await {
let items: Vec<T> = chunk
.into_iter()
.filter_map(|result| {
match result {
Ok(v) => Some(T::from_row(v)),
Err(err) => panic!("{}", err),
}
})
.collect();
if let Err(err) = index.add_documents(&items, Some("id")).await {
return Err(Box::new(err));
};
}
if let Err(err) = index.set_searchable_attributes(T::get_searchanble_attributes()).await {
return Err(Box::new(err));
};
if let Err(err) = index.set_filterable_attributes(T::get_filterable_attributes()).await {
return Err(Box::new(err));
};
if let Err(err) = index.set_ranking_rules(T::get_ranking_rules()).await {
return Err(Box::new(err));
};
Ok(())
}
pub async fn update() -> Result<(), Box<dyn std::error::Error>> {
log::info!("Start update...");
let pool = match get_postgres_pool().await {
Ok(pool) => pool,
Err(err) => panic!("{:?}", err),
};
let pool_clone = pool.clone();
let update_books_process = tokio::spawn(async move {
match update_model::<Book>(pool_clone).await {
Ok(_) => (),
Err(err) => panic!("{}", err),
}
});
let pool_clone = pool.clone();
let update_authors_process = tokio::spawn(async move {
match update_model::<Author>(pool_clone).await {
Ok(_) => (),
Err(err) => panic!("{}", err),
}
});
let pool_clone = pool.clone();
let update_sequences_process = tokio::spawn(async move {
match update_model::<Sequence>(pool_clone).await {
Ok(_) => (),
Err(err) => panic!("{}", err),
}
});
let pool_clone = pool.clone();
let update_genres_process = tokio::spawn(async move {
match update_model::<Genre>(pool_clone).await {
Ok(_) => (),
Err(err) => panic!("{}", err),
}
});
for process in [
update_books_process,
update_authors_process,
update_sequences_process,
update_genres_process
] {
match process.await {
Ok(v) => v,
Err(err) => return Err(Box::new(err)),
};
}
Ok(())
}