/ src / alerts.rs
alerts.rs
  1  use crate::db::Database;
  2  use crate::models::{Alert, AlertVar, NotificationConfig, NotificationMethod, WebHookNotif, EmailNotif, TelegramNotif};
  3  use log::{debug, error, info, trace};
  4  use std::collections::HashMap;
  5  use rusqlite::params;
  6  use tokio::time::{interval, Duration};
  7  use reqwest::Client;
  8  use std::sync::Arc;
  9  use lettre::{Message, SmtpTransport, Transport, message::header::ContentType};
 10  use lettre::transport::smtp::authentication::Credentials;
 11  use teloxide::prelude::*;
 12  use teloxide::types::Recipient;
 13  
 14  /// Main function to check alerts and send notifications
 15  pub async fn check_alerts(db_path: &str) {
 16      let db = match Database::new(db_path) {
 17          Ok(db) => Arc::new(db),
 18          Err(e) => {
 19              error!("Failed to open database: {}", e);
 20              return;
 21          }
 22      };
 23  
 24      // Run alert checking loop
 25      let mut interval_timer = interval(Duration::from_secs(60)); // Check every minute
 26      tokio::time::sleep(Duration::from_secs(70)).await;
 27      loop {
 28          debug!("Checking alerts");
 29          interval_timer.tick().await;
 30          
 31          let alerts = match get_alerts(&db) {
 32              Ok(alerts) => alerts,
 33              Err(e) => {
 34                  error!("Failed to get alerts: {}", e);
 35                  continue;
 36              }
 37          };
 38  
 39          if alerts.is_empty() {
 40              continue;
 41          }
 42  
 43          let notification_methods = match get_notification_methods(&db) {
 44              Ok(methods) => methods,
 45              Err(e) => {
 46                  error!("Failed to get notification methods: {}", e);
 47                  continue;
 48              }
 49          };
 50  
 51          let method_map: HashMap<String, NotificationMethod> = notification_methods
 52              .into_iter()
 53              .map(|m| (m.id.clone(), m))
 54              .collect();
 55  
 56          // Process each alert
 57          for mut alert in alerts {
 58              trace!("Checking alert: {:?}", alert);
 59              if !alert.enabled {
 60                  continue;
 61              }
 62  
 63              // Check if alert condition is met
 64              let is_firing = match check_alert_condition(&db, &alert) {
 65                  Ok(firing) => firing,
 66                  Err(e) => {
 67                      error!("Failed to check alert condition for {:?}: {}", alert, e);
 68                      continue;
 69                  }
 70              };
 71              trace!("Alert firing: {}", is_firing);
 72              
 73              // If alert state has changed, update it in the database
 74              if is_firing != alert.firing {
 75                  alert.firing = is_firing;
 76                  if let Err(e) = update_alert_state(&db, &alert) {
 77                      error!("Failed to update alert state: {}", e);
 78                  }
 79  
 80                  if is_firing {
 81                      info!("Alert fired: {:?}", alert);
 82                      let notification_message = format_alert_message(&alert, true);
 83                      
 84                      // Send notifications to all configured methods for this alert
 85                      for method_id in &alert.notif_methods {
 86                          if let Some(method) = method_map.get(method_id) {
 87                              if method.enabled {
 88                                  if let Err(e) = send_notification(method, &notification_message).await {
 89                                      error!("Failed to send notification: {}", e);
 90                                  }
 91                              }
 92                          }
 93                      }
 94                  }
 95                  else {
 96                      // send the relief
 97                      info!("Alert relief: {:?}", alert);
 98                      let notification_message = format_alert_message(&alert, false);
 99  
100                      // Send notifications to all configured methods for this alert
101                      for method_id in &alert.notif_methods {
102                          if let Some(method) = method_map.get(method_id) {
103                              if method.enabled {
104                                  if let Err(e) = send_notification(method, &notification_message).await {
105                                      error!("Failed to send notification: {}", e);
106                                  }
107                              }
108                          }
109                      }
110                  }
111              }
112          }
113      }
114  }
115  
116  fn get_alerts(db: &Database) -> Result<Vec<Alert>, String> {
117      let alerts_json = match db.get_kv_str("alerts") {
118          Ok(Some(json)) => json,
119          Ok(None) => return Ok(Vec::new()),
120          Err(e) => return Err(format!("Database error: {}", e)),
121      };
122  
123      serde_json::from_str::<Vec<Alert>>(&alerts_json)
124          .map_err(|e| format!("Failed to parse alerts JSON: {}", e))
125  }
126  
127  fn get_notification_methods(db: &Database) -> Result<Vec<NotificationMethod>, String> {
128      let methods_json = match db.get_kv_str("notification_methods") {
129          Ok(Some(json)) => json,
130          Ok(None) => return Ok(Vec::new()),
131          Err(e) => return Err(format!("Database error: {}", e)),
132      };
133  
134      serde_json::from_str::<Vec<NotificationMethod>>(&methods_json)
135          .map_err(|e| format!("Failed to parse notification methods JSON: {}", e))
136  }
137  
138  /// Check if an alert condition is met consistently across the entire time window
139  fn check_alert_condition(db: &Database, alert: &Alert) -> Result<bool, String> {
140      let time_window_secs = alert.time_window * 60; 
141      let now = std::time::SystemTime::now()
142          .duration_since(std::time::UNIX_EPOCH)
143          .map_err(|e| format!("Time error: {}", e))?
144          .as_secs();
145      
146      let start_time = now - time_window_secs as u64;
147      
148      // Choose the appropriate table based on the time window
149      let table_suffix = if time_window_secs <= 7200 {
150          "m" // Use minute-level data for windows <= 2 hours
151      } else {
152          "h" // Use hour-level data for longer windows
153      };
154      
155      // Build query
156      let conn = db.conn.lock().unwrap();
157      
158      let (agg_function, comparison_op) = match alert.operator.as_str() {
159          ">" => ("MIN", ">"), // If MIN value > threshold, then ALL values > threshold
160          "<" => ("MAX", "<"), // If MAX value < threshold, then ALL values < threshold
161          _ => return Err(format!("Unknown operator: {}", alert.operator)),
162      };
163      
164      let query_result = match alert.var.cat.as_str() {
165          "sys" => {
166              // System metrics are in general_* tables
167              let query = format!(
168                  "SELECT {}({}) FROM general_{} WHERE timestamp >= ?",
169                  agg_function, alert.var.var, table_suffix
170              );
171              conn.query_row(&query, params![start_time], |row| row.get::<_, f64>(0))
172          },
173          "net" | "disk" => {
174              // Network or disk metrics need to filter by resource name
175              let query = format!(
176                  "SELECT {}({}) FROM {}_{} WHERE timestamp >= ? AND name = ?",
177                  agg_function, alert.var.var, alert.var.cat, table_suffix
178              );
179              conn.query_row(&query, params![start_time, alert.var.resrc], |row| row.get::<_, f64>(0))
180          },
181          _ => return Err(format!("Unknown category: {}", alert.var.cat)),
182      };
183      
184      let agg_value = match query_result {
185          Ok(value) => value,
186          Err(rusqlite::Error::QueryReturnedNoRows) => {
187              // No data in the time window
188              return Ok(false); // firing=false if there's no data
189          },
190          Err(e) => return Err(format!("Database query error: {}", e)),
191      };
192      
193      match comparison_op {
194          ">" => Ok(agg_value > alert.threshold),
195          "<" => Ok(agg_value < alert.threshold),
196          _ => Err(format!("Unknown comparison operator: {}", comparison_op)),
197      }
198  }
199  
200  /// Update alert state in the database
201  fn update_alert_state(db: &Database, alert: &Alert) -> Result<(), String> {
202      // Get current alerts
203      let alerts_json = match db.get_kv_str("alerts") {
204          Ok(Some(json)) => json,
205          Ok(None) => "[]".to_string(),
206          Err(e) => return Err(format!("Database error: {}", e)),
207      };
208  
209      let mut alerts = serde_json::from_str::<Vec<Alert>>(&alerts_json)
210          .map_err(|e| format!("Failed to parse alerts JSON: {}", e))?;
211      
212      // Update the alert in the list
213      for a in &mut alerts {
214          if a.id == alert.id {
215              a.firing = alert.firing;
216              break;
217          }
218      }
219      
220      // Save back to database
221      let updated_json = serde_json::to_string(&alerts)
222          .map_err(|e| format!("Failed to serialize alerts: {}", e))?;
223      
224      db.set_kv_str("alerts", &updated_json)
225          .map_err(|e| format!("Failed to save alerts: {}", e))
226  }
227  
228  /// Send a notification through the configured method
229  async fn send_notification(method: &NotificationMethod, message: &str) -> Result<(), String> {
230      match &method.config {
231          NotificationConfig::WebHook(webhook) => send_webhook_notification(webhook, message).await,
232          NotificationConfig::Email(email) => send_email_notification(email, message).await,
233          NotificationConfig::Telegram(telegram) => send_telegram_notification(telegram, message).await,
234      }
235  }
236  
237  /// Send a webhook notification
238  async fn send_webhook_notification(webhook: &WebHookNotif, message: &str) -> Result<(), String> {
239      let client = Client::builder().use_rustls_tls().build()
240          .map_err(|e| format!("Failed to build HTTP client: {}", e))?;
241  
242      // Replace placeholder with actual message
243      let url = webhook.url.replace("{notif_msg}", message);
244      
245      // Prepare headers
246      let mut headers = reqwest::header::HeaderMap::new();
247      for (key, value) in &webhook.headers {
248          headers.insert(
249              reqwest::header::HeaderName::from_bytes(key.as_bytes())
250                  .map_err(|e| format!("Invalid header name: {}", e))?,
251              reqwest::header::HeaderValue::from_str(value)
252                  .map_err(|e| format!("Invalid header value: {}", e))?,
253          );
254      }
255      
256      // Build request based on method
257      let mut request_builder = match webhook.method.to_uppercase().as_str() {
258          "GET" => client.get(&url),
259          "POST" => {
260              let body = webhook.body.replace("{notif_msg}", message);
261              client.post(&url).body(body)
262          },
263          "PUT" => {
264              let body = webhook.body.replace("{notif_msg}", message);
265              client.put(&url).body(body)
266          },
267          "PATCH" => {
268              let body = webhook.body.replace("{notif_msg}", message);
269              client.patch(&url).body(body)
270          },
271          "DELETE" => client.delete(&url),
272          _ => return Err(format!("Unsupported HTTP method: {}", webhook.method)),
273      };
274      
275      // Add headers
276      request_builder = request_builder.headers(headers);
277      
278      // Send request
279      let response = request_builder.send()
280          .await
281          .map_err(|e| format!("Failed to send webhook request: {:?}", e))?;
282      
283      if response.status().is_success() {
284          info!("Webhook notification sent successfully");
285          Ok(())
286      } else {
287          Err(format!("Webhook request failed with status: {}", response.status()))
288      }
289  }
290  
291  /// Send an email notification
292  async fn send_email_notification(email: &EmailNotif, message: &str) -> Result<(), String> {
293      // Create email message
294      let email_message = Message::builder()
295          .from(email.from.parse().map_err(|e| format!("Invalid from address: {}", e))?)
296          .to(email.to.parse().map_err(|e| format!("Invalid to address: {}", e))?)
297          .subject(email.subject.clone())
298          .header(ContentType::TEXT_PLAIN)
299          .body(email.body.replace("{notif_msg}", message))
300          .map_err(|e| format!("Failed to create email message: {}", e))?;
301  
302      // Create SMTP transport
303      let credentials = Credentials::new(email.username.clone(), email.password.clone());
304      
305      let mailer = SmtpTransport::relay(&email.server)
306          .map_err(|e| format!("Failed to create SMTP transport: {}", e))?
307          .credentials(credentials)
308          .port(email.port)
309          .build();
310  
311      // Send email
312      match mailer.send(&email_message) {
313          Ok(_) => {
314              info!("Email notification sent successfully");
315              Ok(())
316          },
317          Err(e) => Err(format!("Failed to send email: {}", e)),
318      }
319  }
320  
321  /// Send a telegram notification
322  async fn send_telegram_notification(telegram: &TelegramNotif, message: &str) -> Result<(), String> {
323      // Create Telegram bot
324      let bot = Bot::new(&telegram.token);
325      
326      // Parse chat ID
327      let chat_id = telegram.chat_id.parse::<i64>()
328          .map_err(|_| format!("Invalid chat ID: {}", telegram.chat_id))?;
329      
330      // Send message
331      match bot.send_message(Recipient::Id(ChatId(chat_id)), message).await {
332          Ok(_) => {
333              info!("Telegram notification sent successfully");
334              Ok(())
335          },
336          Err(e) => Err(format!("Failed to send Telegram message: {}", e)),
337      }
338  }
339  
340  /// Get a user-friendly name for a variable
341  fn get_var_friendly_name(var: &AlertVar) -> String {
342      match (var.cat.as_str(), var.var.as_str()) {
343          ("sys", "cpu_usage") => "CPU Usage".to_string(),
344          ("sys", "mem_usage") => "Memory Usage".to_string(),
345          ("sys", "swap_usage") => "Swap Usage".to_string(),
346          ("sys", "load_avg_1") => "1 Min Load Average".to_string(),
347          ("sys", "load_avg_5") => "5 Min Load Average".to_string(),
348          ("sys", "load_avg_15") => "15 Min Load Average".to_string(),
349          ("net", "rx_rate") => "Network Receive Rate".to_string(),
350          ("net", "tx_rate") => "Network Transmit Rate".to_string(),
351          ("disk", "read_rate") => "Disk Read Rate".to_string(),
352          ("disk", "write_rate") => "Disk Write Rate".to_string(),
353          ("disk", "disk_usage") => "Disk Usage".to_string(),
354          _ => format!("{} {}", var.cat, var.var),
355      }
356  }
357  
358  /// Format an alert message with appropriate units and verbs
359  fn format_alert_message(alert: &Alert, is_firing: bool) -> String {
360      // Get the resource identifier if applicable
361      let resource = if alert.var.cat != "sys" {
362          format!(" ({})", alert.var.resrc)
363      } else {
364          "".to_string()
365      };
366  
367      // Get unit for the metric
368      let (value_with_unit, verb) = match (alert.var.cat.as_str(), alert.var.var.as_str(), alert.operator.as_str()) {
369          // System metrics
370          (_, "cpu_usage", ">") => (format!("{}%", alert.threshold), "exceeded"),
371          (_, "cpu_usage", "<") => (format!("{}%", alert.threshold), "dropped below"),
372          (_, "mem_usage", ">") => (format!("{}%", alert.threshold), "exceeded"),
373          (_, "mem_usage", "<") => (format!("{}%", alert.threshold), "dropped below"),
374          (_, "swap_usage", ">") => (format!("{}%", alert.threshold), "exceeded"),
375          (_, "swap_usage", "<") => (format!("{}%", alert.threshold), "dropped below"),
376          (_, "disk_usage", ">") => (format!("{}%", alert.threshold), "exceeded"),
377          (_, "disk_usage", "<") => (format!("{}%", alert.threshold), "dropped below"),
378          (_, "load_avg_1", ">") => (format!("{}", alert.threshold), "exceeded"),
379          (_, "load_avg_1", "<") => (format!("{}", alert.threshold), "dropped below"),
380          (_, "load_avg_5", ">") => (format!("{}", alert.threshold), "exceeded"),
381          (_, "load_avg_5", "<") => (format!("{}", alert.threshold), "dropped below"),
382          (_, "load_avg_15", ">") => (format!("{}", alert.threshold), "exceeded"),
383          (_, "load_avg_15", "<") => (format!("{}", alert.threshold), "dropped below"),
384          
385          // Network metrics
386          ("net", "rx_rate", ">") => (format_bytes_per_sec(alert.threshold), "exceeded"),
387          ("net", "rx_rate", "<") => (format_bytes_per_sec(alert.threshold), "dropped below"),
388          ("net", "tx_rate", ">") => (format_bytes_per_sec(alert.threshold), "exceeded"),
389          ("net", "tx_rate", "<") => (format_bytes_per_sec(alert.threshold), "dropped below"),
390          
391          // Disk metrics
392          ("disk", "read_rate", ">") => (format_bytes_per_sec(alert.threshold), "exceeded"),
393          ("disk", "read_rate", "<") => (format_bytes_per_sec(alert.threshold), "dropped below"),
394          ("disk", "write_rate", ">") => (format_bytes_per_sec(alert.threshold), "exceeded"),
395          ("disk", "write_rate", "<") => (format_bytes_per_sec(alert.threshold), "dropped below"),
396          
397          // Default case
398          (_, _, ">") => (format!("{}", alert.threshold), "exceeded"),
399          (_, _, "<") => (format!("{}", alert.threshold), "is below"),
400          _ => (format!("{}", alert.threshold), "equals"),
401      };
402  
403      if is_firing {
404          format!(
405              "ALERT: {}{} {} {} (sustained for {} min)",
406              get_var_friendly_name(&alert.var),
407              resource,
408              verb,
409              value_with_unit,
410              alert.time_window
411          )
412      } else {
413          format!(
414              "RESOLVED: {}{} no longer {} {} (back to normal)",
415              get_var_friendly_name(&alert.var),
416              resource,
417              verb,
418              value_with_unit
419          )
420      }
421  }
422  
/// Render a byte rate with a binary (1024-based) unit and two decimals.
///
/// Non-positive rates print as `"0.00 B/s"`. The largest unit is GiB/s, so bigger rates are
/// shown as multiples of it (e.g. `"1024.00 GiB/s"`).
fn format_bytes_per_sec(bytes_per_sec: f64) -> String {
    const UNITS: [&str; 4] = ["B/s", "KiB/s", "MiB/s", "GiB/s"];
    let base: f64 = 1024.0;

    if bytes_per_sec <= 0.0 {
        return String::from("0.00 B/s");
    }

    // floor(log_1024(rate)) selects the largest unit not exceeding the value, kept in range.
    let largest = UNITS.len() as i32 - 1;
    let magnitude = (bytes_per_sec.ln() / base.ln()).floor() as i32;
    let unit_index = magnitude.max(0).min(largest);

    let scaled = bytes_per_sec / base.powi(unit_index);
    format!("{:.2} {}", scaled, UNITS[unit_index as usize])
}