From f31a4645c9e3fc35883c22c7886f2b6a9a6d3c41 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Sun, 6 Aug 2023 22:22:57 +0200 Subject: [PATCH] Refactor --- .github/workflows/build_docker_image.yml | 8 +- src/services/downloader.rs | 63 ++++ src/services/minio.rs | 19 ++ src/services/mod.rs | 1 + src/services/task_creator.rs | 354 +++++++++-------------- src/services/utils.rs | 112 ++++++- src/structures.rs | 7 +- src/views.rs | 2 +- 8 files changed, 330 insertions(+), 236 deletions(-) create mode 100644 src/services/minio.rs diff --git a/.github/workflows/build_docker_image.yml b/.github/workflows/build_docker_image.yml index 31f04ca..1bf4ab2 100644 --- a/.github/workflows/build_docker_image.yml +++ b/.github/workflows/build_docker_image.yml @@ -13,12 +13,6 @@ jobs: name: Checkout uses: actions/checkout@v3 - - - name: Set up QEMU - uses: docker/setup-qemu-action@v2 - with: - platforms: arm64 - - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 @@ -44,7 +38,7 @@ jobs: IMAGE: ${{ steps.repository_name.outputs.lowercase }} with: push: true - platforms: linux/amd64,linux/arm64 + platforms: linux/amd64 tags: ghcr.io/${{ env.IMAGE }}:latest context: . file: ./docker/production.dockerfile diff --git a/src/services/downloader.rs b/src/services/downloader.rs index e69de29..462e6c6 100644 --- a/src/services/downloader.rs +++ b/src/services/downloader.rs @@ -0,0 +1,63 @@ +use std::fmt; + +use base64::{engine::general_purpose, Engine}; +use reqwest::StatusCode; +use tempfile::SpooledTempFile; +use smartstring::alias::String as SmartString; + +use crate::config; + +use super::utils::response_to_tempfile; + + +#[derive(Debug, Clone)] +struct DownloadError { + status_code: StatusCode, +} + +impl fmt::Display for DownloadError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Status code is {0}", self.status_code) + } +} + +impl std::error::Error for DownloadError {} + + +pub async fn download( + book_id: u64, + file_type: SmartString, +) -> Result<(SpooledTempFile, String), Box> { + let mut response = reqwest::Client::new() + .get(format!( + "{}/api/v1/download/{book_id}/{file_type}", + &config::CONFIG.cache_url + )) + .header("Authorization", &config::CONFIG.cache_api_key) + .send() + .await? + .error_for_status()?; + + if response.status() != StatusCode::OK { + return Err(Box::new(DownloadError { + status_code: response.status(), + })); + }; + + let headers = response.headers(); + + let base64_encoder = general_purpose::STANDARD; + + let filename = std::str::from_utf8( + &base64_encoder + .decode(headers.get("x-filename-b64").unwrap()) + .unwrap(), + ) + .unwrap() + .to_string(); + + let output_file = response_to_tempfile(&mut response).await.unwrap(); + + Ok((output_file.0, filename)) +} + diff --git a/src/services/minio.rs b/src/services/minio.rs new file mode 100644 index 0000000..26c5eb5 --- /dev/null +++ b/src/services/minio.rs @@ -0,0 +1,19 @@ +use minio_rsc::{provider::StaticProvider, Minio}; + +use crate::config; + + +pub fn get_minio() -> Minio { + let provider = StaticProvider::new( + &config::CONFIG.minio_access_key, + &config::CONFIG.minio_secret_key, + None + ); + + return Minio::builder() + .host(&config::CONFIG.minio_host) + .provider(provider) + .secure(false) + .build() + .unwrap() +} diff --git a/src/services/mod.rs b/src/services/mod.rs index f6ff111..49f9b4e 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -2,3 +2,4 @@ pub mod task_creator; pub mod library_client; pub mod utils; pub mod downloader; +pub mod minio; diff --git a/src/services/task_creator.rs b/src/services/task_creator.rs index 15da263..df507cf 100644 --- a/src/services/task_creator.rs +++ b/src/services/task_creator.rs @@ -1,37 +1,23 @@ -use std::{fmt, io::{Seek, Read}}; +use std::io::Seek; -use base64::{engine::general_purpose, Engine}; -use bytes::Bytes; -use minio_rsc::{provider::StaticProvider, Minio, types::args::{ObjectArgs, PresignedArgs}, errors::MinioError}; -use reqwest::StatusCode; +use minio_rsc::types::args::{ObjectArgs, PresignedArgs}; use smallvec::SmallVec; use smartstring::alias::String as SmartString; use tempfile::SpooledTempFile; +use tracing::log; use translit::{Transliterator, gost779b_ru, CharsMapping}; use zip::write::FileOptions; -use async_stream::stream; -use crate::{structures::{CreateTask, Task, ObjectType}, config, views::TASK_RESULTS}; +use crate::{structures::{CreateTask, Task, ObjectType}, config, views::TASK_RESULTS, services::{downloader::download, utils::{get_stream, get_filename}, minio::get_minio}}; -use super::{library_client::{Book, get_sequence_books, get_author_books, get_translator_books, Page, get_sequence, get_author}, utils::response_to_tempfile}; - - -pub fn get_key( - input_data: CreateTask -) -> String { - let mut data = input_data.clone(); - data.allowed_langs.sort(); - - let data_string = serde_json::to_string(&data).unwrap(); - - format!("{:x}", md5::compute(data_string)) -} +use super::{library_client::{Book, get_sequence_books, get_author_books, get_translator_books, Page, get_sequence, get_author}, utils::get_key}; pub async fn get_books( object_id: u32, allowed_langs: SmallVec<[SmartString; 3]>, - books_getter: fn(id: u32, page: u32, allowed_langs: SmallVec<[SmartString; 3]>) -> Fut + books_getter: fn(id: u32, page: u32, allowed_langs: SmallVec<[SmartString; 3]>) -> Fut, + file_format: SmartString ) -> Result, Box> where Fut: std::future::Future, Box>>, @@ -58,106 +44,80 @@ where current_page += 1; }; + let result = result + .iter() + .filter(|book| book.available_types.contains(&file_format.to_string())) + .map(|b| b.clone()) + .collect(); + Ok(result) } -#[derive(Debug, Clone)] -struct DownloadError { - status_code: StatusCode, + +pub async fn set_task_error(key: String, error_message: String) { + let task = Task { + id: key.clone(), + status: crate::structures::TaskStatus::Failled, + status_description: "Ошибка!".to_string(), + error_message: Some(error_message), + result_filename: None, + result_link: None + }; + + TASK_RESULTS.insert(key, task.clone()).await; } -impl fmt::Display for DownloadError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Status code is {0}", self.status_code) + +pub async fn set_progress_description(key: String, description: String) { + let task = Task { + id: key.clone(), + status: crate::structures::TaskStatus::InProgress, + status_description: description, + error_message: None, + result_filename: None, + result_link: None + }; + + TASK_RESULTS.insert(key, task.clone()).await; +} + + + +pub async fn upload_to_minio(archive: SpooledTempFile, filename: String) -> Result> { + let minio = get_minio(); + + let is_bucket_exist = match minio.bucket_exists(&config::CONFIG.minio_bucket).await { + Ok(v) => v, + Err(err) => return Err(Box::new(err)), + }; + + if !is_bucket_exist { + let _ = minio.make_bucket(&config::CONFIG.minio_bucket, false).await; } -} -impl std::error::Error for DownloadError {} + let data_stream = get_stream(Box::new(archive)); -pub async fn download( - book_id: u64, - file_type: String, -) -> Result, Box> { - let mut response = reqwest::Client::new() - .get(format!( - "{}/api/v1/download/{book_id}/{file_type}", - &config::CONFIG.cache_url - )) - .header("Authorization", &config::CONFIG.cache_api_key) - .send() - .await? - .error_for_status()?; - - if response.status() != StatusCode::OK { - return Err(Box::new(DownloadError { - status_code: response.status(), - })); - }; - - let headers = response.headers(); - - let base64_encoder = general_purpose::STANDARD; - - let filename = std::str::from_utf8( - &base64_encoder - .decode(headers.get("x-filename-b64").unwrap()) - .unwrap(), - ) - .unwrap() - .to_string(); - - let output_file = match response_to_tempfile(&mut response).await { - Some(v) => v.0, - None => return Ok(None), - }; - - Ok(Some((output_file, filename))) -} - - -fn get_stream(mut temp_file: Box) -> impl futures_core::Stream> { - stream! { - let mut buf = [0; 2048]; - - loop { - match temp_file.read(&mut buf) { - Ok(count) => { - if count == 0 { - break; - } - - yield Ok(Bytes::copy_from_slice(&buf[0..count])) - }, - Err(_) => break - } - } + if let Err(err) = minio.put_object_stream( + ObjectArgs::new(&config::CONFIG.minio_bucket, filename.clone()), + Box::pin(data_stream) + ).await { + return Err(Box::new(err)); } -} - -pub async fn create_archive_task(key: String, data: CreateTask) { - let books = match data.object_type { - ObjectType::Sequence => get_books(data.object_id, data.allowed_langs, get_sequence_books).await, - ObjectType::Author => get_books(data.object_id, data.allowed_langs, get_author_books).await, - ObjectType::Translator => get_books(data.object_id, data.allowed_langs, get_translator_books).await, - }; - - let books = match books { + let link = match minio.presigned_get_object( + PresignedArgs::new(&config::CONFIG.minio_bucket, filename) + ).await { Ok(v) => v, Err(err) => { - return; // log error and task error + return Err(Box::new(err)); }, }; - let books: Vec<_> = books - .iter() - .filter(|book| book.available_types.contains(&data.file_format)) - .collect(); + Ok(link) +} - if books.is_empty() { - return; // log error and task error - } +pub async fn create_archive(key: String, books: Vec, file_format: SmartString) -> Result> { let output_file = tempfile::spooled_tempfile(5 * 1024 * 1024); let mut archive = zip::ZipWriter::new(output_file); @@ -166,152 +126,96 @@ pub async fn create_archive_task(key: String, data: CreateTask) { .compression_method(zip::CompressionMethod::Deflated) .unix_permissions(0o755); - for book in books { - let (mut tmp_file, filename) = match download(book.id, data.file_format.clone()).await { - Ok(v) => { - match v { - Some(v) => v, - None => { - return; // log error and task error - }, - } - }, - Err(err) => { - return; // log error and task error - }, + let books_count = books.len(); + + for (index, book) in books.iter().enumerate() { + let (mut tmp_file, filename) = match download(book.id, file_format.clone()).await { + Ok(v) => v, + Err(_) => continue, }; match archive.start_file(filename, options) { Ok(_) => (), - Err(_) => return, // log error and task error + Err(err) => return Err(Box::new(err)), }; match std::io::copy(&mut tmp_file, &mut archive) { Ok(_) => (), - Err(_) => return, // log error and task error + Err(err) => return Err(Box::new(err)), }; + + set_progress_description(key.clone(), format!("Обработано: {}/{}", index + 1, books_count)).await; } let mut archive_result = match archive.finish() { Ok(v) => v, - Err(err) => return, // log error and task error + Err(err) => return Err(Box::new(err)), }; archive_result.rewind().unwrap(); - let result_filename = match data.object_type { - ObjectType::Sequence => { - match get_sequence(data.object_id).await { - Ok(v) => v.name, - Err(err) => { - println!("{}", err); - return; // log error and task error - }, - } - }, - ObjectType::Author | ObjectType::Translator => { - match get_author(data.object_id).await { - Ok(v) => { - vec![v.first_name, v.last_name, v.middle_name.unwrap_or("".to_string())] - .into_iter() - .filter(|v| !v.is_empty()) - .collect::>() - .join("_") - }, - Err(err) => { - println!("{}", err); - return; // log error and task error - }, - } + Ok(archive_result) +} + + +pub async fn create_archive_task(key: String, data: CreateTask) { + let books = match data.object_type { + ObjectType::Sequence => get_books(data.object_id, data.allowed_langs, get_sequence_books, data.file_format.clone()).await, + ObjectType::Author => get_books(data.object_id, data.allowed_langs, get_author_books, data.file_format.clone()).await, + ObjectType::Translator => get_books(data.object_id, data.allowed_langs, get_translator_books, data.file_format.clone()).await, + }; + + let books = match books { + Ok(v) => v, + Err(err) => { + set_task_error(key.clone(), "Failed getting books!".to_string()).await; + log::error!("{}", err); + return; }, }; - let final_filename = { - let transliterator = Transliterator::new(gost779b_ru()); - - let mut filename_without_type = transliterator.convert(&result_filename, false); - - "(),….’!\"?»«':".get(..).into_iter().for_each(|char| { - filename_without_type = filename_without_type.replace(char, ""); - }); - - let replace_char_map: CharsMapping = [ - ("—", "-"), - ("/", "_"), - ("№", "N"), - (" ", "_"), - ("–", "-"), - ("á", "a"), - (" ", "_"), - ("'", ""), - ("`", ""), - ("[", ""), - ("]", ""), - ("\"", ""), - ].to_vec(); - - let replace_transliterator = Transliterator::new(replace_char_map); - let normal_filename = replace_transliterator.convert(&filename_without_type, false); - - let normal_filename = normal_filename.replace(|c: char| !c.is_ascii(), ""); - - let right_part = format!(".zip"); - let normal_filename_slice = std::cmp::min(64 - right_part.len() - 1, normal_filename.len() - 1); - - let left_part = if normal_filename_slice == normal_filename.len() - 1 { - &normal_filename - } else { - normal_filename.get(..normal_filename_slice).unwrap_or_else(|| panic!("Can't slice left part: {:?} {:?}", normal_filename, normal_filename_slice)) - }; - - format!("{left_part}{right_part}") - }; - - let provider = StaticProvider::new( - &config::CONFIG.minio_access_key, - &config::CONFIG.minio_secret_key, - None - ); - let minio = Minio::builder() - .host(&config::CONFIG.minio_host) - .provider(provider) - .secure(false) - .build() - .unwrap(); - - let is_bucket_exist = match minio.bucket_exists(&config::CONFIG.minio_bucket).await { - Ok(v) => v, - Err(err) => { - println!("{}", err); - return; // log error and task error - }, // log error and task error - }; - - if !is_bucket_exist { - minio.make_bucket(&config::CONFIG.minio_bucket, false).await; + if books.is_empty() { + set_task_error(key.clone(), "No books!".to_string()).await; + return; } - let data_stream = get_stream(Box::new(archive_result)); - - if let Err(err) = minio.put_object_stream( - ObjectArgs::new(&config::CONFIG.minio_bucket, final_filename.clone()), - Box::pin(data_stream) - ).await { - println!("{}", err); - return; // log error and task error - } - - let link = match minio.presigned_get_object( - PresignedArgs::new(&config::CONFIG.minio_bucket, final_filename) - ).await { + let final_filename = match get_filename(data.object_type, data.object_id).await { Ok(v) => v, Err(err) => { - println!("{}", err); - return; // log error and task error - }, // log error and task error + set_task_error(key.clone(), "Can't get archive name!".to_string()).await; + log::error!("{}", err); + return; + }, }; - println!("{}", link); + let archive_result = match create_archive(books, data.file_format).await { + Ok(v) => v, + Err(err) => { + set_task_error(key.clone(), "Failed downloading books!".to_string()).await; + log::error!("{}", err); + return; + }, + }; + + let link = match upload_to_minio(archive_result, final_filename.clone()).await { + Ok(v) => v, + Err(err) => { + set_task_error(key.clone(), "Failed uploading archive!".to_string()).await; + log::error!("{}", err); + return; + }, + }; + + let task = Task { + id: key.clone(), + status: crate::structures::TaskStatus::Complete, + status_description: "Архив готов!".to_string(), + error_message: None, + result_filename: Some(final_filename), + result_link: Some(link) + }; + + TASK_RESULTS.insert(key.clone(), task.clone()).await; } @@ -323,6 +227,8 @@ pub async fn create_task( let task = Task { id: key.clone(), status: crate::structures::TaskStatus::InProgress, + status_description: "Подготовка".to_string(), + error_message: None, result_filename: None, result_link: None }; diff --git a/src/services/utils.rs b/src/services/utils.rs index b59db88..326629d 100644 --- a/src/services/utils.rs +++ b/src/services/utils.rs @@ -1,9 +1,27 @@ +use minio_rsc::errors::MinioError; use reqwest::Response; use tempfile::SpooledTempFile; -use bytes::Buf; +use bytes::{Buf, Bytes}; +use async_stream::stream; +use translit::{gost779b_ru, Transliterator, CharsMapping}; + +use std::io::{Seek, SeekFrom, Write, Read}; + +use crate::structures::{CreateTask, ObjectType}; + +use super::library_client::{get_sequence, get_author}; -use std::io::{Seek, SeekFrom, Write}; +pub fn get_key( + input_data: CreateTask +) -> String { + let mut data = input_data.clone(); + data.allowed_langs.sort(); + + let data_string = serde_json::to_string(&data).unwrap(); + + format!("{:x}", md5::compute(data_string)) +} pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile, usize)> { @@ -38,3 +56,93 @@ pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile Some((tmp_file, data_size)) } + + +pub fn get_stream(mut temp_file: Box) -> impl futures_core::Stream> { + stream! { + let mut buf = [0; 2048]; + + loop { + match temp_file.read(&mut buf) { + Ok(count) => { + if count == 0 { + break; + } + + yield Ok(Bytes::copy_from_slice(&buf[0..count])) + }, + Err(_) => break + } + } + } +} + +pub async fn get_filename(object_type: ObjectType, object_id: u32) -> Result> { + let result_filename = match object_type { + ObjectType::Sequence => { + match get_sequence(object_id).await { + Ok(v) => v.name, + Err(err) => { + return Err(err); + }, + } + }, + ObjectType::Author | ObjectType::Translator => { + match get_author(object_id).await { + Ok(v) => { + vec![v.first_name, v.last_name, v.middle_name.unwrap_or("".to_string())] + .into_iter() + .filter(|v| !v.is_empty()) + .collect::>() + .join("_") + }, + Err(err) => { + return Err(err); + }, + } + }, + }; + + let final_filename = { + let transliterator = Transliterator::new(gost779b_ru()); + + let mut filename_without_type = transliterator.convert(&result_filename, false); + + "(),….’!\"?»«':".get(..).into_iter().for_each(|char| { + filename_without_type = filename_without_type.replace(char, ""); + }); + + let replace_char_map: CharsMapping = [ + ("—", "-"), + ("/", "_"), + ("№", "N"), + (" ", "_"), + ("–", "-"), + ("á", "a"), + (" ", "_"), + ("'", ""), + ("`", ""), + ("[", ""), + ("]", ""), + ("\"", ""), + ].to_vec(); + + let replace_transliterator = Transliterator::new(replace_char_map); + let normal_filename = replace_transliterator.convert(&filename_without_type, false); + + let normal_filename = normal_filename.replace(|c: char| !c.is_ascii(), ""); + + let right_part = format!(".zip"); + let normal_filename_slice = std::cmp::min(64 - right_part.len() - 1, normal_filename.len() - 1); + + let left_part = if normal_filename_slice == normal_filename.len() - 1 { + &normal_filename + } else { + normal_filename.get(..normal_filename_slice).unwrap_or_else(|| panic!("Can't slice left part: {:?} {:?}", normal_filename, normal_filename_slice)) + }; + + format!("{left_part}{right_part}") + }; + + Ok(final_filename) +} diff --git a/src/structures.rs b/src/structures.rs index a10cf82..235df77 100644 --- a/src/structures.rs +++ b/src/structures.rs @@ -8,7 +8,8 @@ use smartstring::alias::String as SmartString; pub enum TaskStatus { InProgress, Archiving, - Complete + Complete, + Failled } #[derive(Serialize, Deserialize, Clone)] @@ -23,7 +24,7 @@ pub enum ObjectType { pub struct CreateTask{ pub object_id: u32, pub object_type: ObjectType, - pub file_format: String, + pub file_format: SmartString, pub allowed_langs: SmallVec<[SmartString; 3]> } @@ -31,6 +32,8 @@ pub struct CreateTask{ pub struct Task { pub id: String, pub status: TaskStatus, + pub status_description: String, + pub error_message: Option, pub result_filename: Option, pub result_link: Option } diff --git a/src/views.rs b/src/views.rs index 1af0c56..e1932b8 100644 --- a/src/views.rs +++ b/src/views.rs @@ -8,7 +8,7 @@ use tower_http::trace::{TraceLayer, self}; use tracing::Level; -use crate::{config::CONFIG, structures::{Task, CreateTask}, services::task_creator::{get_key, create_task}}; +use crate::{config::CONFIG, structures::{Task, CreateTask}, services::{task_creator::create_task, utils::get_key}}; pub static TASK_RESULTS: Lazy> = Lazy::new(|| {