This commit is contained in:
2024-05-05 20:51:25 +02:00
parent 39f9a9961b
commit bb6c5e9aa1
11 changed files with 77 additions and 390 deletions

View File

@@ -7,14 +7,6 @@ fn get_env(env: &'static str) -> String {
pub struct Config {
pub api_key: String,
pub minio_host: String,
pub internal_minio_host: String,
pub minio_bucket: String,
pub minio_access_key: String,
pub minio_secret_key: String,
pub minio_share_books_bucket: String,
pub library_api_key: String,
pub library_url: String,
@@ -29,14 +21,6 @@ impl Config {
Config {
api_key: get_env("API_KEY"),
minio_host: get_env("MINIO_HOST"),
internal_minio_host: get_env("INTERNAL_MINIO_HOST"),
minio_bucket: get_env("MINIO_BUCKET"),
minio_access_key: get_env("MINIO_ACCESS_KEY"),
minio_secret_key: get_env("MINIO_SECRET_KEY"),
minio_share_books_bucket: get_env("MINIO_SHARE_BOOKS_BUCKET"),
library_api_key: get_env("LIBRARY_API_KEY"),
library_url: get_env("LIBRARY_URL"),

View File

@@ -5,10 +5,9 @@ pub mod views;
use sentry::{integrations::debug_images::DebugImagesIntegration, types::Dsn, ClientOptions};
use std::{net::SocketAddr, str::FromStr};
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::info;
use crate::{services::files_cleaner::clean_files, views::get_router};
use crate::views::get_router;
async fn start_app() {
tracing_subscriber::fmt()
@@ -26,35 +25,6 @@ async fn start_app() {
info!("Webserver shutdown...");
}
async fn start_job_scheduler() {
let job_scheduler = JobScheduler::new().await.unwrap();
let clean_files_job = match Job::new_async("0 */5 * * * *", |_uuid, _l| {
Box::pin(async {
match clean_files(config::CONFIG.minio_bucket.clone()).await {
Ok(_) => info!("Archive files cleaned!"),
Err(err) => info!("Clean archive files err: {:?}", err),
};
match clean_files(config::CONFIG.minio_share_books_bucket.clone()).await {
Ok(_) => info!("Share files cleaned!"),
Err(err) => info!("Clean share files err: {:?}", err),
};
})
}) {
Ok(v) => v,
Err(err) => panic!("{:?}", err),
};
job_scheduler.add(clean_files_job).await.unwrap();
info!("Scheduler start...");
match job_scheduler.start().await {
Ok(v) => v,
Err(err) => panic!("{:?}", err),
};
}
#[tokio::main]
async fn main() {
let options = ClientOptions {
@@ -66,5 +36,5 @@ async fn main() {
let _guard = sentry::init(options);
tokio::join![start_app(), start_job_scheduler()];
start_app().await
}

View File

@@ -1,27 +0,0 @@
use chrono::{DateTime, Duration, Utc};
use minio_rsc::{client::ListObjectsArgs, datatype::Object};
use super::minio::get_internal_minio;
pub async fn clean_files(bucket: String) -> Result<(), Box<dyn std::error::Error>> {
let minio_client = get_internal_minio();
let objects = minio_client
.list_objects(&bucket, ListObjectsArgs::default())
.await?;
let delete_before = Utc::now() - Duration::hours(3);
for Object {
key, last_modified, ..
} in objects.contents
{
let last_modified_date: DateTime<Utc> =
DateTime::parse_from_rfc3339(&last_modified)?.into();
if last_modified_date <= delete_before {
let _ = minio_client.remove_object(&bucket, key).await;
}
}
Ok(())
}

View File

@@ -1,33 +0,0 @@
use minio_rsc::{provider::StaticProvider, Minio};
use crate::config;
pub fn get_minio() -> Minio {
let provider = StaticProvider::new(
&config::CONFIG.minio_access_key,
&config::CONFIG.minio_secret_key,
None,
);
Minio::builder()
.endpoint(&config::CONFIG.minio_host)
.provider(provider)
.secure(true)
.build()
.unwrap()
}
pub fn get_internal_minio() -> Minio {
let provider = StaticProvider::new(
&config::CONFIG.minio_access_key,
&config::CONFIG.minio_secret_key,
None,
);
Minio::builder()
.endpoint(&config::CONFIG.internal_minio_host)
.provider(provider)
.secure(false)
.build()
.unwrap()
}

View File

@@ -1,6 +1,4 @@
pub mod downloader;
pub mod files_cleaner;
pub mod library_client;
pub mod minio;
pub mod task_creator;
pub mod utils;

View File

@@ -1,26 +1,18 @@
use std::io::Seek;
use std::{fs::File, io::Seek};
use minio_rsc::client::PresignedArgs;
use smallvec::SmallVec;
use smartstring::alias::String as SmartString;
use tempfile::SpooledTempFile;
use tracing::log;
use zip::write::FileOptions;
use crate::{
config,
services::{
downloader::download,
minio::get_minio,
utils::{get_filename, get_stream},
},
services::{downloader::download, utils::get_filename},
structures::{CreateTask, ObjectType, Task},
views::TASK_RESULTS,
};
use super::{
library_client::{get_author_books, get_sequence_books, get_translator_books, Book, Page},
minio::get_internal_minio,
utils::get_key,
};
@@ -71,8 +63,6 @@ pub async fn set_task_error(key: String, error_message: String) {
status_description: "Ошибка!".to_string(),
error_message: Some(error_message),
result_filename: None,
result_internal_link: None,
result_link: None,
content_size: None,
};
@@ -86,96 +76,18 @@ pub async fn set_progress_description(key: String, description: String) {
status_description: description,
error_message: None,
result_filename: None,
result_internal_link: None,
result_link: None,
content_size: None,
};
TASK_RESULTS.insert(key, task.clone()).await;
}
pub async fn upload_to_minio(
archive: SpooledTempFile,
folder_name: String,
filename: String,
) -> Result<(String, String, u64), Box<dyn std::error::Error + Send + Sync>> {
let full_filename = format!("{}/{}", folder_name, filename);
let internal_minio = get_internal_minio();
let is_bucket_exist = match internal_minio
.bucket_exists(&config::CONFIG.minio_bucket)
.await
{
Ok(v) => v,
Err(err) => return Err(Box::new(err)),
};
if !is_bucket_exist {
let _ = internal_minio
.make_bucket(&config::CONFIG.minio_bucket, false)
.await;
}
let data_stream = get_stream(Box::new(archive));
if let Err(err) = internal_minio
.put_object_stream(
&config::CONFIG.minio_bucket,
full_filename.clone(),
Box::pin(data_stream),
None,
)
.await
{
return Err(Box::new(err));
}
let minio = get_minio();
let link = match minio
.presigned_get_object(PresignedArgs::new(
&config::CONFIG.minio_bucket,
full_filename.clone(),
))
.await
{
Ok(v) => v,
Err(err) => {
return Err(Box::new(err));
}
};
let internal_link = match internal_minio
.presigned_get_object(PresignedArgs::new(
&config::CONFIG.minio_bucket,
full_filename.clone(),
))
.await
{
Ok(v) => v,
Err(err) => {
return Err(Box::new(err));
}
};
let obj_size = match internal_minio
.stat_object(&config::CONFIG.minio_bucket, full_filename.clone())
.await
{
Ok(v) => v.unwrap().size().try_into().unwrap(),
Err(_) => todo!(),
};
Ok((link, internal_link, obj_size))
}
pub async fn create_archive(
key: String,
books: Vec<Book>,
file_format: SmartString,
) -> Result<(SpooledTempFile, u64), Box<dyn std::error::Error + Send + Sync>> {
let output_file = tempfile::spooled_tempfile(5 * 1024 * 1024);
) -> Result<(File, u64), Box<dyn std::error::Error + Send + Sync>> {
let output_file = File::create(format!("/tmp/{}", key))?;
let mut archive = zip::ZipWriter::new(output_file);
let options: FileOptions<_> = FileOptions::default()
@@ -290,31 +202,13 @@ pub async fn create_archive_task(key: String, data: CreateTask) {
set_progress_description(key.clone(), "Загрузка архива...".to_string()).await;
let folder_name = {
let mut langs = data.allowed_langs.clone();
langs.sort();
langs.join("_")
};
let (link, internal_link, content_size) =
match upload_to_minio(archive_result, folder_name, final_filename.clone()).await {
Ok(v) => v,
Err(err) => {
set_task_error(key.clone(), "Failed uploading archive!".to_string()).await;
log::error!("{}", err);
return;
}
};
let task = Task {
id: key.clone(),
status: crate::structures::TaskStatus::Complete,
status_description: "Архив готов! Ожидайте файл".to_string(),
error_message: None,
result_filename: Some(final_filename),
result_internal_link: Some(internal_link),
result_link: Some(link),
content_size: Some(content_size),
content_size: Some(archive_result.metadata().unwrap().len()),
};
TASK_RESULTS.insert(key.clone(), task.clone()).await;
@@ -329,8 +223,6 @@ pub async fn create_task(data: CreateTask) -> Task {
status_description: "Подготовка".to_string(),
error_message: None,
result_filename: None,
result_internal_link: None,
result_link: None,
content_size: None,
};

View File

@@ -1,12 +1,14 @@
use async_stream::stream;
use bytes::{Buf, Bytes};
use minio_rsc::error::Error;
use reqwest::Response;
use smartstring::alias::String as SmartString;
use tempfile::SpooledTempFile;
use translit::{gost779b_ru, CharsMapping, Transliterator};
use std::io::{Read, Seek, SeekFrom, Write};
use std::{
error::Error,
io::{Read, Seek, SeekFrom, Write},
};
use crate::structures::{CreateTask, ObjectType};
@@ -56,7 +58,7 @@ pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile
pub fn get_stream(
mut temp_file: Box<dyn Read + Send + Sync>,
) -> impl futures_core::Stream<Item = Result<Bytes, Error>> {
) -> impl futures_core::Stream<Item = Result<Bytes, Box<dyn Error + Sync>>> {
stream! {
let mut buf = [0; 2048];

View File

@@ -33,8 +33,7 @@ pub struct Task {
pub status: TaskStatus,
pub status_description: String,
pub error_message: Option<String>,
pub result_filename: Option<String>,
pub result_link: Option<String>,
pub result_internal_link: Option<String>,
pub content_size: Option<u64>,
}

View File

@@ -1,6 +1,7 @@
use std::time::Duration;
use axum::{
body::Body,
extract::Path,
http::{self, Request, StatusCode},
middleware::{self, Next},
@@ -11,6 +12,8 @@ use axum::{
use axum_prometheus::PrometheusMetricLayer;
use moka::future::Cache;
use once_cell::sync::Lazy;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use tower_http::trace::{self, TraceLayer};
use tracing::Level;
@@ -71,6 +74,22 @@ async fn auth(req: Request<axum::body::Body>, next: Next) -> Result<Response, St
Ok(next.run(req).await)
}
async fn download(Path(task_id): Path<String>) -> impl IntoResponse {
let task = match TASK_RESULTS.get(&task_id).await {
Some(result) => result,
None => return StatusCode::NOT_FOUND.into_response(),
};
let file = match File::open(format!("/tmp/{}", task.id)).await {
Ok(v) => v,
Err(_) => return StatusCode::NOT_FOUND.into_response(),
};
let stream = ReaderStream::new(file);
Body::from_stream(stream).into_response()
}
pub async fn get_router() -> Router {
let (prometheus_layer, metric_handle) = PrometheusMetricLayer::pair();
@@ -80,6 +99,7 @@ pub async fn get_router() -> Router {
"/api/check_archive/:task_id",
get(check_archive_task_status),
)
.route("/api/download/:task_id", get(download))
.layer(middleware::from_fn(auth))
.layer(prometheus_layer);