pub mod axum_server; pub mod bot_manager_client; pub mod closable_sender; pub mod internal; pub mod utils; use once_cell::sync::Lazy; use smartstring::alias::String as SmartString; use teloxide::stop::StopToken; use tokio::task::JoinSet; use tracing::log; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use smallvec::SmallVec; use tokio::time::{sleep, Duration}; use tokio::sync::Semaphore; use teloxide::prelude::*; use moka::future::Cache; use self::axum_server::start_axum_server; use self::bot_manager_client::get_bots; pub use self::bot_manager_client::{BotCache, BotData}; use self::closable_sender::ClosableSender; use self::internal::set_webhook; pub static USER_ACTIVITY_CACHE: Lazy> = Lazy::new(|| { Cache::builder() .time_to_idle(Duration::from_secs(30 * 60)) .max_capacity(4096) .build() }); pub static USER_LANGS_CACHE: Lazy>> = Lazy::new(|| { Cache::builder() .time_to_idle(Duration::from_secs(30 * 60)) .max_capacity(4096) .build() }); pub static CHAT_DONATION_NOTIFICATIONS_CACHE: Lazy> = Lazy::new(|| { Cache::builder() .time_to_idle(Duration::from_secs(24 * 60 * 60)) .max_capacity(4098) .build() }); pub static WEBHOOK_CHECK_ERRORS_COUNT: Lazy> = Lazy::new(|| { Cache::builder() .time_to_idle(Duration::from_secs(600)) .max_capacity(128) .build() }); type StopTokenWithSender = ( StopToken, ClosableSender>, ); pub static BOTS_ROUTES: Lazy> = Lazy::new(|| { Cache::builder() .time_to_idle(Duration::from_secs(60 * 60)) .max_capacity(100) .eviction_listener(|token, value: StopTokenWithSender, _cause| { log::info!("Stop Bot(token={})!", token); let (stop_token, mut sender) = value; stop_token.stop(); sender.close(); }) .build() }); pub static BOTS_DATA: Lazy> = Lazy::new(|| Cache::builder().build()); pub struct BotsManager; impl BotsManager { async fn check() { let bots_data = get_bots().await; let bots_data = match bots_data { Ok(v) => v, Err(err) => { log::info!("{:?}", err); return; } }; let semaphore = Arc::new(Semaphore::const_new(10)); let mut set_webhook_tasks = JoinSet::new(); for bot_data in bots_data.iter() { if BOTS_DATA.contains_key(&bot_data.token) { continue; } let bot_data: BotData = bot_data.clone(); let semphore = semaphore.clone(); set_webhook_tasks.spawn(async move { let _permit = semphore.acquire().await.unwrap(); let webhook_status = set_webhook(&bot_data).await; if webhook_status { BOTS_DATA .insert(bot_data.token.clone(), bot_data.clone()) .await; } drop(_permit); }); } loop { if set_webhook_tasks.join_next().await.is_none() { break; } } } pub async fn stop_all() { for (_, (stop_token, _)) in BOTS_ROUTES.iter() { stop_token.stop(); } BOTS_ROUTES.invalidate_all(); sleep(Duration::from_secs(5)).await; } pub async fn check_pending_updates() { for (token, bot_data) in BOTS_DATA.iter() { let error_count = WEBHOOK_CHECK_ERRORS_COUNT.get(&bot_data.id).await.unwrap_or(0); if error_count >= 3 { continue; } let bot = Bot::new(token.clone().as_str()); let result = bot.get_webhook_info().send().await; match result { Ok(webhook_info) => { if webhook_info.pending_update_count != 0 { continue; } if webhook_info.last_error_message.is_some() { log::error!( "Webhook last error: {:?}", webhook_info.last_error_message ); set_webhook(&bot_data).await; } }, Err(err) => { log::error!("Error getting webhook info: {:?}", err); WEBHOOK_CHECK_ERRORS_COUNT.insert(bot_data.id, error_count + 1).await; }, } } } pub async fn start(running: Arc) { start_axum_server(running.clone()).await; let mut tick_number: i32 = 0; loop { tokio::time::sleep(Duration::from_secs(1)).await; if !running.load(Ordering::SeqCst) { BotsManager::stop_all().await; return; } if tick_number % 30 == 0 { BotsManager::check().await; } if tick_number % 180 == 0 { BotsManager::check_pending_updates().await; } tick_number = (tick_number + 1) % 180; } } }