//! alerts.rs: periodically evaluates configured alerts and sends notifications when their state changes.
1 use crate::db::Database; 2 use crate::models::{Alert, AlertVar, NotificationConfig, NotificationMethod, WebHookNotif, EmailNotif, TelegramNotif}; 3 use log::{debug, error, info, trace}; 4 use std::collections::HashMap; 5 use rusqlite::params; 6 use tokio::time::{interval, Duration}; 7 use reqwest::Client; 8 use std::sync::Arc; 9 use lettre::{Message, SmtpTransport, Transport, message::header::ContentType}; 10 use lettre::transport::smtp::authentication::Credentials; 11 use teloxide::prelude::*; 12 use teloxide::types::Recipient; 13 14 /// Main function to check alerts and send notifications 15 pub async fn check_alerts(db_path: &str) { 16 let db = match Database::new(db_path) { 17 Ok(db) => Arc::new(db), 18 Err(e) => { 19 error!("Failed to open database: {}", e); 20 return; 21 } 22 }; 23 24 // Run alert checking loop 25 let mut interval_timer = interval(Duration::from_secs(60)); // Check every minute 26 tokio::time::sleep(Duration::from_secs(70)).await; 27 loop { 28 debug!("Checking alerts"); 29 interval_timer.tick().await; 30 31 let alerts = match get_alerts(&db) { 32 Ok(alerts) => alerts, 33 Err(e) => { 34 error!("Failed to get alerts: {}", e); 35 continue; 36 } 37 }; 38 39 if alerts.is_empty() { 40 continue; 41 } 42 43 let notification_methods = match get_notification_methods(&db) { 44 Ok(methods) => methods, 45 Err(e) => { 46 error!("Failed to get notification methods: {}", e); 47 continue; 48 } 49 }; 50 51 let method_map: HashMap<String, NotificationMethod> = notification_methods 52 .into_iter() 53 .map(|m| (m.id.clone(), m)) 54 .collect(); 55 56 // Process each alert 57 for mut alert in alerts { 58 trace!("Checking alert: {:?}", alert); 59 if !alert.enabled { 60 continue; 61 } 62 63 // Check if alert condition is met 64 let is_firing = match check_alert_condition(&db, &alert) { 65 Ok(firing) => firing, 66 Err(e) => { 67 error!("Failed to check alert condition for {:?}: {}", alert, e); 68 continue; 69 } 70 }; 71 trace!("Alert firing: {}", is_firing); 72 73 
// If alert state has changed, update it in the database 74 if is_firing != alert.firing { 75 alert.firing = is_firing; 76 if let Err(e) = update_alert_state(&db, &alert) { 77 error!("Failed to update alert state: {}", e); 78 } 79 80 if is_firing { 81 info!("Alert fired: {:?}", alert); 82 let notification_message = format_alert_message(&alert, true); 83 84 // Send notifications to all configured methods for this alert 85 for method_id in &alert.notif_methods { 86 if let Some(method) = method_map.get(method_id) { 87 if method.enabled { 88 if let Err(e) = send_notification(method, ¬ification_message).await { 89 error!("Failed to send notification: {}", e); 90 } 91 } 92 } 93 } 94 } 95 else { 96 // send the relief 97 info!("Alert relief: {:?}", alert); 98 let notification_message = format_alert_message(&alert, false); 99 100 // Send notifications to all configured methods for this alert 101 for method_id in &alert.notif_methods { 102 if let Some(method) = method_map.get(method_id) { 103 if method.enabled { 104 if let Err(e) = send_notification(method, ¬ification_message).await { 105 error!("Failed to send notification: {}", e); 106 } 107 } 108 } 109 } 110 } 111 } 112 } 113 } 114 } 115 116 fn get_alerts(db: &Database) -> Result<Vec<Alert>, String> { 117 let alerts_json = match db.get_kv_str("alerts") { 118 Ok(Some(json)) => json, 119 Ok(None) => return Ok(Vec::new()), 120 Err(e) => return Err(format!("Database error: {}", e)), 121 }; 122 123 serde_json::from_str::<Vec<Alert>>(&alerts_json) 124 .map_err(|e| format!("Failed to parse alerts JSON: {}", e)) 125 } 126 127 fn get_notification_methods(db: &Database) -> Result<Vec<NotificationMethod>, String> { 128 let methods_json = match db.get_kv_str("notification_methods") { 129 Ok(Some(json)) => json, 130 Ok(None) => return Ok(Vec::new()), 131 Err(e) => return Err(format!("Database error: {}", e)), 132 }; 133 134 serde_json::from_str::<Vec<NotificationMethod>>(&methods_json) 135 .map_err(|e| format!("Failed to parse 
notification methods JSON: {}", e)) 136 } 137 138 /// Check if an alert condition is met consistently across the entire time window 139 fn check_alert_condition(db: &Database, alert: &Alert) -> Result<bool, String> { 140 let time_window_secs = alert.time_window * 60; 141 let now = std::time::SystemTime::now() 142 .duration_since(std::time::UNIX_EPOCH) 143 .map_err(|e| format!("Time error: {}", e))? 144 .as_secs(); 145 146 let start_time = now - time_window_secs as u64; 147 148 // Choose the appropriate table based on the time window 149 let table_suffix = if time_window_secs <= 7200 { 150 "m" // Use minute-level data for windows <= 2 hours 151 } else { 152 "h" // Use hour-level data for longer windows 153 }; 154 155 // Build query 156 let conn = db.conn.lock().unwrap(); 157 158 let (agg_function, comparison_op) = match alert.operator.as_str() { 159 ">" => ("MIN", ">"), // If MIN value > threshold, then ALL values > threshold 160 "<" => ("MAX", "<"), // If MAX value < threshold, then ALL values < threshold 161 _ => return Err(format!("Unknown operator: {}", alert.operator)), 162 }; 163 164 let query_result = match alert.var.cat.as_str() { 165 "sys" => { 166 // System metrics are in general_* tables 167 let query = format!( 168 "SELECT {}({}) FROM general_{} WHERE timestamp >= ?", 169 agg_function, alert.var.var, table_suffix 170 ); 171 conn.query_row(&query, params![start_time], |row| row.get::<_, f64>(0)) 172 }, 173 "net" | "disk" => { 174 // Network or disk metrics need to filter by resource name 175 let query = format!( 176 "SELECT {}({}) FROM {}_{} WHERE timestamp >= ? 
AND name = ?", 177 agg_function, alert.var.var, alert.var.cat, table_suffix 178 ); 179 conn.query_row(&query, params![start_time, alert.var.resrc], |row| row.get::<_, f64>(0)) 180 }, 181 _ => return Err(format!("Unknown category: {}", alert.var.cat)), 182 }; 183 184 let agg_value = match query_result { 185 Ok(value) => value, 186 Err(rusqlite::Error::QueryReturnedNoRows) => { 187 // No data in the time window 188 return Ok(false); // firing=false if there's no data 189 }, 190 Err(e) => return Err(format!("Database query error: {}", e)), 191 }; 192 193 match comparison_op { 194 ">" => Ok(agg_value > alert.threshold), 195 "<" => Ok(agg_value < alert.threshold), 196 _ => Err(format!("Unknown comparison operator: {}", comparison_op)), 197 } 198 } 199 200 /// Update alert state in the database 201 fn update_alert_state(db: &Database, alert: &Alert) -> Result<(), String> { 202 // Get current alerts 203 let alerts_json = match db.get_kv_str("alerts") { 204 Ok(Some(json)) => json, 205 Ok(None) => "[]".to_string(), 206 Err(e) => return Err(format!("Database error: {}", e)), 207 }; 208 209 let mut alerts = serde_json::from_str::<Vec<Alert>>(&alerts_json) 210 .map_err(|e| format!("Failed to parse alerts JSON: {}", e))?; 211 212 // Update the alert in the list 213 for a in &mut alerts { 214 if a.id == alert.id { 215 a.firing = alert.firing; 216 break; 217 } 218 } 219 220 // Save back to database 221 let updated_json = serde_json::to_string(&alerts) 222 .map_err(|e| format!("Failed to serialize alerts: {}", e))?; 223 224 db.set_kv_str("alerts", &updated_json) 225 .map_err(|e| format!("Failed to save alerts: {}", e)) 226 } 227 228 /// Send a notification through the configured method 229 async fn send_notification(method: &NotificationMethod, message: &str) -> Result<(), String> { 230 match &method.config { 231 NotificationConfig::WebHook(webhook) => send_webhook_notification(webhook, message).await, 232 NotificationConfig::Email(email) => send_email_notification(email, 
message).await, 233 NotificationConfig::Telegram(telegram) => send_telegram_notification(telegram, message).await, 234 } 235 } 236 237 /// Send a webhook notification 238 async fn send_webhook_notification(webhook: &WebHookNotif, message: &str) -> Result<(), String> { 239 let client = Client::builder().use_rustls_tls().build() 240 .map_err(|e| format!("Failed to build HTTP client: {}", e))?; 241 242 // Replace placeholder with actual message 243 let url = webhook.url.replace("{notif_msg}", message); 244 245 // Prepare headers 246 let mut headers = reqwest::header::HeaderMap::new(); 247 for (key, value) in &webhook.headers { 248 headers.insert( 249 reqwest::header::HeaderName::from_bytes(key.as_bytes()) 250 .map_err(|e| format!("Invalid header name: {}", e))?, 251 reqwest::header::HeaderValue::from_str(value) 252 .map_err(|e| format!("Invalid header value: {}", e))?, 253 ); 254 } 255 256 // Build request based on method 257 let mut request_builder = match webhook.method.to_uppercase().as_str() { 258 "GET" => client.get(&url), 259 "POST" => { 260 let body = webhook.body.replace("{notif_msg}", message); 261 client.post(&url).body(body) 262 }, 263 "PUT" => { 264 let body = webhook.body.replace("{notif_msg}", message); 265 client.put(&url).body(body) 266 }, 267 "PATCH" => { 268 let body = webhook.body.replace("{notif_msg}", message); 269 client.patch(&url).body(body) 270 }, 271 "DELETE" => client.delete(&url), 272 _ => return Err(format!("Unsupported HTTP method: {}", webhook.method)), 273 }; 274 275 // Add headers 276 request_builder = request_builder.headers(headers); 277 278 // Send request 279 let response = request_builder.send() 280 .await 281 .map_err(|e| format!("Failed to send webhook request: {:?}", e))?; 282 283 if response.status().is_success() { 284 info!("Webhook notification sent successfully"); 285 Ok(()) 286 } else { 287 Err(format!("Webhook request failed with status: {}", response.status())) 288 } 289 } 290 291 /// Send an email notification 292 async 
fn send_email_notification(email: &EmailNotif, message: &str) -> Result<(), String> { 293 // Create email message 294 let email_message = Message::builder() 295 .from(email.from.parse().map_err(|e| format!("Invalid from address: {}", e))?) 296 .to(email.to.parse().map_err(|e| format!("Invalid to address: {}", e))?) 297 .subject(email.subject.clone()) 298 .header(ContentType::TEXT_PLAIN) 299 .body(email.body.replace("{notif_msg}", message)) 300 .map_err(|e| format!("Failed to create email message: {}", e))?; 301 302 // Create SMTP transport 303 let credentials = Credentials::new(email.username.clone(), email.password.clone()); 304 305 let mailer = SmtpTransport::relay(&email.server) 306 .map_err(|e| format!("Failed to create SMTP transport: {}", e))? 307 .credentials(credentials) 308 .port(email.port) 309 .build(); 310 311 // Send email 312 match mailer.send(&email_message) { 313 Ok(_) => { 314 info!("Email notification sent successfully"); 315 Ok(()) 316 }, 317 Err(e) => Err(format!("Failed to send email: {}", e)), 318 } 319 } 320 321 /// Send a telegram notification 322 async fn send_telegram_notification(telegram: &TelegramNotif, message: &str) -> Result<(), String> { 323 // Create Telegram bot 324 let bot = Bot::new(&telegram.token); 325 326 // Parse chat ID 327 let chat_id = telegram.chat_id.parse::<i64>() 328 .map_err(|_| format!("Invalid chat ID: {}", telegram.chat_id))?; 329 330 // Send message 331 match bot.send_message(Recipient::Id(ChatId(chat_id)), message).await { 332 Ok(_) => { 333 info!("Telegram notification sent successfully"); 334 Ok(()) 335 }, 336 Err(e) => Err(format!("Failed to send Telegram message: {}", e)), 337 } 338 } 339 340 /// Get a user-friendly name for a variable 341 fn get_var_friendly_name(var: &AlertVar) -> String { 342 match (var.cat.as_str(), var.var.as_str()) { 343 ("sys", "cpu_usage") => "CPU Usage".to_string(), 344 ("sys", "mem_usage") => "Memory Usage".to_string(), 345 ("sys", "swap_usage") => "Swap Usage".to_string(), 346 
("sys", "load_avg_1") => "1 Min Load Average".to_string(), 347 ("sys", "load_avg_5") => "5 Min Load Average".to_string(), 348 ("sys", "load_avg_15") => "15 Min Load Average".to_string(), 349 ("net", "rx_rate") => "Network Receive Rate".to_string(), 350 ("net", "tx_rate") => "Network Transmit Rate".to_string(), 351 ("disk", "read_rate") => "Disk Read Rate".to_string(), 352 ("disk", "write_rate") => "Disk Write Rate".to_string(), 353 ("disk", "disk_usage") => "Disk Usage".to_string(), 354 _ => format!("{} {}", var.cat, var.var), 355 } 356 } 357 358 /// Format an alert message with appropriate units and verbs 359 fn format_alert_message(alert: &Alert, is_firing: bool) -> String { 360 // Get the resource identifier if applicable 361 let resource = if alert.var.cat != "sys" { 362 format!(" ({})", alert.var.resrc) 363 } else { 364 "".to_string() 365 }; 366 367 // Get unit for the metric 368 let (value_with_unit, verb) = match (alert.var.cat.as_str(), alert.var.var.as_str(), alert.operator.as_str()) { 369 // System metrics 370 (_, "cpu_usage", ">") => (format!("{}%", alert.threshold), "exceeded"), 371 (_, "cpu_usage", "<") => (format!("{}%", alert.threshold), "dropped below"), 372 (_, "mem_usage", ">") => (format!("{}%", alert.threshold), "exceeded"), 373 (_, "mem_usage", "<") => (format!("{}%", alert.threshold), "dropped below"), 374 (_, "swap_usage", ">") => (format!("{}%", alert.threshold), "exceeded"), 375 (_, "swap_usage", "<") => (format!("{}%", alert.threshold), "dropped below"), 376 (_, "disk_usage", ">") => (format!("{}%", alert.threshold), "exceeded"), 377 (_, "disk_usage", "<") => (format!("{}%", alert.threshold), "dropped below"), 378 (_, "load_avg_1", ">") => (format!("{}", alert.threshold), "exceeded"), 379 (_, "load_avg_1", "<") => (format!("{}", alert.threshold), "dropped below"), 380 (_, "load_avg_5", ">") => (format!("{}", alert.threshold), "exceeded"), 381 (_, "load_avg_5", "<") => (format!("{}", alert.threshold), "dropped below"), 382 (_, 
"load_avg_15", ">") => (format!("{}", alert.threshold), "exceeded"), 383 (_, "load_avg_15", "<") => (format!("{}", alert.threshold), "dropped below"), 384 385 // Network metrics 386 ("net", "rx_rate", ">") => (format_bytes_per_sec(alert.threshold), "exceeded"), 387 ("net", "rx_rate", "<") => (format_bytes_per_sec(alert.threshold), "dropped below"), 388 ("net", "tx_rate", ">") => (format_bytes_per_sec(alert.threshold), "exceeded"), 389 ("net", "tx_rate", "<") => (format_bytes_per_sec(alert.threshold), "dropped below"), 390 391 // Disk metrics 392 ("disk", "read_rate", ">") => (format_bytes_per_sec(alert.threshold), "exceeded"), 393 ("disk", "read_rate", "<") => (format_bytes_per_sec(alert.threshold), "dropped below"), 394 ("disk", "write_rate", ">") => (format_bytes_per_sec(alert.threshold), "exceeded"), 395 ("disk", "write_rate", "<") => (format_bytes_per_sec(alert.threshold), "dropped below"), 396 397 // Default case 398 (_, _, ">") => (format!("{}", alert.threshold), "exceeded"), 399 (_, _, "<") => (format!("{}", alert.threshold), "is below"), 400 _ => (format!("{}", alert.threshold), "equals"), 401 }; 402 403 if is_firing { 404 format!( 405 "ALERT: {}{} {} {} (sustained for {} min)", 406 get_var_friendly_name(&alert.var), 407 resource, 408 verb, 409 value_with_unit, 410 alert.time_window 411 ) 412 } else { 413 format!( 414 "RESOLVED: {}{} no longer {} {} (back to normal)", 415 get_var_friendly_name(&alert.var), 416 resource, 417 verb, 418 value_with_unit 419 ) 420 } 421 } 422 423 /// Format bytes per second with appropriate units 424 fn format_bytes_per_sec(bytes_per_sec: f64) -> String { 425 const KIB: f64 = 1024.0; 426 427 if bytes_per_sec <= 0.0 { 428 return "0.00 B/s".to_string(); 429 } 430 431 let units = ["B/s", "KiB/s", "MiB/s", "GiB/s"]; 432 let exp = (bytes_per_sec.ln() / KIB.ln()).floor() as i32; 433 let exp = exp.clamp(0, (units.len() - 1) as i32); 434 435 let value = bytes_per_sec / KIB.powi(exp); 436 format!("{:.2} {}", value, units[exp as usize]) 
437 }