// src/db.rs
  1  use crate::models::{AlertVar, GeneralInfo, HistoricalQueryOptions, HistoricalSeries, ALERT_VARIABLES};
  2  use log::error;
  3  use rusqlite::{Connection, Result, params};
  4  use std::sync::{Arc, Mutex};
  5  use std::time::{Duration, SystemTime, UNIX_EPOCH};
  6  use sysinfo::System;
  7  
  8  use crate::collect_info::collect_general_info;
  9  
/// Number of seconds the collector loop in [`db_update`] sleeps between two samples.
const STORE_INTERVAL: u64 = 2;
/// Handle to the SQLite database that stores collected metrics and key/value settings.
pub struct Database {
    /// The single SQLite connection; every method locks the mutex before touching it.
    pub conn: Arc<Mutex<Connection>>,
}
 14  
 15  impl Database {
 16      pub fn new(db_path: &str) -> Result<Self> {
 17          let conn = Connection::open(db_path)?;
 18  
 19          // Enable Write-Ahead Logging for better concurrency and performance
 20          let _ = conn.query_row("PRAGMA journal_mode = WAL", [], |row| {
 21              row.get::<_, String>(0)
 22          })?;
 23          conn.execute("PRAGMA synchronous = NORMAL", [])?;
 24          conn.execute("PRAGMA cache_size = 8000", [])?;
 25          conn.execute("PRAGMA temp_store = MEMORY", [])?;
 26  
 27          // Create tables
 28          for table_name in ["general_s", "general_m", "general_h", "general_d"] {
 29              conn.execute(
 30                  format!(
 31                      "CREATE TABLE IF NOT EXISTS {} (
 32                      id INTEGER PRIMARY KEY,
 33                      timestamp INTEGER,
 34                      cpu_usage REAL,
 35                      mem_usage REAL,
 36                      swap_usage REAL,
 37                      load_avg_1 REAL,
 38                      load_avg_5 REAL,
 39                      load_avg_15 REAL
 40                  )",
 41                      table_name
 42                  )
 43                  .as_str(),
 44                  [],
 45              )?;
 46          }
 47  
 48          for table_name in ["net_s", "net_m", "net_h", "net_d"] {
 49              conn.execute(
 50                  format!(
 51                      "CREATE TABLE IF NOT EXISTS {} (
 52                      id INTEGER PRIMARY KEY,
 53                      timestamp INTEGER,
 54                      name TEXT,
 55                      rx REAL,
 56                      tx REAL,
 57                      rx_rate REAL,
 58                      tx_rate REAL
 59                  )",
 60                      table_name
 61                  )
 62                  .as_str(),
 63                  [],
 64              )?;
 65          }
 66  
 67          for table_name in ["disk_s", "disk_m", "disk_h", "disk_d"] {
 68              conn.execute(
 69                  format!(
 70                      "CREATE TABLE IF NOT EXISTS {} (
 71                      id INTEGER PRIMARY KEY,
 72                      timestamp INTEGER,
 73                      name TEXT,
 74                      total_read REAL,
 75                      total_write REAL,
 76                      read_rate REAL,
 77                      write_rate REAL,
 78                      disk_usage REAL
 79                  )",
 80                      table_name
 81                  )
 82                  .as_str(),
 83                  [],
 84              )?;
 85          }
 86  
 87          conn.execute(
 88              "CREATE TABLE IF NOT EXISTS kv (
 89                  key TEXT PRIMARY KEY,
 90                  value BLOB
 91              )",
 92              [],
 93          )?;
 94  
 95          // Create indexes
 96          // Timestamp indexes for all tables
 97          for table in [
 98              "general_s",
 99              "general_m",
100              "general_h",
101              "general_d",
102              "net_s",
103              "net_m",
104              "net_h",
105              "net_d",
106              "disk_s",
107              "disk_m",
108              "disk_h",
109              "disk_d",
110          ] {
111              conn.execute(
112                  &format!(
113                      "CREATE INDEX IF NOT EXISTS idx_{}_timestamp ON {} (timestamp)",
114                      table, table
115                  ),
116                  [],
117              )?;
118          }
119  
120          // Compound indexes for network and disk tables to optimize queries by name and timestamp
121          for table in [
122              "net_s", "net_m", "net_h", "net_d", "disk_s", "disk_m", "disk_h", "disk_d",
123          ] {
124              conn.execute(
125                  &format!(
126                      "CREATE INDEX IF NOT EXISTS idx_{}_name_timestamp ON {} (name, timestamp)",
127                      table, table
128                  ),
129                  [],
130              )?;
131          }
132  
133          conn.execute("CREATE INDEX IF NOT EXISTS idx_kv_key ON kv (key)", [])?;
134  
135          Ok(Database {
136              conn: Arc::new(Mutex::new(conn)),
137          })
138      }
139  
140      pub fn get_kv_str(&self, key: &str) -> Result<Option<String>> {
141          let conn = self.conn.lock().unwrap();
142          let mut stmt = conn.prepare("SELECT value FROM kv WHERE key = ?")?;
143          let mut rows = stmt.query(params![key])?;
144          match rows.next()? {
145              Some(row) => Ok(Some(row.get(0)?)),
146              _ => Ok(None),
147          }
148      }
149  
150      pub fn set_kv_str(&self, key: &str, value: &str) -> Result<()> {
151          let conn = self.conn.lock().unwrap();
152          conn.execute(
153              "INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)",
154              params![key, value],
155          )?;
156          Ok(())
157      }
158  
159      pub fn query_historical_data(
160          &self,
161          options: &HistoricalQueryOptions,
162      ) -> Result<Vec<HistoricalSeries>> {
163          let resolution = match options.resolution.as_str() {
164              "second" => "s",
165              "minute" => "m",
166              "hour" => "h",
167              "day" => "d",
168              _ => "m", // Default to minute metrics
169          };
170  
171          let mut series_results: Vec<HistoricalSeries> = Vec::new();
172  
173          for cat in ["general", "net", "disk"] {
174              let table_name = format!("{}_{}", cat, resolution);
175              // Build the query
176              let mut query = format!("SELECT * FROM {}", table_name);
177              let mut query_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
178  
179              // Fix query construction - first condition shouldn't have AND
180              let mut has_where = false;
181              if let Some(start) = options.start_time {
182                  query.push_str(" WHERE timestamp >= ?");
183                  query_params.push(Box::new(start));
184                  has_where = true;
185              }
186  
187              if let Some(end) = options.end_time {
188                  if has_where {
189                      query.push_str(" AND timestamp <= ?");
190                  } else {
191                      query.push_str(" WHERE timestamp <= ?");
192                  }
193                  query_params.push(Box::new(end));
194              }
195  
196              query.push_str(" ORDER BY timestamp");
197  
198              if let Some(limit) = options.limit {
199                  query.push_str(" LIMIT ?");
200                  query_params.push(Box::new(limit));
201              }
202  
203              // create HistoricalSeries from query results for each column
204              let mut series_map: std::collections::HashMap<(String, String), HistoricalSeries> =
205                  std::collections::HashMap::new();
206  
207              let conn = self.conn.lock().unwrap();
208              let mut stmt = match conn.prepare(&query) {
209                  Ok(stmt) => stmt,
210                  Err(e) => {
211                      error!("Failed to prepare query for {}: {}", table_name, e);
212                      continue;
213                  }
214              };
215  
216              // Get column names
217              let column_names: Vec<String> =
218                  stmt.column_names().iter().map(|s| s.to_string()).collect();
219  
220              // Create parameter references
221              let param_refs: Vec<&dyn rusqlite::ToSql> =
222                  query_params.iter().map(|p| p.as_ref()).collect();
223  
224              // Execute the query
225              let mut rows = match stmt.query(rusqlite::params_from_iter(param_refs.iter())) {
226                  Ok(rows) => rows,
227                  Err(e) => {
228                      error!("Failed to execute query for {}: {}", table_name, e);
229                      continue;
230                  }
231              };
232  
233              // Process rows
234              while let Some(row) = rows.next().ok().flatten() {
235                  let timestamp: i64 = row.get(1)?; // timestamp is always index 1
236  
237                  if cat == "general" {
238                      // General tables have no name column, use "system" as name
239                      for (idx, col_name) in column_names.iter().enumerate().skip(2) {
240                          // Skip id and timestamp
241                          if let Ok(value) = row.get::<_, f64>(idx) {
242                              let key = (col_name.clone(), "system".to_string());
243                              let entry = series_map.entry(key).or_insert_with(|| HistoricalSeries {
244                                  cat: cat.to_string(),
245                                  stype: col_name.clone(),
246                                  name: "system".to_string(),
247                                  timestamps: Vec::new(),
248                                  values: Vec::new(),
249                              });
250  
251                              entry.timestamps.push(timestamp);
252                              entry.values.push(value);
253                          }
254                      }
255                  } else {
256                      // Net and disk tables have name column at index 2
257                      let name: String = match row.get(2) {
258                          Ok(name) => name,
259                          Err(_) => continue, // Skip if name can't be retrieved
260                      };
261  
262                      for (idx, col_name) in column_names.iter().enumerate().skip(3) {
263                          // skip(3) to skip id, timestamp, name
264                          if ["rx", "tx", "total_read", "total_write"].contains(&col_name.as_str()) {
265                              // Skip total read/write columns
266                              continue;
267                          }
268                          // Skip id, timestamp, name
269                          if let Ok(value) = row.get::<_, f64>(idx) {
270                              let key = (col_name.clone(), name.clone());
271                              let entry = series_map.entry(key).or_insert_with(|| HistoricalSeries {
272                                  cat: cat.to_string(),
273                                  stype: col_name.clone(),
274                                  name: name.clone(),
275                                  timestamps: Vec::new(),
276                                  values: Vec::new(),
277                              });
278  
279                              entry.timestamps.push(timestamp);
280                              entry.values.push(value);
281                          }
282                      }
283                  }
284              }
285  
286              // Add all series from this table to the results
287              series_results.extend(series_map.into_values());
288          }
289          Ok(series_results)
290      }
291  
292      pub fn get_resource_list(&self) -> Result<Vec<AlertVar>> {
293          let mut alert_vars: Vec<AlertVar> = Vec::new();
294  
295          // get all names in net_s and disk_s
296          for c in ["net", "disk"] {
297              let conn = self.conn.lock().unwrap();
298              let mut stmt = conn.prepare(&format!("SELECT DISTINCT name FROM {}_s", c))?;
299              let rows = stmt.query_map([], |row| row.get(0))?;
300              let resources: Vec<String> = rows.collect::<Result<Vec<String>>>()?;
301  
302              let cols: Vec<&(&str, &str)> = ALERT_VARIABLES.iter().filter(|v| v.0 == c).collect();
303  
304              // every resource_col combination
305              for resource in resources.clone() {
306                  for &(_, col) in cols.iter() {
307                      alert_vars.push(AlertVar {
308                          cat: c.to_string(),
309                          var: col.to_string(),
310                          resrc: resource.clone(),
311                      });
312                  }
313              }
314          }
315  
316          // system vars
317          let cols: Vec<&(&str, &str)> = ALERT_VARIABLES.iter().filter(|v| v.0 == "sys").collect();
318          for &(_, col) in cols.iter() {
319              alert_vars.push(AlertVar {
320                  cat: "sys".to_string(),
321                  var: col.to_string(),
322                  resrc: "sys".to_string(),
323              });
324          }
325  
326          Ok(alert_vars)
327      }
328  }
329  
330  pub async fn db_update(sys: Arc<Mutex<System>>, db_path: &str) {
331      let db = match Database::new(db_path) {
332          Ok(db) => Arc::new(db),
333          Err(e) => {
334              error!("Failed to initialize database: {}", e);
335              return;
336          }
337      };
338      let mut last_info: Option<GeneralInfo> = None;
339      let mut last_timestamp: Option<u64> = None;
340      loop {
341          let general_info = {
342              let sys = sys.lock().unwrap();
343              collect_general_info(&sys)
344          };
345          {
346              let timestamp = SystemTime::now()
347                  .duration_since(UNIX_EPOCH)
348                  .unwrap()
349                  .as_secs();
350  
351              let conn = db.conn.lock().unwrap();
352              conn.execute(
353                  "INSERT INTO general_s (
354                      timestamp, cpu_usage, mem_usage, swap_usage, load_avg_1, load_avg_5, load_avg_15
355                  ) VALUES (?, ROUND(?,2), ROUND(?,2), ROUND(?,2), ROUND(?,2), ROUND(?,2), ROUND(?,2))",
356                  params![
357                      timestamp,
358                      general_info.cpu.avg_usage,
359                      100.0 * general_info.mem.used_mem as f32 / general_info.mem.total_mem as f32,
360                      100.0 * general_info.mem.used_swap as f32 / general_info.mem.total_swap as f32,
361                      general_info.sys.load_avg[0],
362                      general_info.sys.load_avg[1],
363                      general_info.sys.load_avg[2]
364                  ],
365              )
366              .unwrap();
367  
368              for interface in general_info.net.interfaces.iter() {
369                  let mut rx_rate = 0.0;
370                  let mut tx_rate = 0.0;
371                  // check if last info is initialized
372                  if last_info.is_some() && last_timestamp.is_some() {
373                      // Find the matching interface in last_info
374                      let last_interface = last_info
375                          .as_ref()
376                          .unwrap()
377                          .net
378                          .interfaces
379                          .iter()
380                          .find(|last_iface| last_iface.name == interface.name);
381  
382                      if let Some(last_iface) = last_interface {
383                          // Calculate time difference in seconds
384                          let elapsed_secs = timestamp as f64 - last_timestamp.unwrap() as f64;
385  
386                          // Calculate rates
387                          rx_rate = if interface.rx > last_iface.rx {
388                              (interface.rx - last_iface.rx) as f64 / elapsed_secs
389                          } else {
390                              0.0
391                          };
392                          tx_rate = if interface.tx > last_iface.tx {
393                              (interface.tx - last_iface.tx) as f64 / elapsed_secs
394                          } else {
395                              0.0
396                          };
397                      }
398                  }
399                  conn.execute(
400                      "INSERT INTO net_s (
401                          timestamp, name, rx, tx, rx_rate, tx_rate
402                      ) VALUES (?, ?, ?, ?, ROUND(?), ROUND(?))",
403                      params![
404                          timestamp,
405                          interface.name,
406                          interface.rx,
407                          interface.tx,
408                          rx_rate,
409                          tx_rate
410                      ],
411                  )
412                  .unwrap();
413              }
414  
415              for disk in general_info.disk.disks.iter() {
416                  let mut read_rate = 0.0;
417                  let mut write_rate = 0.0;
418                  // check if last info is initialized
419                  if last_info.is_some() && last_timestamp.is_some() {
420                      // Find the matching disk in last_info
421                      let last_disk = last_info
422                          .as_ref()
423                          .unwrap()
424                          .disk
425                          .disks
426                          .iter()
427                          .find(|last_disk| last_disk.mount_point == disk.mount_point);
428  
429                      if let Some(last_disk) = last_disk {
430                          // Calculate time difference in seconds
431                          let elapsed_secs = timestamp as f64 - last_timestamp.unwrap() as f64;
432  
433                          // Calculate rates
434                          read_rate = if disk.io[2] > last_disk.io[2] {
435                              (disk.io[2] - last_disk.io[2]) as f64 / elapsed_secs
436                          } else {
437                              0.0
438                          };
439                          write_rate = if disk.io[3] > last_disk.io[3] {
440                              (disk.io[3] - last_disk.io[3]) as f64 / elapsed_secs
441                          } else {
442                              0.0
443                          };
444                      }
445                  }
446                  conn.execute(
447                      "INSERT INTO disk_s (
448                          timestamp, name, total_read, total_write, read_rate, write_rate, disk_usage
449                      ) VALUES (?, ?, ?, ?, ROUND(?), ROUND(?), ROUND(?,2))",
450                      params![
451                          timestamp,
452                          disk.mount_point,
453                          disk.io[2],
454                          disk.io[3],
455                          read_rate,
456                          write_rate,
457                          100.0 * (1.0 - disk.free_space as f32 / disk.total_space as f32)
458                      ],
459                  )
460                  .unwrap();
461              }
462  
463              // if skipped over the minute mark still need to aggregate the last minute's data
464  
465              if timestamp % 60 < STORE_INTERVAL {
466                  // We have passed (or are on) a minute boundary
467                  // Set timestamp to the minute boundary
468                  let timestamp = timestamp - (timestamp % 60);
469                  // Aggregate last minute's data
470                  let _ = conn.execute(
471                      "INSERT INTO general_m
472                                      (
473                                      timestamp,
474                                      cpu_usage,
475                                      mem_usage,
476                                      swap_usage,
477                                      load_avg_1,
478                                      load_avg_5,
479                                      load_avg_15
480                                      )
481                                      SELECT 
482                                          ?2,
483                                          round(AVG(cpu_usage), 2),
484                                          round(AVG(mem_usage), 2),
485                                          round(AVG(swap_usage), 2),
486                                          round(AVG(load_avg_1), 2),
487                                          round(AVG(load_avg_5), 2),
488                                          round(AVG(load_avg_15), 2)
489                                      FROM general_s
490                                      WHERE timestamp >= ?1 AND timestamp <= ?2;",
491                      params![timestamp - 60, timestamp],
492                  );
493                  let _ = conn.execute(
494                      "INSERT INTO net_m
495                                      (
496                                      timestamp,
497                                      name,
498                                      rx,
499                                      tx,
500                                      rx_rate,
501                                      tx_rate
502                                      )
503                                      SELECT 
504                                          ?2,
505                                          name,
506                                          MAX(rx),
507                                          MAX(tx),
508                                          round(AVG(rx_rate)),
509                                          round(AVG(tx_rate))
510                                      FROM net_s
511                                      WHERE timestamp >= ?1 AND timestamp <= ?2
512                                      GROUP BY name;",
513                      params![timestamp - 60, timestamp],
514                  );
515                  let _ = conn.execute(
516                      "INSERT INTO disk_m
517                                      (
518                                      timestamp,
519                                      name,
520                                      total_read,
521                                      total_write,
522                                      read_rate,
523                                      write_rate,
524                                      disk_usage
525                                      )
526                                      SELECT 
527                                          ?2,
528                                          name,
529                                          MAX(total_read),
530                                          MAX(total_write),
531                                          round(AVG(read_rate)),
532                                          round(AVG(write_rate)),
533                                          round(AVG(disk_usage), 2)
534                                      FROM disk_s
535                                      WHERE timestamp >= ?1 AND timestamp <= ?2
536                                      GROUP BY name;",
537                      params![timestamp - 60, timestamp],
538                  );
539  
540                  // Check if it's an hour boundary
541                  if (timestamp / 60) % 60 == 0 {
542                      // Aggregate minute_metrics for the last hour
543                      let _ = conn.execute(
544                          "INSERT INTO general_h
545                                          (
546                                          timestamp,
547                                          cpu_usage,
548                                          mem_usage,
549                                          swap_usage,
550                                          load_avg_1,
551                                          load_avg_5,
552                                          load_avg_15
553                                          )
554                                          SELECT 
555                                              ?2,
556                                              round(AVG(cpu_usage)),
557                                              round(AVG(mem_usage)),
558                                              round(AVG(swap_usage)),
559                                              round(AVG(load_avg_1)),
560                                              round(AVG(load_avg_5)),
561                                              round(AVG(load_avg_15))
562                                          FROM general_m
563                                          WHERE timestamp >= ?1 AND timestamp <= ?2;",
564                          params![timestamp - 3600, timestamp],
565                      );
566                      let _ = conn.execute(
567                          "INSERT INTO net_h
568                                          (
569                                          timestamp,
570                                          name,
571                                          rx,
572                                          tx,
573                                          rx_rate,
574                                          tx_rate
575                                          )
576                                          SELECT 
577                                              ?2,
578                                              name,
579                                              MAX(rx),
580                                              MAX(tx),
581                                              round(AVG(rx_rate)),
582                                              round(AVG(tx_rate))
583                                          FROM net_m
584                                          WHERE timestamp >= ?1 AND timestamp <= ?2
585                                          GROUP BY name;",
586                          params![timestamp - 3600, timestamp],
587                      );
588                      let _ = conn.execute(
589                          "INSERT INTO disk_h
590                                          (
591                                          timestamp,
592                                          name,
593                                          total_read,
594                                          total_write,
595                                          read_rate,
596                                          write_rate,
597                                          disk_usage
598                                          )
599                                          SELECT 
600                                              ?2,
601                                              name,
602                                              MAX(total_read),
603                                              MAX(total_write),
604                                              round(AVG(read_rate)),
605                                              round(AVG(write_rate)),
606                                              round(AVG(disk_usage), 2)
607                                          FROM disk_m
608                                          WHERE timestamp >= ?1 AND timestamp <= ?2
609                                          GROUP BY name;",
610                          params![timestamp - 3600, timestamp],
611                      );
612                      // Check if it's a day boundary (midnight)
613                      if (timestamp / 3600) % 24 == 0 {
614                          // Aggregate hour_metrics for the last day
615                          let _ = conn.execute(
616                              "INSERT INTO general_d
617                                              (
618                                              timestamp,
619                                              cpu_usage,
620                                              mem_usage,
621                                              swap_usage,
622                                              load_avg_1,
623                                              load_avg_5,
624                                              load_avg_15
625                                              )
626                                              SELECT 
627                                                  ?2,
628                                                  round(AVG(cpu_usage)),
629                                                  round(AVG(mem_usage)),
630                                                  round(AVG(swap_usage)),
631                                                  round(AVG(load_avg_1)),
632                                                  round(AVG(load_avg_5)),
633                                                  round(AVG(load_avg_15))
634                                              FROM general_h
635                                              WHERE timestamp >= ?1 AND timestamp <= ?2;",
636                              params![timestamp - 86400, timestamp],
637                          );
638                          let _ = conn.execute(
639                              "INSERT INTO net_d
640                                              (
641                                              timestamp,
642                                              name,
643                                              rx,
644                                              tx,
645                                              rx_rate,
646                                              tx_rate
647                                              )
648                                              SELECT 
649                                                  ?2,
650                                                  name,
651                                                  MAX(rx),
652                                                  MAX(tx),
653                                                  round(AVG(rx_rate)),
654                                                  round(AVG(tx_rate))
655                                              FROM net_h
656                                              WHERE timestamp >= ?1 AND timestamp <= ?2
657                                              GROUP BY name;",
658                              params![timestamp - 86400, timestamp],
659                          );
660                          let _ = conn.execute(
661                              "INSERT INTO disk_d
662                                              (
663                                              timestamp,
664                                              name,
665                                              total_read,
666                                              total_write,
667                                              read_rate,
668                                              write_rate,
669                                              disk_usage
670                                              )
671                                              SELECT 
672                                                  ?2,
673                                                  name,
674                                                  MAX(total_read),
675                                                  MAX(total_write),
676                                                  round(AVG(read_rate)),
677                                                  round(AVG(write_rate)),
678                                                  round(AVG(disk_usage), 2)
679                                              FROM disk_h
680                                              WHERE timestamp >= ?1 AND timestamp <= ?2
681                                              GROUP BY name;",
682                              params![timestamp - 86400, timestamp],
683                          );
684  
685                          // Clean up older hour metrics (keep 365 days)
686                          let cutoff = timestamp - (86400 * 365);
687                          for table_name in ["general_h", "net_h", "disk_h"] {
688                              conn.execute(
689                                  format!("DELETE FROM {} WHERE timestamp < ?", table_name).as_str(),
690                                  params![cutoff],
691                              )
692                              .unwrap();
693                          }
694                          // Run VACCUM
695                          conn.execute("VACUUM", []).unwrap();
696                          conn.execute("pragma optimize", []).unwrap();
697                      }
698                      // Clean up older second data (keep 1 hours)
699                      let cutoff = timestamp - 3600;
700                      for table_name in ["general_s", "net_s", "disk_s"] {
701                          conn.execute(
702                              format!("DELETE FROM {} WHERE timestamp < ?", table_name).as_str(),
703                              params![cutoff],
704                          )
705                          .unwrap();
706                      }
707                      // Clean up older minute metrics (keep 96 hours)
708                      let cutoff = timestamp - (86400 * 4);
709                      for table_name in ["general_m", "net_m", "disk_m"] {
710                          conn.execute(
711                              format!("DELETE FROM {} WHERE timestamp < ?", table_name).as_str(),
712                              params![cutoff],
713                          )
714                          .unwrap();
715                      }
716                  }
717              }
718              last_info = Some(general_info.clone());
719              last_timestamp = Some(timestamp);
720          }
721  
722          tokio::time::sleep(Duration::from_secs(STORE_INTERVAL)).await;
723      }
724  }