endpoints.rs
1 use crate::collect_info; 2 use crate::config::Config; 3 use crate::db::Database; 4 use crate::models::{self, NotificationMethod}; 5 use axum::Json; 6 use axum::body::Body; 7 use axum::extract::rejection::JsonRejection; 8 use axum::extract::ws::{Message, WebSocket}; 9 use axum::extract::{Path, Query, Request}; 10 use axum::http::StatusCode; 11 use axum::{ 12 extract::{ConnectInfo, State, WebSocketUpgrade}, 13 http::HeaderMap, 14 response::{Html, IntoResponse}, 15 }; 16 use bollard::container::LogsOptions; 17 use futures::StreamExt; 18 use log::{debug, error, info, warn}; 19 use models::HistoricalQueryOptions; 20 use rust_embed::Embed; 21 use serde_json; 22 use std::net::SocketAddr; 23 use std::sync::{Arc, Mutex}; 24 use sysinfo::System; 25 use tokio::{ 26 self, 27 time::{Duration, interval}, 28 }; 29 30 #[derive(Embed)] 31 #[folder = "web/build/static"] 32 struct Asset; 33 34 pub async fn serve_static(request: Request<Body>) -> impl IntoResponse { 35 let mut path = request.uri().path(); 36 let cache_control: &str; 37 if path.ends_with("favicon.png") { 38 path = "favicon.png"; 39 cache_control = "private, max-age=7200"; 40 } else if path.ends_with("Inter-Regular.woff") { 41 path= "Inter-Regular.woff"; 42 cache_control = "public, max-age=15552000, immutable"; 43 } else if path.ends_with("Inter-Regular.woff2") { 44 path = "Inter-Regular.woff2"; 45 cache_control = "public, max-age=15552000, immutable"; 46 } else if path.ends_with("RobotoMono-Regular.woff") { 47 path = "RobotoMono-Regular.woff"; 48 cache_control = "public, max-age=15552000, immutable"; 49 } else if path.ends_with("RobotoMono-Regular.woff2"){ 50 path = "RobotoMono-Regular.woff2"; 51 cache_control = "public, max-age=15552000, immutable"; 52 } else if path.ends_with("auth") { 53 path = "auth.html"; 54 cache_control = "private, max-age=3600"; 55 } else { 56 path = "index.html"; 57 cache_control = "private, max-age=3600"; 58 } 59 60 match Asset::get(&path) { 61 Some(content) => { 62 let etag = hex::encode(content.metadata.sha256_hash()); 63 64 // Check If-None-Match header 65 if let Some(if_none_match) = request.headers().get("If-None-Match") { 66 if let Ok(if_none_match_str) = if_none_match.to_str() { 67 if if_none_match_str == etag { 68 return StatusCode::NOT_MODIFIED.into_response(); 69 } 70 } 71 } 72 73 ( 74 [ 75 ( 76 "Content-Type", 77 mime_guess::from_path(path) 78 .first_or_octet_stream() 79 .essence_str(), 80 ), 81 ("ETag", etag.as_str()), 82 ("Cache-Control", cache_control), 83 ], 84 content.data.into_response(), 85 ) 86 .into_response() 87 }, 88 None => StatusCode::NOT_FOUND.into_response(), 89 } 90 } 91 92 pub async fn req_info(ConnectInfo(addr): ConnectInfo<SocketAddr>, headers: HeaderMap) -> String { 93 let headers_str = headers 94 .iter() 95 .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap())) 96 .collect::<Vec<String>>() 97 .join("\n"); 98 99 info!("Request info from IP: {}", addr); 100 debug!("Headers: {}", headers_str); 101 102 format!("IP: {}\n\nHeaders:\n{}", addr, headers_str) 103 } 104 105 // docker 106 pub async fn ws_handler_d( 107 ws: WebSocketUpgrade, 108 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 109 ) -> impl IntoResponse { 110 debug!("Docker websocket connection requested"); 111 ws.on_upgrade(move |socket| handle_socket_d(socket, config.update_interval)) 112 } 113 114 async fn handle_socket_d(mut socket: WebSocket, ws_interval: u64) { 115 debug!("Docker websocket connection established"); 116 let mut interval = interval(Duration::from_secs(ws_interval)); 117 118 let mut docker_accessible = true; 119 loop { 120 let json_string = match collect_info::get_docker_containers().await { 121 Some(info) => serde_json::to_string(&info).unwrap(), 122 None => { 123 warn!("Can't get docker containers info"); 124 docker_accessible = false; 125 String::from("null") 126 } 127 }; 128 if socket 129 .send(Message::Binary({ 130 let mut encoder = 131 flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); 132 std::io::Write::write_all(&mut encoder, json_string.as_bytes()).unwrap(); 133 encoder.finish().unwrap().into() 134 })) 135 .await 136 .is_err() 137 { 138 debug!("Docker websocket connection closed"); 139 break; 140 } 141 if !docker_accessible { 142 break; 143 } 144 interval.tick().await; 145 } 146 } 147 148 // processes 149 pub async fn ws_handler_p( 150 ws: WebSocketUpgrade, 151 State((sys, config)): State<(Arc<Mutex<System>>, Config)>, 152 ) -> impl IntoResponse { 153 debug!("Processes websocket connection requested"); 154 ws.on_upgrade(move |socket| handle_socket_p(socket, sys, config.update_interval)) 155 } 156 157 async fn handle_socket_p(mut socket: WebSocket, sys: Arc<Mutex<System>>, ws_interval: u64) { 158 debug!("Processes websocket connection established"); 159 let mut interval = interval(Duration::from_secs(ws_interval)); 160 loop { 161 let processes_info = collect_info::collect_processes_info(&sys.lock().unwrap()); 162 if socket 163 .send(Message::Binary({ 164 let json_string = serde_json::to_string(&processes_info).unwrap(); 165 let mut encoder = 166 flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); 167 std::io::Write::write_all(&mut encoder, json_string.as_bytes()).unwrap(); 168 encoder.finish().unwrap().into() 169 })) 170 .await 171 .is_err() 172 { 173 debug!("Processes websocket connection closed"); 174 break; 175 } 176 interval.tick().await; 177 } 178 } 179 180 // general info 181 pub async fn ws_handler_g( 182 ws: WebSocketUpgrade, 183 State((sys, config)): State<(Arc<Mutex<System>>, Config)>, 184 ) -> impl IntoResponse { 185 debug!("General system info websocket connection requested"); 186 ws.on_upgrade(move |socket| handle_socket_g(socket, sys, config.update_interval)) 187 } 188 189 async fn handle_socket_g(mut socket: WebSocket, sys: Arc<Mutex<System>>, ws_interval: u64) { 190 debug!("General system info websocket connection established"); 191 let mut interval = interval(Duration::from_secs(ws_interval)); 192 loop { 193 let general_info = collect_info::collect_general_info(&sys.lock().unwrap()); 194 if socket 195 .send(Message::Binary({ 196 let json_string = serde_json::to_string(&general_info).unwrap(); 197 let mut encoder = 198 flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); 199 std::io::Write::write_all(&mut encoder, json_string.as_bytes()).unwrap(); 200 encoder.finish().unwrap().into() 201 })) 202 .await 203 .is_err() 204 { 205 debug!("General system info websocket connection closed"); 206 break; 207 } 208 interval.tick().await; 209 } 210 } 211 212 pub async fn get_container_logs(Path(container_id): Path<String>) -> impl IntoResponse { 213 debug!("Getting logs for container: {}", container_id); 214 let docker = bollard::Docker::connect_with_local_defaults().unwrap(); 215 let options = Some(LogsOptions::<String> { 216 stdout: true, 217 stderr: true, 218 timestamps: true, 219 ..Default::default() 220 }); 221 222 let mut logs = String::new(); 223 // Fetch and print logs 224 let mut logs_stream = docker.logs(&container_id, options); 225 while let Some(log_result) = logs_stream.next().await { 226 match log_result { 227 Ok(log_output) => match log_output { 228 bollard::container::LogOutput::StdOut { message } => { 229 logs.push_str(format!("STDOUT|{}", String::from_utf8_lossy(&message)).as_str()); 230 } 231 bollard::container::LogOutput::StdErr { message } => { 232 logs.push_str(format!("STDERR|{}", String::from_utf8_lossy(&message)).as_str()); 233 } 234 _ => {} 235 }, 236 Err(e) => { 237 error!("Error getting logs for container {}: {}", container_id, e); 238 break; 239 } 240 } 241 } 242 243 Html(logs) 244 } 245 246 // Historical data endpoint 247 pub async fn historical_data( 248 Query(params): Query<HistoricalQueryOptions>, 249 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 250 ) -> Result<impl IntoResponse, (StatusCode, String)> { 251 debug!("Historical data requested: {:?}", params); 252 // Open database connection 253 let db = match Database::new(&config.db_path) { 254 Ok(db) => db, 255 Err(e) => { 256 error!("Failed to open database: {}", e); 257 return Err(( 258 StatusCode::INTERNAL_SERVER_ERROR, 259 format!("Failed to open database: {}", e), 260 )); 261 } 262 }; 263 264 // Query historical data 265 match db.query_historical_data(¶ms) { 266 Ok(data) => { 267 debug!("Historical data query successful: {} records", data.len()); 268 Ok(Json(data)) 269 } 270 Err(e) => { 271 error!("Failed to query database: {}", e); 272 Err(( 273 StatusCode::INTERNAL_SERVER_ERROR, 274 format!("Failed to query database: {}", e), 275 )) 276 } 277 } 278 } 279 280 pub async fn add_notif_method( 281 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 282 body: Result<Json<NotificationMethod>, JsonRejection>, 283 ) -> impl IntoResponse { 284 let mut notification_method = match body { 285 Ok(Json(method)) => method, 286 Err(err) => { 287 error!("Invalid notification method JSON payload: {}", err); 288 return ( 289 StatusCode::BAD_REQUEST, 290 format!("Invalid JSON payload: {}", err), 291 ) 292 .into_response(); 293 } 294 }; 295 296 info!("Adding notification method: {}", notification_method.name); 297 debug!("Notification method details: {:?}", notification_method); 298 299 let db = match Database::new(&config.db_path) { 300 Ok(db) => db, 301 Err(e) => { 302 error!("Failed to open database: {}", e); 303 return ( 304 StatusCode::INTERNAL_SERVER_ERROR, 305 format!("Failed to open database: {}", e), 306 ) 307 .into_response(); 308 } 309 }; 310 311 let mut methods: Vec<NotificationMethod> = 312 match db.get_kv_str("notification_sources").unwrap_or_default() { 313 Some(methods) => serde_json::from_str(&methods).unwrap(), 314 None => Vec::new(), 315 }; 316 317 if notification_method.id == "-1" { 318 notification_method.id = uuid::Uuid::new_v4().to_string(); 319 info!( 320 "Created new notification method with ID: {}", 321 notification_method.id 322 ); 323 } else { 324 info!( 325 "Updating notification method with ID: {}", 326 notification_method.id 327 ); 328 methods.retain(|method| method.id != notification_method.id); 329 } 330 331 methods.push(notification_method); 332 333 db.set_kv_str( 334 "notification_methods", 335 &serde_json::to_string(&methods).unwrap().to_string(), 336 ) 337 .unwrap(); 338 339 (StatusCode::CREATED, Json(methods)).into_response() 340 } 341 342 pub async fn get_notif_methods( 343 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 344 ) -> Result<impl IntoResponse, (StatusCode, String)> { 345 let db = match Database::new(&config.db_path) { 346 Ok(db) => db, 347 Err(e) => { 348 return Err(( 349 StatusCode::INTERNAL_SERVER_ERROR, 350 format!("Failed to open database: {}", e), 351 )); 352 } 353 }; 354 355 let methods: Vec<NotificationMethod> = 356 match db.get_kv_str("notification_methods").unwrap_or_default() { 357 Some(methods) => serde_json::from_str(&methods).unwrap(), 358 None => Vec::new(), 359 }; 360 361 Ok(Json(methods)) 362 } 363 364 pub async fn delete_notif_method( 365 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 366 Path(id): Path<String>, 367 ) -> Result<impl IntoResponse, (StatusCode, String)> { 368 let db = match Database::new(&config.db_path) { 369 Ok(db) => db, 370 Err(e) => { 371 return Err(( 372 StatusCode::INTERNAL_SERVER_ERROR, 373 format!("Failed to open database: {}", e), 374 )); 375 } 376 }; 377 378 let mut methods: Vec<NotificationMethod> = 379 match db.get_kv_str("notification_sources").unwrap_or_default() { 380 Some(methods) => serde_json::from_str(&methods).unwrap(), 381 None => Vec::new(), 382 }; 383 384 methods.retain(|source| source.id != id); 385 386 db.set_kv_str( 387 "notification_methods", 388 &serde_json::to_string(&methods).unwrap().to_string(), 389 ) 390 .unwrap(); 391 392 Ok(Json(methods)) 393 } 394 395 pub async fn add_alert( 396 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 397 body: Result<Json<models::Alert>, JsonRejection>, 398 ) -> impl IntoResponse { 399 let mut alert = match body { 400 Ok(Json(alert)) => alert, 401 Err(err) => { 402 error!("Invalid alert JSON payload: {}", err); 403 return ( 404 StatusCode::BAD_REQUEST, 405 format!("Invalid JSON payload: {}", err), 406 ) 407 .into_response(); 408 } 409 }; 410 alert.firing = false; 411 412 info!("Adding alert for {}", alert.var.var); 413 debug!("Alert details: {:?}", alert); 414 415 let db = match Database::new(&config.db_path) { 416 Ok(db) => db, 417 Err(e) => { 418 error!("Failed to open database: {}", e); 419 return ( 420 StatusCode::INTERNAL_SERVER_ERROR, 421 format!("Failed to open database: {}", e), 422 ) 423 .into_response(); 424 } 425 }; 426 427 let mut alerts: Vec<models::Alert> = match db.get_kv_str("alerts").unwrap_or_default() { 428 Some(alerts) => serde_json::from_str(&alerts).unwrap(), 429 None => Vec::new(), 430 }; 431 432 if alert.id == "-1" { 433 alert.id = uuid::Uuid::new_v4().to_string(); 434 info!("Created new alert with ID: {}", alert.id); 435 } else { 436 info!("Updating alert with ID: {}", alert.id); 437 alerts.retain(|a| a.id != alert.id); 438 } 439 440 alerts.push(alert); 441 442 db.set_kv_str( 443 "alerts", 444 &serde_json::to_string(&alerts).unwrap().to_string(), 445 ) 446 .unwrap(); 447 448 (StatusCode::CREATED, Json(alerts)).into_response() 449 } 450 451 pub async fn get_alerts( 452 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 453 ) -> Result<impl IntoResponse, (StatusCode, String)> { 454 let db = match Database::new(&config.db_path) { 455 Ok(db) => db, 456 Err(e) => { 457 return Err(( 458 StatusCode::INTERNAL_SERVER_ERROR, 459 format!("Failed to open database: {}", e), 460 )); 461 } 462 }; 463 464 let alerts: Vec<models::Alert> = match db.get_kv_str("alerts").unwrap_or_default() { 465 Some(alerts) => serde_json::from_str(&alerts).unwrap(), 466 None => Vec::new(), 467 }; 468 469 Ok(Json(alerts)) 470 } 471 472 pub async fn delete_alert( 473 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 474 Path(id): Path<String>, 475 ) -> Result<impl IntoResponse, (StatusCode, String)> { 476 info!("Deleting alert with ID: {}", id); 477 478 let db = match Database::new(&config.db_path) { 479 Ok(db) => db, 480 Err(e) => { 481 error!("Failed to open database: {}", e); 482 return Err(( 483 StatusCode::INTERNAL_SERVER_ERROR, 484 format!("Failed to open database: {}", e), 485 )); 486 } 487 }; 488 489 let mut alerts: Vec<models::Alert> = match db.get_kv_str("alerts").unwrap_or_default() { 490 Some(alerts) => serde_json::from_str(&alerts).unwrap(), 491 None => Vec::new(), 492 }; 493 494 alerts.retain(|alert| alert.id != id); 495 496 db.set_kv_str( 497 "alerts", 498 &serde_json::to_string(&alerts).unwrap().to_string(), 499 ) 500 .unwrap(); 501 502 Ok(Json(alerts)) 503 } 504 505 pub async fn get_alert_vars( 506 State((_, config)): State<(Arc<Mutex<System>>, Config)>, 507 ) -> Result<impl IntoResponse, (StatusCode, String)> { 508 let db = match Database::new(&config.db_path) { 509 Ok(db) => db, 510 Err(e) => { 511 return Err(( 512 StatusCode::INTERNAL_SERVER_ERROR, 513 format!("Failed to open database: {}", e), 514 )); 515 } 516 }; 517 518 let vars: Vec<models::AlertVar> = match db.get_resource_list() { 519 Ok(vars) => vars, 520 Err(e) => { 521 return Err(( 522 StatusCode::INTERNAL_SERVER_ERROR, 523 format!("Failed to get resource list: {}", e), 524 )); 525 } 526 }; 527 528 Ok(Json(vars)) 529 }