mirror of
https://github.com/flibusta-apps/library_updater.git
synced 2025-12-06 15:45:36 +01:00
Add axum server
This commit is contained in:
128
Cargo.lock
generated
128
Cargo.lock
generated
@@ -76,6 +76,53 @@ version = "1.1.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "axum"
|
||||||
|
version = "0.5.16"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"axum-core",
|
||||||
|
"bitflags",
|
||||||
|
"bytes",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"hyper",
|
||||||
|
"itoa",
|
||||||
|
"matchit",
|
||||||
|
"memchr",
|
||||||
|
"mime",
|
||||||
|
"percent-encoding",
|
||||||
|
"pin-project-lite",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"serde_urlencoded",
|
||||||
|
"sync_wrapper",
|
||||||
|
"tokio",
|
||||||
|
"tower",
|
||||||
|
"tower-http",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "axum-core"
|
||||||
|
version = "0.2.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"bytes",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"mime",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "backtrace"
|
name = "backtrace"
|
||||||
version = "0.3.66"
|
version = "0.3.66"
|
||||||
@@ -542,6 +589,12 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "http-range-header"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "httparse"
|
name = "httparse"
|
||||||
version = "1.8.0"
|
version = "1.8.0"
|
||||||
@@ -679,6 +732,7 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"async-compression",
|
"async-compression",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"axum",
|
||||||
"chrono",
|
"chrono",
|
||||||
"deadpool-postgres",
|
"deadpool-postgres",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
@@ -721,6 +775,12 @@ version = "0.1.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
|
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "matchit"
|
||||||
|
version = "0.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "md-5"
|
name = "md-5"
|
||||||
version = "0.10.4"
|
version = "0.10.4"
|
||||||
@@ -953,6 +1013,26 @@ dependencies = [
|
|||||||
"siphasher",
|
"siphasher",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project"
|
||||||
|
version = "1.0.12"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
|
||||||
|
dependencies = [
|
||||||
|
"pin-project-internal",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project-internal"
|
||||||
|
version = "1.0.12"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-lite"
|
name = "pin-project-lite"
|
||||||
version = "0.2.9"
|
version = "0.2.9"
|
||||||
@@ -1407,6 +1487,12 @@ dependencies = [
|
|||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sync_wrapper"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tempfile"
|
name = "tempfile"
|
||||||
version = "3.3.0"
|
version = "3.3.0"
|
||||||
@@ -1583,6 +1669,47 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower"
|
||||||
|
version = "0.4.13"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
|
"pin-project",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tokio",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
"tracing",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower-http"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
"bytes",
|
||||||
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"http-range-header",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tower",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower-layer"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-service"
|
name = "tower-service"
|
||||||
version = "0.3.2"
|
version = "0.3.2"
|
||||||
@@ -1596,6 +1723,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307"
|
checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
|
"log",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tracing-attributes",
|
"tracing-attributes",
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
|
|||||||
@@ -23,3 +23,4 @@ env_logger = "0.9.0"
|
|||||||
serde = { version = "1.0.144", features = ["derive"] }
|
serde = { version = "1.0.144", features = ["derive"] }
|
||||||
serde_json = "1.0.85"
|
serde_json = "1.0.85"
|
||||||
tokio-cron-scheduler = "0.8.1"
|
tokio-cron-scheduler = "0.8.1"
|
||||||
|
axum = "0.5.16"
|
||||||
|
|||||||
@@ -17,6 +17,8 @@ pub struct Webhook {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
|
pub api_key: String,
|
||||||
|
|
||||||
pub sentry_dsn: String,
|
pub sentry_dsn: String,
|
||||||
|
|
||||||
pub postgres_db_name: String,
|
pub postgres_db_name: String,
|
||||||
@@ -37,6 +39,8 @@ fn get_env(env: &'static str) -> String {
|
|||||||
impl Config {
|
impl Config {
|
||||||
pub fn load() -> Config {
|
pub fn load() -> Config {
|
||||||
Config {
|
Config {
|
||||||
|
api_key: get_env("API_KEY"),
|
||||||
|
|
||||||
sentry_dsn: get_env("SENTRY_DSN"),
|
sentry_dsn: get_env("SENTRY_DSN"),
|
||||||
|
|
||||||
postgres_db_name: get_env("POSTGRES_DB_NAME"),
|
postgres_db_name: get_env("POSTGRES_DB_NAME"),
|
||||||
|
|||||||
531
src/main.rs
531
src/main.rs
@@ -4,522 +4,55 @@ extern crate lazy_static;
|
|||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
|
pub mod updater;
|
||||||
|
|
||||||
use std::{
|
use std::net::SocketAddr;
|
||||||
fmt::Debug,
|
use axum::{Router, routing::post, http::HeaderMap};
|
||||||
sync::{Arc, Mutex}, str::FromStr
|
|
||||||
|
use crate::updater::cron_jobs;
|
||||||
|
|
||||||
|
async fn update(headers: HeaderMap) -> &'static str {
|
||||||
|
let config_api_key = config::CONFIG.api_key.clone();
|
||||||
|
|
||||||
|
let api_key = match headers.get("Authorization") {
|
||||||
|
Some(v) => v,
|
||||||
|
None => return "No api-key!",
|
||||||
};
|
};
|
||||||
|
|
||||||
use config::Webhook;
|
if config_api_key != api_key.to_str().unwrap() {
|
||||||
use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, Pool, RecyclingMethod, Runtime};
|
return "Wrong api-key!"
|
||||||
use futures::{io::copy, TryStreamExt};
|
|
||||||
use reqwest::header::{HeaderMap, HeaderValue, HeaderName};
|
|
||||||
use tokio::fs::{File, remove_file};
|
|
||||||
use tokio_cron_scheduler::{JobScheduler, Job, JobSchedulerError};
|
|
||||||
use tokio_postgres::NoTls;
|
|
||||||
|
|
||||||
use async_compression::futures::bufread::GzipDecoder;
|
|
||||||
|
|
||||||
use sql_parse::{
|
|
||||||
parse_statement, InsertReplace, InsertReplaceType, ParseOptions, SQLArguments, SQLDialect,
|
|
||||||
Statement,
|
|
||||||
};
|
|
||||||
use tokio_util::compat::TokioAsyncReadCompatExt;
|
|
||||||
use types::{
|
|
||||||
Author, AuthorAnnotation, AuthorAnnotationPic, BookAnnotation, BookAnnotationPic, BookAuthor,
|
|
||||||
BookGenre, FromVecExpression, Genre, Sequence, SequenceInfo, Translator, Update,
|
|
||||||
};
|
|
||||||
use utils::read_lines;
|
|
||||||
|
|
||||||
use crate::types::Book;
|
|
||||||
|
|
||||||
async fn download_file(filename_str: &str) -> Result<(), Box<dyn std::error::Error + Send>> {
|
|
||||||
log::info!("Download {filename_str}...");
|
|
||||||
|
|
||||||
let link = format!("{}/sql/{filename_str}.gz", &config::CONFIG.fl_base_url);
|
|
||||||
|
|
||||||
let response = match reqwest::get(link).await {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => return Err(Box::new(err)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let response = match response.error_for_status() {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => return Err(Box::new(err)),
|
|
||||||
};
|
|
||||||
|
|
||||||
match remove_file(filename_str).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => log::debug!("Can't remove file: {:?}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut file = match File::create(filename_str).await {
|
|
||||||
Ok(v) => v.compat(),
|
|
||||||
Err(err) => {
|
|
||||||
log::error!("Can't create {filename_str}: {:?}", err);
|
|
||||||
return Err(Box::new(err))
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let data = response
|
|
||||||
.bytes_stream()
|
|
||||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
|
|
||||||
.into_async_read();
|
|
||||||
|
|
||||||
let decoder = GzipDecoder::new(data);
|
|
||||||
|
|
||||||
match copy(decoder, &mut file).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => {
|
|
||||||
log::error!("Can't write data {filename_str}: {}", err);
|
|
||||||
return Err(Box::new(err))
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
log::info!("{filename_str} downloaded!");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process<T>(
|
tokio::spawn(async {
|
||||||
pool: Pool,
|
match updater::update().await {
|
||||||
source_id: i16,
|
Ok(_) => log::info!("Updated!"),
|
||||||
file_name: &str,
|
Err(err) => log::info!("Updater err: {:?}", err),
|
||||||
deps: Vec<Arc<Mutex<Option<UpdateStatus>>>>,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send>>
|
|
||||||
where
|
|
||||||
T: Debug + FromVecExpression<T> + Update,
|
|
||||||
{
|
|
||||||
if deps.len() != 0 {
|
|
||||||
loop {
|
|
||||||
let mut some_failed = false;
|
|
||||||
let mut some_none = false;
|
|
||||||
|
|
||||||
for dep in deps.iter() {
|
|
||||||
let status = dep.lock().unwrap();
|
|
||||||
match &*status {
|
|
||||||
Some(status) => match status {
|
|
||||||
UpdateStatus::Success => (),
|
|
||||||
UpdateStatus::Fail => some_failed = true,
|
|
||||||
},
|
|
||||||
None => some_none = true,
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if !some_failed && !some_none {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match download_file(file_name).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
};
|
|
||||||
|
|
||||||
let parse_options = ParseOptions::new()
|
|
||||||
.dialect(SQLDialect::MariaDB)
|
|
||||||
.arguments(SQLArguments::QuestionMark)
|
|
||||||
.warn_unquoted_identifiers(true);
|
|
||||||
|
|
||||||
let lines = read_lines(file_name);
|
|
||||||
|
|
||||||
let lines = match lines {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => return Err(Box::new(err)),
|
|
||||||
};
|
|
||||||
|
|
||||||
match T::before_update(&pool.get().await.unwrap()).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
};
|
|
||||||
|
|
||||||
log::info!("Start update {file_name}...");
|
|
||||||
|
|
||||||
for line in lines.into_iter() {
|
|
||||||
let line = match line {
|
|
||||||
Ok(line) => line,
|
|
||||||
Err(err) => return Err(Box::new(err)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut issues = Vec::new();
|
|
||||||
let ast = parse_statement(&line, &mut issues, &parse_options);
|
|
||||||
|
|
||||||
match ast {
|
|
||||||
Some(Statement::InsertReplace(
|
|
||||||
i @ InsertReplace {
|
|
||||||
type_: InsertReplaceType::Insert(_),
|
|
||||||
..
|
|
||||||
},
|
|
||||||
)) => {
|
|
||||||
for value in i.values.into_iter() {
|
|
||||||
for t_value in value.1.into_iter() {
|
|
||||||
let value = T::from_vec_expression(&t_value);
|
|
||||||
let client = pool.get().await.unwrap();
|
|
||||||
|
|
||||||
match value.update(&client, source_id).await {
|
|
||||||
Ok(_) => {
|
|
||||||
// log::info!("{:?}", value);
|
|
||||||
()
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
log::error!("Update error: {:?} : {:?}", value, err);
|
|
||||||
return Err(err)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log::info!("Updated {file_name}...");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_postgres_pool() -> Result<Pool, CreatePoolError> {
|
|
||||||
let mut config = Config::new();
|
|
||||||
|
|
||||||
config.host = Some(config::CONFIG.postgres_host.clone());
|
|
||||||
config.port = Some(config::CONFIG.postgres_port);
|
|
||||||
config.dbname = Some(config::CONFIG.postgres_db_name.clone());
|
|
||||||
config.user = Some(config::CONFIG.postgres_user.clone());
|
|
||||||
config.password = Some(config::CONFIG.postgres_password.clone());
|
|
||||||
config.connect_timeout = Some(std::time::Duration::from_secs(5));
|
|
||||||
config.manager = Some(ManagerConfig {
|
|
||||||
recycling_method: RecyclingMethod::Verified,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
match config.create_pool(Some(Runtime::Tokio1), NoTls) {
|
"Update started"
|
||||||
Ok(pool) => Ok(pool),
|
|
||||||
Err(err) => Err(err),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_source(pool: Pool) -> Result<i16, Box<dyn std::error::Error>> {
|
async fn start_app() {
|
||||||
let client = pool.get().await.unwrap();
|
let app = Router::new()
|
||||||
|
.route("/update", post(update));
|
||||||
|
|
||||||
let row = match client
|
let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
|
||||||
.query_one("SELECT id FROM sources WHERE name = 'flibusta';", &[])
|
|
||||||
|
axum::Server::bind(&addr)
|
||||||
|
.serve(app.into_make_service())
|
||||||
.await
|
.await
|
||||||
{
|
.unwrap();
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => return Err(Box::new(err)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let id = row.get(0);
|
|
||||||
|
|
||||||
Ok(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
enum UpdateStatus {
|
|
||||||
Success,
|
|
||||||
Fail,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_webhooks() -> Result<(), Box<reqwest::Error>> {
|
|
||||||
for webhook in config::CONFIG.webhooks.clone().into_iter() {
|
|
||||||
let Webhook { method, url, headers } = webhook;
|
|
||||||
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
|
|
||||||
let builder = match method {
|
|
||||||
config::Method::Get => {
|
|
||||||
client.get(url)
|
|
||||||
},
|
|
||||||
config::Method::Post => {
|
|
||||||
client.post(url)
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let t_headers: Vec<(HeaderName, HeaderValue)> = headers.into_iter().map(|(key, val)| {
|
|
||||||
let value = match val {
|
|
||||||
serde_json::Value::String(v) => v,
|
|
||||||
_ => panic!("Header value not string!")
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
HeaderName::from_str(key.as_ref()).unwrap(),
|
|
||||||
HeaderValue::from_str(&value).unwrap()
|
|
||||||
)
|
|
||||||
}).collect();
|
|
||||||
|
|
||||||
let headers = HeaderMap::from_iter(t_headers.into_iter());
|
|
||||||
|
|
||||||
let response = builder.headers(headers).send().await;
|
|
||||||
|
|
||||||
let response = match response {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => return Err(Box::new(err)),
|
|
||||||
};
|
|
||||||
|
|
||||||
match response.error_for_status() {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => return Err(Box::new(err)),
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update() -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
log::info!("Start update...");
|
|
||||||
|
|
||||||
let pool = match get_postgres_pool().await {
|
|
||||||
Ok(pool) => pool,
|
|
||||||
Err(err) => panic!("{:?}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
let source_id = match get_source(pool.clone()).await {
|
|
||||||
Ok(v) => Arc::new(v),
|
|
||||||
Err(err) => panic!("{:?}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
let author_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
|
||||||
let book_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
|
||||||
let sequence_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
|
||||||
let book_annotation_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
|
||||||
let author_annotation_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
|
||||||
let genre_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let author_status_clone = author_status.clone();
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let author_process = tokio::spawn(async move {
|
|
||||||
match process::<Author>(pool_clone, *source_id_clone, "lib.libavtorname.sql", vec![]).await
|
|
||||||
{
|
|
||||||
Ok(_) => {
|
|
||||||
let mut status = author_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Success);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let mut status = author_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Success);
|
|
||||||
Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let book_status_clone = book_status.clone();
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let book_process = tokio::spawn(async move {
|
|
||||||
match process::<Book>(pool_clone, *source_id_clone, "lib.libbook.sql", vec![]).await {
|
|
||||||
Ok(_) => {
|
|
||||||
let mut status = book_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Success);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let mut status = book_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Fail);
|
|
||||||
Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let deps = vec![author_status.clone(), book_status.clone()];
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let book_author_process = tokio::spawn(async move {
|
|
||||||
process::<BookAuthor>(pool_clone, *source_id_clone, "lib.libavtor.sql", deps).await
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let deps = vec![author_status.clone(), book_status.clone()];
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let translator_process = tokio::spawn(async move {
|
|
||||||
process::<Translator>(pool_clone, *source_id_clone, "lib.libtranslator.sql", deps).await
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let sequence_status_clone = sequence_status.clone();
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let sequence_process = tokio::spawn(async move {
|
|
||||||
match process::<Sequence>(pool_clone, *source_id_clone, "lib.libseqname.sql", vec![]).await
|
|
||||||
{
|
|
||||||
Ok(_) => {
|
|
||||||
let mut status = sequence_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Success);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let mut status = sequence_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Fail);
|
|
||||||
Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let deps = vec![book_status.clone(), sequence_status.clone()];
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let sequence_info_process = tokio::spawn(async move {
|
|
||||||
process::<SequenceInfo>(pool_clone, *source_id_clone, "lib.libseq.sql", deps).await
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let deps = vec![book_status.clone()];
|
|
||||||
let book_annotation_status_clone = book_annotation_status.clone();
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let book_annotation_process = tokio::spawn(async move {
|
|
||||||
match process::<BookAnnotation>(pool_clone, *source_id_clone, "lib.b.annotations.sql", deps)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(_) => {
|
|
||||||
let mut status = book_annotation_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Success);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let mut status = book_annotation_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Fail);
|
|
||||||
Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let deps = vec![book_annotation_status.clone()];
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let book_annotation_pics_process = tokio::spawn(async move {
|
|
||||||
process::<BookAnnotationPic>(
|
|
||||||
pool_clone,
|
|
||||||
*source_id_clone,
|
|
||||||
"lib.b.annotations_pics.sql",
|
|
||||||
deps,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let deps = vec![author_status.clone()];
|
|
||||||
let author_annotation_status_clone = author_annotation_status.clone();
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let author_annotation_process = tokio::spawn(async move {
|
|
||||||
match process::<AuthorAnnotation>(
|
|
||||||
pool_clone,
|
|
||||||
*source_id_clone,
|
|
||||||
"lib.a.annotations.sql",
|
|
||||||
deps,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(_) => {
|
|
||||||
let mut status = author_annotation_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Success);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let mut status = author_annotation_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Fail);
|
|
||||||
Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let deps = vec![author_annotation_status.clone()];
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let author_annotation_pics_process = tokio::spawn(async move {
|
|
||||||
process::<AuthorAnnotationPic>(
|
|
||||||
pool_clone,
|
|
||||||
*source_id_clone,
|
|
||||||
"lib.a.annotations_pics.sql",
|
|
||||||
deps,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let genre_status_clone = genre_status.clone();
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let genre_annotation_process = tokio::spawn(async move {
|
|
||||||
match process::<Genre>(pool_clone, *source_id_clone, "lib.libgenrelist.sql", vec![]).await {
|
|
||||||
Ok(_) => {
|
|
||||||
let mut status = genre_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Success);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let mut status = genre_status_clone.lock().unwrap();
|
|
||||||
*status = Some(UpdateStatus::Fail);
|
|
||||||
Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let deps = vec![genre_status.clone(), book_status.clone()];
|
|
||||||
let source_id_clone = source_id.clone();
|
|
||||||
let book_genre_process = tokio::spawn(async move {
|
|
||||||
process::<BookGenre>(pool_clone, *source_id_clone, "lib.libgenre.sql", deps).await
|
|
||||||
});
|
|
||||||
|
|
||||||
for process in [
|
|
||||||
author_process,
|
|
||||||
book_process,
|
|
||||||
book_author_process,
|
|
||||||
translator_process,
|
|
||||||
sequence_process,
|
|
||||||
sequence_info_process,
|
|
||||||
book_annotation_process,
|
|
||||||
book_annotation_pics_process,
|
|
||||||
author_annotation_process,
|
|
||||||
author_annotation_pics_process,
|
|
||||||
genre_annotation_process,
|
|
||||||
book_genre_process
|
|
||||||
] {
|
|
||||||
let process_result = match process.await {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => return Err(Box::new(err)),
|
|
||||||
};
|
|
||||||
|
|
||||||
match process_result {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => panic!("{:?}", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match send_webhooks().await {
|
|
||||||
Ok(_) => {
|
|
||||||
log::info!("Webhooks sended!");
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
log::info!("Webhooks send failed : {err}");
|
|
||||||
return Err(Box::new(err))
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), JobSchedulerError> {
|
async fn main() {
|
||||||
let _guard = sentry::init(config::CONFIG.sentry_dsn.clone());
|
let _guard = sentry::init(config::CONFIG.sentry_dsn.clone());
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let job_scheduler = JobScheduler::new().await.unwrap();
|
tokio::spawn(async {
|
||||||
|
cron_jobs().await
|
||||||
|
});
|
||||||
|
|
||||||
let update_job = match Job::new_async("* 0 5 * * *", |_uuid, _l| Box::pin(async {
|
start_app().await;
|
||||||
match update().await {
|
|
||||||
Ok(_) => log::info!("Updated"),
|
|
||||||
Err(err) => log::info!("Update err: {:?}", err),
|
|
||||||
};
|
|
||||||
})) {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => panic!("{:?}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
job_scheduler.add(update_job).await.unwrap();
|
|
||||||
|
|
||||||
match job_scheduler.start().await {
|
|
||||||
Ok(_) => Ok(()),
|
|
||||||
Err(err) => Err(err),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
514
src/updater.rs
Normal file
514
src/updater.rs
Normal file
@@ -0,0 +1,514 @@
|
|||||||
|
use std::{
|
||||||
|
fmt::Debug,
|
||||||
|
sync::{Arc, Mutex}, str::FromStr
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::config::{Webhook, self};
|
||||||
|
use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, Pool, RecyclingMethod, Runtime};
|
||||||
|
use futures::{io::copy, TryStreamExt};
|
||||||
|
use reqwest::header::{HeaderMap, HeaderValue, HeaderName};
|
||||||
|
use tokio::fs::{File, remove_file};
|
||||||
|
use tokio_cron_scheduler::{JobScheduler, Job, JobSchedulerError};
|
||||||
|
use tokio_postgres::NoTls;
|
||||||
|
|
||||||
|
use async_compression::futures::bufread::GzipDecoder;
|
||||||
|
|
||||||
|
use sql_parse::{
|
||||||
|
parse_statement, InsertReplace, InsertReplaceType, ParseOptions, SQLArguments, SQLDialect,
|
||||||
|
Statement,
|
||||||
|
};
|
||||||
|
use tokio_util::compat::TokioAsyncReadCompatExt;
|
||||||
|
use crate::types::{
|
||||||
|
Author, AuthorAnnotation, AuthorAnnotationPic, BookAnnotation, BookAnnotationPic, BookAuthor,
|
||||||
|
BookGenre, FromVecExpression, Genre, Sequence, SequenceInfo, Translator, Update,
|
||||||
|
};
|
||||||
|
use crate::utils::read_lines;
|
||||||
|
|
||||||
|
use crate::types::Book;
|
||||||
|
|
||||||
|
async fn download_file(filename_str: &str) -> Result<(), Box<dyn std::error::Error + Send>> {
|
||||||
|
log::info!("Download {filename_str}...");
|
||||||
|
|
||||||
|
let link = format!("{}/sql/{filename_str}.gz", &config::CONFIG.fl_base_url);
|
||||||
|
|
||||||
|
let response = match reqwest::get(link).await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => return Err(Box::new(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let response = match response.error_for_status() {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => return Err(Box::new(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
match remove_file(filename_str).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => log::debug!("Can't remove file: {:?}", err),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut file = match File::create(filename_str).await {
|
||||||
|
Ok(v) => v.compat(),
|
||||||
|
Err(err) => {
|
||||||
|
log::error!("Can't create {filename_str}: {:?}", err);
|
||||||
|
return Err(Box::new(err))
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let data = response
|
||||||
|
.bytes_stream()
|
||||||
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
|
||||||
|
.into_async_read();
|
||||||
|
|
||||||
|
let decoder = GzipDecoder::new(data);
|
||||||
|
|
||||||
|
match copy(decoder, &mut file).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
log::error!("Can't write data {filename_str}: {}", err);
|
||||||
|
return Err(Box::new(err))
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
log::info!("{filename_str} downloaded!");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process<T>(
|
||||||
|
pool: Pool,
|
||||||
|
source_id: i16,
|
||||||
|
file_name: &str,
|
||||||
|
deps: Vec<Arc<Mutex<Option<UpdateStatus>>>>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send>>
|
||||||
|
where
|
||||||
|
T: Debug + FromVecExpression<T> + Update,
|
||||||
|
{
|
||||||
|
if deps.len() != 0 {
|
||||||
|
loop {
|
||||||
|
let mut some_failed = false;
|
||||||
|
let mut some_none = false;
|
||||||
|
|
||||||
|
for dep in deps.iter() {
|
||||||
|
let status = dep.lock().unwrap();
|
||||||
|
match &*status {
|
||||||
|
Some(status) => match status {
|
||||||
|
UpdateStatus::Success => (),
|
||||||
|
UpdateStatus::Fail => some_failed = true,
|
||||||
|
},
|
||||||
|
None => some_none = true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !some_failed && !some_none {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match download_file(file_name).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
};
|
||||||
|
|
||||||
|
let parse_options = ParseOptions::new()
|
||||||
|
.dialect(SQLDialect::MariaDB)
|
||||||
|
.arguments(SQLArguments::QuestionMark)
|
||||||
|
.warn_unquoted_identifiers(true);
|
||||||
|
|
||||||
|
let lines = read_lines(file_name);
|
||||||
|
|
||||||
|
let lines = match lines {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => return Err(Box::new(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
match T::before_update(&pool.get().await.unwrap()).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
};
|
||||||
|
|
||||||
|
log::info!("Start update {file_name}...");
|
||||||
|
|
||||||
|
for line in lines.into_iter() {
|
||||||
|
let line = match line {
|
||||||
|
Ok(line) => line,
|
||||||
|
Err(err) => return Err(Box::new(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut issues = Vec::new();
|
||||||
|
let ast = parse_statement(&line, &mut issues, &parse_options);
|
||||||
|
|
||||||
|
match ast {
|
||||||
|
Some(Statement::InsertReplace(
|
||||||
|
i @ InsertReplace {
|
||||||
|
type_: InsertReplaceType::Insert(_),
|
||||||
|
..
|
||||||
|
},
|
||||||
|
)) => {
|
||||||
|
for value in i.values.into_iter() {
|
||||||
|
for t_value in value.1.into_iter() {
|
||||||
|
let value = T::from_vec_expression(&t_value);
|
||||||
|
let client = pool.get().await.unwrap();
|
||||||
|
|
||||||
|
match value.update(&client, source_id).await {
|
||||||
|
Ok(_) => {
|
||||||
|
// log::info!("{:?}", value);
|
||||||
|
()
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::error!("Update error: {:?} : {:?}", value, err);
|
||||||
|
return Err(err)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("Updated {file_name}...");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_postgres_pool() -> Result<Pool, CreatePoolError> {
|
||||||
|
let mut config = Config::new();
|
||||||
|
|
||||||
|
config.host = Some(config::CONFIG.postgres_host.clone());
|
||||||
|
config.port = Some(config::CONFIG.postgres_port);
|
||||||
|
config.dbname = Some(config::CONFIG.postgres_db_name.clone());
|
||||||
|
config.user = Some(config::CONFIG.postgres_user.clone());
|
||||||
|
config.password = Some(config::CONFIG.postgres_password.clone());
|
||||||
|
config.connect_timeout = Some(std::time::Duration::from_secs(5));
|
||||||
|
config.manager = Some(ManagerConfig {
|
||||||
|
recycling_method: RecyclingMethod::Verified,
|
||||||
|
});
|
||||||
|
|
||||||
|
match config.create_pool(Some(Runtime::Tokio1), NoTls) {
|
||||||
|
Ok(pool) => Ok(pool),
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_source(pool: Pool) -> Result<i16, Box<dyn std::error::Error>> {
|
||||||
|
let client = pool.get().await.unwrap();
|
||||||
|
|
||||||
|
let row = match client
|
||||||
|
.query_one("SELECT id FROM sources WHERE name = 'flibusta';", &[])
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => return Err(Box::new(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let id = row.get(0);
|
||||||
|
|
||||||
|
Ok(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
enum UpdateStatus {
|
||||||
|
Success,
|
||||||
|
Fail,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_webhooks() -> Result<(), Box<reqwest::Error>> {
|
||||||
|
for webhook in config::CONFIG.webhooks.clone().into_iter() {
|
||||||
|
let Webhook { method, url, headers } = webhook;
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
let builder = match method {
|
||||||
|
config::Method::Get => {
|
||||||
|
client.get(url)
|
||||||
|
},
|
||||||
|
config::Method::Post => {
|
||||||
|
client.post(url)
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let t_headers: Vec<(HeaderName, HeaderValue)> = headers.into_iter().map(|(key, val)| {
|
||||||
|
let value = match val {
|
||||||
|
serde_json::Value::String(v) => v,
|
||||||
|
_ => panic!("Header value not string!")
|
||||||
|
};
|
||||||
|
|
||||||
|
(
|
||||||
|
HeaderName::from_str(key.as_ref()).unwrap(),
|
||||||
|
HeaderValue::from_str(&value).unwrap()
|
||||||
|
)
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
let headers = HeaderMap::from_iter(t_headers.into_iter());
|
||||||
|
|
||||||
|
let response = builder.headers(headers).send().await;
|
||||||
|
|
||||||
|
let response = match response {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => return Err(Box::new(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
match response.error_for_status() {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => return Err(Box::new(err)),
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
log::info!("Start update...");
|
||||||
|
|
||||||
|
let pool = match get_postgres_pool().await {
|
||||||
|
Ok(pool) => pool,
|
||||||
|
Err(err) => panic!("{:?}", err),
|
||||||
|
};
|
||||||
|
|
||||||
|
let source_id = match get_source(pool.clone()).await {
|
||||||
|
Ok(v) => Arc::new(v),
|
||||||
|
Err(err) => panic!("{:?}", err),
|
||||||
|
};
|
||||||
|
|
||||||
|
let author_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
||||||
|
let book_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
||||||
|
let sequence_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
||||||
|
let book_annotation_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
||||||
|
let author_annotation_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
||||||
|
let genre_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let author_status_clone = author_status.clone();
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let author_process = tokio::spawn(async move {
|
||||||
|
match process::<Author>(pool_clone, *source_id_clone, "lib.libavtorname.sql", vec![]).await
|
||||||
|
{
|
||||||
|
Ok(_) => {
|
||||||
|
let mut status = author_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Success);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
let mut status = author_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Success);
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let book_status_clone = book_status.clone();
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let book_process = tokio::spawn(async move {
|
||||||
|
match process::<Book>(pool_clone, *source_id_clone, "lib.libbook.sql", vec![]).await {
|
||||||
|
Ok(_) => {
|
||||||
|
let mut status = book_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Success);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
let mut status = book_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Fail);
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let deps = vec![author_status.clone(), book_status.clone()];
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let book_author_process = tokio::spawn(async move {
|
||||||
|
process::<BookAuthor>(pool_clone, *source_id_clone, "lib.libavtor.sql", deps).await
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let deps = vec![author_status.clone(), book_status.clone()];
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let translator_process = tokio::spawn(async move {
|
||||||
|
process::<Translator>(pool_clone, *source_id_clone, "lib.libtranslator.sql", deps).await
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let sequence_status_clone = sequence_status.clone();
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let sequence_process = tokio::spawn(async move {
|
||||||
|
match process::<Sequence>(pool_clone, *source_id_clone, "lib.libseqname.sql", vec![]).await
|
||||||
|
{
|
||||||
|
Ok(_) => {
|
||||||
|
let mut status = sequence_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Success);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
let mut status = sequence_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Fail);
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let deps = vec![book_status.clone(), sequence_status.clone()];
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let sequence_info_process = tokio::spawn(async move {
|
||||||
|
process::<SequenceInfo>(pool_clone, *source_id_clone, "lib.libseq.sql", deps).await
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let deps = vec![book_status.clone()];
|
||||||
|
let book_annotation_status_clone = book_annotation_status.clone();
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let book_annotation_process = tokio::spawn(async move {
|
||||||
|
match process::<BookAnnotation>(pool_clone, *source_id_clone, "lib.b.annotations.sql", deps)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => {
|
||||||
|
let mut status = book_annotation_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Success);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
let mut status = book_annotation_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Fail);
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let deps = vec![book_annotation_status.clone()];
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let book_annotation_pics_process = tokio::spawn(async move {
|
||||||
|
process::<BookAnnotationPic>(
|
||||||
|
pool_clone,
|
||||||
|
*source_id_clone,
|
||||||
|
"lib.b.annotations_pics.sql",
|
||||||
|
deps,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let deps = vec![author_status.clone()];
|
||||||
|
let author_annotation_status_clone = author_annotation_status.clone();
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let author_annotation_process = tokio::spawn(async move {
|
||||||
|
match process::<AuthorAnnotation>(
|
||||||
|
pool_clone,
|
||||||
|
*source_id_clone,
|
||||||
|
"lib.a.annotations.sql",
|
||||||
|
deps,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => {
|
||||||
|
let mut status = author_annotation_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Success);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
let mut status = author_annotation_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Fail);
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let deps = vec![author_annotation_status.clone()];
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let author_annotation_pics_process = tokio::spawn(async move {
|
||||||
|
process::<AuthorAnnotationPic>(
|
||||||
|
pool_clone,
|
||||||
|
*source_id_clone,
|
||||||
|
"lib.a.annotations_pics.sql",
|
||||||
|
deps,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let genre_status_clone = genre_status.clone();
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let genre_annotation_process = tokio::spawn(async move {
|
||||||
|
match process::<Genre>(pool_clone, *source_id_clone, "lib.libgenrelist.sql", vec![]).await {
|
||||||
|
Ok(_) => {
|
||||||
|
let mut status = genre_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Success);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
let mut status = genre_status_clone.lock().unwrap();
|
||||||
|
*status = Some(UpdateStatus::Fail);
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let deps = vec![genre_status.clone(), book_status.clone()];
|
||||||
|
let source_id_clone = source_id.clone();
|
||||||
|
let book_genre_process = tokio::spawn(async move {
|
||||||
|
process::<BookGenre>(pool_clone, *source_id_clone, "lib.libgenre.sql", deps).await
|
||||||
|
});
|
||||||
|
|
||||||
|
for process in [
|
||||||
|
author_process,
|
||||||
|
book_process,
|
||||||
|
book_author_process,
|
||||||
|
translator_process,
|
||||||
|
sequence_process,
|
||||||
|
sequence_info_process,
|
||||||
|
book_annotation_process,
|
||||||
|
book_annotation_pics_process,
|
||||||
|
author_annotation_process,
|
||||||
|
author_annotation_pics_process,
|
||||||
|
genre_annotation_process,
|
||||||
|
book_genre_process
|
||||||
|
] {
|
||||||
|
let process_result = match process.await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => return Err(Box::new(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
match process_result {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => panic!("{:?}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match send_webhooks().await {
|
||||||
|
Ok(_) => {
|
||||||
|
log::info!("Webhooks sended!");
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
log::info!("Webhooks send failed : {err}");
|
||||||
|
return Err(Box::new(err))
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn cron_jobs() -> Result<(), JobSchedulerError> {
|
||||||
|
let job_scheduler = JobScheduler::new().await.unwrap();
|
||||||
|
|
||||||
|
let update_job = match Job::new_async("* 0 5 * * *", |_uuid, _l| Box::pin(async {
|
||||||
|
match update().await {
|
||||||
|
Ok(_) => log::info!("Updated"),
|
||||||
|
Err(err) => log::info!("Update err: {:?}", err),
|
||||||
|
};
|
||||||
|
})) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => panic!("{:?}", err),
|
||||||
|
};
|
||||||
|
|
||||||
|
job_scheduler.add(update_job).await.unwrap();
|
||||||
|
|
||||||
|
match job_scheduler.start().await {
|
||||||
|
Ok(_) => Ok(()),
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user