// db.rs — SQLite-backed metrics storage and periodic aggregation.
1 use crate::models::{AlertVar, GeneralInfo, HistoricalQueryOptions, HistoricalSeries, ALERT_VARIABLES}; 2 use log::error; 3 use rusqlite::{Connection, Result, params}; 4 use std::sync::{Arc, Mutex}; 5 use std::time::{Duration, SystemTime, UNIX_EPOCH}; 6 use sysinfo::System; 7 8 use crate::collect_info::collect_general_info; 9 10 const STORE_INTERVAL: u64 = 2; 11 pub struct Database { 12 pub conn: Arc<Mutex<Connection>>, 13 } 14 15 impl Database { 16 pub fn new(db_path: &str) -> Result<Self> { 17 let conn = Connection::open(db_path)?; 18 19 // Enable Write-Ahead Logging for better concurrency and performance 20 let _ = conn.query_row("PRAGMA journal_mode = WAL", [], |row| { 21 row.get::<_, String>(0) 22 })?; 23 conn.execute("PRAGMA synchronous = NORMAL", [])?; 24 conn.execute("PRAGMA cache_size = 8000", [])?; 25 conn.execute("PRAGMA temp_store = MEMORY", [])?; 26 27 // Create tables 28 for table_name in ["general_s", "general_m", "general_h", "general_d"] { 29 conn.execute( 30 format!( 31 "CREATE TABLE IF NOT EXISTS {} ( 32 id INTEGER PRIMARY KEY, 33 timestamp INTEGER, 34 cpu_usage REAL, 35 mem_usage REAL, 36 swap_usage REAL, 37 load_avg_1 REAL, 38 load_avg_5 REAL, 39 load_avg_15 REAL 40 )", 41 table_name 42 ) 43 .as_str(), 44 [], 45 )?; 46 } 47 48 for table_name in ["net_s", "net_m", "net_h", "net_d"] { 49 conn.execute( 50 format!( 51 "CREATE TABLE IF NOT EXISTS {} ( 52 id INTEGER PRIMARY KEY, 53 timestamp INTEGER, 54 name TEXT, 55 rx REAL, 56 tx REAL, 57 rx_rate REAL, 58 tx_rate REAL 59 )", 60 table_name 61 ) 62 .as_str(), 63 [], 64 )?; 65 } 66 67 for table_name in ["disk_s", "disk_m", "disk_h", "disk_d"] { 68 conn.execute( 69 format!( 70 "CREATE TABLE IF NOT EXISTS {} ( 71 id INTEGER PRIMARY KEY, 72 timestamp INTEGER, 73 name TEXT, 74 total_read REAL, 75 total_write REAL, 76 read_rate REAL, 77 write_rate REAL, 78 disk_usage REAL 79 )", 80 table_name 81 ) 82 .as_str(), 83 [], 84 )?; 85 } 86 87 conn.execute( 88 "CREATE TABLE IF NOT EXISTS kv ( 89 key TEXT PRIMARY 
KEY, 90 value BLOB 91 )", 92 [], 93 )?; 94 95 // Create indexes 96 // Timestamp indexes for all tables 97 for table in [ 98 "general_s", 99 "general_m", 100 "general_h", 101 "general_d", 102 "net_s", 103 "net_m", 104 "net_h", 105 "net_d", 106 "disk_s", 107 "disk_m", 108 "disk_h", 109 "disk_d", 110 ] { 111 conn.execute( 112 &format!( 113 "CREATE INDEX IF NOT EXISTS idx_{}_timestamp ON {} (timestamp)", 114 table, table 115 ), 116 [], 117 )?; 118 } 119 120 // Compound indexes for network and disk tables to optimize queries by name and timestamp 121 for table in [ 122 "net_s", "net_m", "net_h", "net_d", "disk_s", "disk_m", "disk_h", "disk_d", 123 ] { 124 conn.execute( 125 &format!( 126 "CREATE INDEX IF NOT EXISTS idx_{}_name_timestamp ON {} (name, timestamp)", 127 table, table 128 ), 129 [], 130 )?; 131 } 132 133 conn.execute("CREATE INDEX IF NOT EXISTS idx_kv_key ON kv (key)", [])?; 134 135 Ok(Database { 136 conn: Arc::new(Mutex::new(conn)), 137 }) 138 } 139 140 pub fn get_kv_str(&self, key: &str) -> Result<Option<String>> { 141 let conn = self.conn.lock().unwrap(); 142 let mut stmt = conn.prepare("SELECT value FROM kv WHERE key = ?")?; 143 let mut rows = stmt.query(params![key])?; 144 match rows.next()? 
{ 145 Some(row) => Ok(Some(row.get(0)?)), 146 _ => Ok(None), 147 } 148 } 149 150 pub fn set_kv_str(&self, key: &str, value: &str) -> Result<()> { 151 let conn = self.conn.lock().unwrap(); 152 conn.execute( 153 "INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)", 154 params![key, value], 155 )?; 156 Ok(()) 157 } 158 159 pub fn query_historical_data( 160 &self, 161 options: &HistoricalQueryOptions, 162 ) -> Result<Vec<HistoricalSeries>> { 163 let resolution = match options.resolution.as_str() { 164 "second" => "s", 165 "minute" => "m", 166 "hour" => "h", 167 "day" => "d", 168 _ => "m", // Default to minute metrics 169 }; 170 171 let mut series_results: Vec<HistoricalSeries> = Vec::new(); 172 173 for cat in ["general", "net", "disk"] { 174 let table_name = format!("{}_{}", cat, resolution); 175 // Build the query 176 let mut query = format!("SELECT * FROM {}", table_name); 177 let mut query_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new(); 178 179 // Fix query construction - first condition shouldn't have AND 180 let mut has_where = false; 181 if let Some(start) = options.start_time { 182 query.push_str(" WHERE timestamp >= ?"); 183 query_params.push(Box::new(start)); 184 has_where = true; 185 } 186 187 if let Some(end) = options.end_time { 188 if has_where { 189 query.push_str(" AND timestamp <= ?"); 190 } else { 191 query.push_str(" WHERE timestamp <= ?"); 192 } 193 query_params.push(Box::new(end)); 194 } 195 196 query.push_str(" ORDER BY timestamp"); 197 198 if let Some(limit) = options.limit { 199 query.push_str(" LIMIT ?"); 200 query_params.push(Box::new(limit)); 201 } 202 203 // create HistoricalSeries from query results for each column 204 let mut series_map: std::collections::HashMap<(String, String), HistoricalSeries> = 205 std::collections::HashMap::new(); 206 207 let conn = self.conn.lock().unwrap(); 208 let mut stmt = match conn.prepare(&query) { 209 Ok(stmt) => stmt, 210 Err(e) => { 211 error!("Failed to prepare query for {}: {}", table_name, e); 212 
continue; 213 } 214 }; 215 216 // Get column names 217 let column_names: Vec<String> = 218 stmt.column_names().iter().map(|s| s.to_string()).collect(); 219 220 // Create parameter references 221 let param_refs: Vec<&dyn rusqlite::ToSql> = 222 query_params.iter().map(|p| p.as_ref()).collect(); 223 224 // Execute the query 225 let mut rows = match stmt.query(rusqlite::params_from_iter(param_refs.iter())) { 226 Ok(rows) => rows, 227 Err(e) => { 228 error!("Failed to execute query for {}: {}", table_name, e); 229 continue; 230 } 231 }; 232 233 // Process rows 234 while let Some(row) = rows.next().ok().flatten() { 235 let timestamp: i64 = row.get(1)?; // timestamp is always index 1 236 237 if cat == "general" { 238 // General tables have no name column, use "system" as name 239 for (idx, col_name) in column_names.iter().enumerate().skip(2) { 240 // Skip id and timestamp 241 if let Ok(value) = row.get::<_, f64>(idx) { 242 let key = (col_name.clone(), "system".to_string()); 243 let entry = series_map.entry(key).or_insert_with(|| HistoricalSeries { 244 cat: cat.to_string(), 245 stype: col_name.clone(), 246 name: "system".to_string(), 247 timestamps: Vec::new(), 248 values: Vec::new(), 249 }); 250 251 entry.timestamps.push(timestamp); 252 entry.values.push(value); 253 } 254 } 255 } else { 256 // Net and disk tables have name column at index 2 257 let name: String = match row.get(2) { 258 Ok(name) => name, 259 Err(_) => continue, // Skip if name can't be retrieved 260 }; 261 262 for (idx, col_name) in column_names.iter().enumerate().skip(3) { 263 // skip(3) to skip id, timestamp, name 264 if ["rx", "tx", "total_read", "total_write"].contains(&col_name.as_str()) { 265 // Skip total read/write columns 266 continue; 267 } 268 // Skip id, timestamp, name 269 if let Ok(value) = row.get::<_, f64>(idx) { 270 let key = (col_name.clone(), name.clone()); 271 let entry = series_map.entry(key).or_insert_with(|| HistoricalSeries { 272 cat: cat.to_string(), 273 stype: col_name.clone(), 
274 name: name.clone(), 275 timestamps: Vec::new(), 276 values: Vec::new(), 277 }); 278 279 entry.timestamps.push(timestamp); 280 entry.values.push(value); 281 } 282 } 283 } 284 } 285 286 // Add all series from this table to the results 287 series_results.extend(series_map.into_values()); 288 } 289 Ok(series_results) 290 } 291 292 pub fn get_resource_list(&self) -> Result<Vec<AlertVar>> { 293 let mut alert_vars: Vec<AlertVar> = Vec::new(); 294 295 // get all names in net_s and disk_s 296 for c in ["net", "disk"] { 297 let conn = self.conn.lock().unwrap(); 298 let mut stmt = conn.prepare(&format!("SELECT DISTINCT name FROM {}_s", c))?; 299 let rows = stmt.query_map([], |row| row.get(0))?; 300 let resources: Vec<String> = rows.collect::<Result<Vec<String>>>()?; 301 302 let cols: Vec<&(&str, &str)> = ALERT_VARIABLES.iter().filter(|v| v.0 == c).collect(); 303 304 // every resource_col combination 305 for resource in resources.clone() { 306 for &(_, col) in cols.iter() { 307 alert_vars.push(AlertVar { 308 cat: c.to_string(), 309 var: col.to_string(), 310 resrc: resource.clone(), 311 }); 312 } 313 } 314 } 315 316 // system vars 317 let cols: Vec<&(&str, &str)> = ALERT_VARIABLES.iter().filter(|v| v.0 == "sys").collect(); 318 for &(_, col) in cols.iter() { 319 alert_vars.push(AlertVar { 320 cat: "sys".to_string(), 321 var: col.to_string(), 322 resrc: "sys".to_string(), 323 }); 324 } 325 326 Ok(alert_vars) 327 } 328 } 329 330 pub async fn db_update(sys: Arc<Mutex<System>>, db_path: &str) { 331 let db = match Database::new(db_path) { 332 Ok(db) => Arc::new(db), 333 Err(e) => { 334 error!("Failed to initialize database: {}", e); 335 return; 336 } 337 }; 338 let mut last_info: Option<GeneralInfo> = None; 339 let mut last_timestamp: Option<u64> = None; 340 loop { 341 let general_info = { 342 let sys = sys.lock().unwrap(); 343 collect_general_info(&sys) 344 }; 345 { 346 let timestamp = SystemTime::now() 347 .duration_since(UNIX_EPOCH) 348 .unwrap() 349 .as_secs(); 350 351 let 
conn = db.conn.lock().unwrap(); 352 conn.execute( 353 "INSERT INTO general_s ( 354 timestamp, cpu_usage, mem_usage, swap_usage, load_avg_1, load_avg_5, load_avg_15 355 ) VALUES (?, ROUND(?,2), ROUND(?,2), ROUND(?,2), ROUND(?,2), ROUND(?,2), ROUND(?,2))", 356 params![ 357 timestamp, 358 general_info.cpu.avg_usage, 359 100.0 * general_info.mem.used_mem as f32 / general_info.mem.total_mem as f32, 360 100.0 * general_info.mem.used_swap as f32 / general_info.mem.total_swap as f32, 361 general_info.sys.load_avg[0], 362 general_info.sys.load_avg[1], 363 general_info.sys.load_avg[2] 364 ], 365 ) 366 .unwrap(); 367 368 for interface in general_info.net.interfaces.iter() { 369 let mut rx_rate = 0.0; 370 let mut tx_rate = 0.0; 371 // check if last info is initialized 372 if last_info.is_some() && last_timestamp.is_some() { 373 // Find the matching interface in last_info 374 let last_interface = last_info 375 .as_ref() 376 .unwrap() 377 .net 378 .interfaces 379 .iter() 380 .find(|last_iface| last_iface.name == interface.name); 381 382 if let Some(last_iface) = last_interface { 383 // Calculate time difference in seconds 384 let elapsed_secs = timestamp as f64 - last_timestamp.unwrap() as f64; 385 386 // Calculate rates 387 rx_rate = if interface.rx > last_iface.rx { 388 (interface.rx - last_iface.rx) as f64 / elapsed_secs 389 } else { 390 0.0 391 }; 392 tx_rate = if interface.tx > last_iface.tx { 393 (interface.tx - last_iface.tx) as f64 / elapsed_secs 394 } else { 395 0.0 396 }; 397 } 398 } 399 conn.execute( 400 "INSERT INTO net_s ( 401 timestamp, name, rx, tx, rx_rate, tx_rate 402 ) VALUES (?, ?, ?, ?, ROUND(?), ROUND(?))", 403 params![ 404 timestamp, 405 interface.name, 406 interface.rx, 407 interface.tx, 408 rx_rate, 409 tx_rate 410 ], 411 ) 412 .unwrap(); 413 } 414 415 for disk in general_info.disk.disks.iter() { 416 let mut read_rate = 0.0; 417 let mut write_rate = 0.0; 418 // check if last info is initialized 419 if last_info.is_some() && last_timestamp.is_some() { 420 
// Find the matching disk in last_info 421 let last_disk = last_info 422 .as_ref() 423 .unwrap() 424 .disk 425 .disks 426 .iter() 427 .find(|last_disk| last_disk.mount_point == disk.mount_point); 428 429 if let Some(last_disk) = last_disk { 430 // Calculate time difference in seconds 431 let elapsed_secs = timestamp as f64 - last_timestamp.unwrap() as f64; 432 433 // Calculate rates 434 read_rate = if disk.io[2] > last_disk.io[2] { 435 (disk.io[2] - last_disk.io[2]) as f64 / elapsed_secs 436 } else { 437 0.0 438 }; 439 write_rate = if disk.io[3] > last_disk.io[3] { 440 (disk.io[3] - last_disk.io[3]) as f64 / elapsed_secs 441 } else { 442 0.0 443 }; 444 } 445 } 446 conn.execute( 447 "INSERT INTO disk_s ( 448 timestamp, name, total_read, total_write, read_rate, write_rate, disk_usage 449 ) VALUES (?, ?, ?, ?, ROUND(?), ROUND(?), ROUND(?,2))", 450 params![ 451 timestamp, 452 disk.mount_point, 453 disk.io[2], 454 disk.io[3], 455 read_rate, 456 write_rate, 457 100.0 * (1.0 - disk.free_space as f32 / disk.total_space as f32) 458 ], 459 ) 460 .unwrap(); 461 } 462 463 // if skipped over the minute mark still need to aggregate the last minute's data 464 465 if timestamp % 60 < STORE_INTERVAL { 466 // We have passed (or are on) a minute boundary 467 // Set timestamp to the minute boundary 468 let timestamp = timestamp - (timestamp % 60); 469 // Aggregate last minute's data 470 let _ = conn.execute( 471 "INSERT INTO general_m 472 ( 473 timestamp, 474 cpu_usage, 475 mem_usage, 476 swap_usage, 477 load_avg_1, 478 load_avg_5, 479 load_avg_15 480 ) 481 SELECT 482 ?2, 483 round(AVG(cpu_usage), 2), 484 round(AVG(mem_usage), 2), 485 round(AVG(swap_usage), 2), 486 round(AVG(load_avg_1), 2), 487 round(AVG(load_avg_5), 2), 488 round(AVG(load_avg_15), 2) 489 FROM general_s 490 WHERE timestamp >= ?1 AND timestamp <= ?2;", 491 params![timestamp - 60, timestamp], 492 ); 493 let _ = conn.execute( 494 "INSERT INTO net_m 495 ( 496 timestamp, 497 name, 498 rx, 499 tx, 500 rx_rate, 501 tx_rate 
502 ) 503 SELECT 504 ?2, 505 name, 506 MAX(rx), 507 MAX(tx), 508 round(AVG(rx_rate)), 509 round(AVG(tx_rate)) 510 FROM net_s 511 WHERE timestamp >= ?1 AND timestamp <= ?2 512 GROUP BY name;", 513 params![timestamp - 60, timestamp], 514 ); 515 let _ = conn.execute( 516 "INSERT INTO disk_m 517 ( 518 timestamp, 519 name, 520 total_read, 521 total_write, 522 read_rate, 523 write_rate, 524 disk_usage 525 ) 526 SELECT 527 ?2, 528 name, 529 MAX(total_read), 530 MAX(total_write), 531 round(AVG(read_rate)), 532 round(AVG(write_rate)), 533 round(AVG(disk_usage), 2) 534 FROM disk_s 535 WHERE timestamp >= ?1 AND timestamp <= ?2 536 GROUP BY name;", 537 params![timestamp - 60, timestamp], 538 ); 539 540 // Check if it's an hour boundary 541 if (timestamp / 60) % 60 == 0 { 542 // Aggregate minute_metrics for the last hour 543 let _ = conn.execute( 544 "INSERT INTO general_h 545 ( 546 timestamp, 547 cpu_usage, 548 mem_usage, 549 swap_usage, 550 load_avg_1, 551 load_avg_5, 552 load_avg_15 553 ) 554 SELECT 555 ?2, 556 round(AVG(cpu_usage)), 557 round(AVG(mem_usage)), 558 round(AVG(swap_usage)), 559 round(AVG(load_avg_1)), 560 round(AVG(load_avg_5)), 561 round(AVG(load_avg_15)) 562 FROM general_m 563 WHERE timestamp >= ?1 AND timestamp <= ?2;", 564 params![timestamp - 3600, timestamp], 565 ); 566 let _ = conn.execute( 567 "INSERT INTO net_h 568 ( 569 timestamp, 570 name, 571 rx, 572 tx, 573 rx_rate, 574 tx_rate 575 ) 576 SELECT 577 ?2, 578 name, 579 MAX(rx), 580 MAX(tx), 581 round(AVG(rx_rate)), 582 round(AVG(tx_rate)) 583 FROM net_m 584 WHERE timestamp >= ?1 AND timestamp <= ?2 585 GROUP BY name;", 586 params![timestamp - 3600, timestamp], 587 ); 588 let _ = conn.execute( 589 "INSERT INTO disk_h 590 ( 591 timestamp, 592 name, 593 total_read, 594 total_write, 595 read_rate, 596 write_rate, 597 disk_usage 598 ) 599 SELECT 600 ?2, 601 name, 602 MAX(total_read), 603 MAX(total_write), 604 round(AVG(read_rate)), 605 round(AVG(write_rate)), 606 round(AVG(disk_usage), 2) 607 FROM disk_m 
608 WHERE timestamp >= ?1 AND timestamp <= ?2 609 GROUP BY name;", 610 params![timestamp - 3600, timestamp], 611 ); 612 // Check if it's a day boundary (midnight) 613 if (timestamp / 3600) % 24 == 0 { 614 // Aggregate hour_metrics for the last day 615 let _ = conn.execute( 616 "INSERT INTO general_d 617 ( 618 timestamp, 619 cpu_usage, 620 mem_usage, 621 swap_usage, 622 load_avg_1, 623 load_avg_5, 624 load_avg_15 625 ) 626 SELECT 627 ?2, 628 round(AVG(cpu_usage)), 629 round(AVG(mem_usage)), 630 round(AVG(swap_usage)), 631 round(AVG(load_avg_1)), 632 round(AVG(load_avg_5)), 633 round(AVG(load_avg_15)) 634 FROM general_h 635 WHERE timestamp >= ?1 AND timestamp <= ?2;", 636 params![timestamp - 86400, timestamp], 637 ); 638 let _ = conn.execute( 639 "INSERT INTO net_d 640 ( 641 timestamp, 642 name, 643 rx, 644 tx, 645 rx_rate, 646 tx_rate 647 ) 648 SELECT 649 ?2, 650 name, 651 MAX(rx), 652 MAX(tx), 653 round(AVG(rx_rate)), 654 round(AVG(tx_rate)) 655 FROM net_h 656 WHERE timestamp >= ?1 AND timestamp <= ?2 657 GROUP BY name;", 658 params![timestamp - 86400, timestamp], 659 ); 660 let _ = conn.execute( 661 "INSERT INTO disk_d 662 ( 663 timestamp, 664 name, 665 total_read, 666 total_write, 667 read_rate, 668 write_rate, 669 disk_usage 670 ) 671 SELECT 672 ?2, 673 name, 674 MAX(total_read), 675 MAX(total_write), 676 round(AVG(read_rate)), 677 round(AVG(write_rate)), 678 round(AVG(disk_usage), 2) 679 FROM disk_h 680 WHERE timestamp >= ?1 AND timestamp <= ?2 681 GROUP BY name;", 682 params![timestamp - 86400, timestamp], 683 ); 684 685 // Clean up older hour metrics (keep 365 days) 686 let cutoff = timestamp - (86400 * 365); 687 for table_name in ["general_h", "net_h", "disk_h"] { 688 conn.execute( 689 format!("DELETE FROM {} WHERE timestamp < ?", table_name).as_str(), 690 params![cutoff], 691 ) 692 .unwrap(); 693 } 694 // Run VACCUM 695 conn.execute("VACUUM", []).unwrap(); 696 conn.execute("pragma optimize", []).unwrap(); 697 } 698 // Clean up older second data (keep 1 
hours) 699 let cutoff = timestamp - 3600; 700 for table_name in ["general_s", "net_s", "disk_s"] { 701 conn.execute( 702 format!("DELETE FROM {} WHERE timestamp < ?", table_name).as_str(), 703 params![cutoff], 704 ) 705 .unwrap(); 706 } 707 // Clean up older minute metrics (keep 96 hours) 708 let cutoff = timestamp - (86400 * 4); 709 for table_name in ["general_m", "net_m", "disk_m"] { 710 conn.execute( 711 format!("DELETE FROM {} WHERE timestamp < ?", table_name).as_str(), 712 params![cutoff], 713 ) 714 .unwrap(); 715 } 716 } 717 } 718 last_info = Some(general_info.clone()); 719 last_timestamp = Some(timestamp); 720 } 721 722 tokio::time::sleep(Duration::from_secs(STORE_INTERVAL)).await; 723 } 724 }