/ src / endpoints.rs
endpoints.rs
  1  use crate::collect_info;
  2  use crate::config::Config;
  3  use crate::db::Database;
  4  use crate::models::{self, NotificationMethod};
  5  use axum::Json;
  6  use axum::body::Body;
  7  use axum::extract::rejection::JsonRejection;
  8  use axum::extract::ws::{Message, WebSocket};
  9  use axum::extract::{Path, Query, Request};
 10  use axum::http::StatusCode;
 11  use axum::{
 12      extract::{ConnectInfo, State, WebSocketUpgrade},
 13      http::HeaderMap,
 14      response::{Html, IntoResponse},
 15  };
 16  use bollard::container::LogsOptions;
 17  use futures::StreamExt;
 18  use log::{debug, error, info, warn};
 19  use models::HistoricalQueryOptions;
 20  use rust_embed::Embed;
 21  use serde_json;
 22  use std::net::SocketAddr;
 23  use std::sync::{Arc, Mutex};
 24  use sysinfo::System;
 25  use tokio::{
 26      self,
 27      time::{Duration, interval},
 28  };
 29  
/// Handle for the files under `web/build/static`, exposed through the
/// `rust_embed::Embed` derive. `serve_static` looks files up by name with
/// `Asset::get`.
#[derive(Embed)]
#[folder = "web/build/static"]
struct Asset;
 33  
 34  pub async fn serve_static(request: Request<Body>) -> impl IntoResponse {
 35      let mut path = request.uri().path();
 36      let cache_control: &str;
 37      if path.ends_with("favicon.png") {
 38          path = "favicon.png";
 39          cache_control = "private, max-age=7200";
 40      } else if path.ends_with("Inter-Regular.woff") {
 41          path= "Inter-Regular.woff"; 
 42          cache_control = "public, max-age=15552000, immutable";
 43      } else if path.ends_with("Inter-Regular.woff2") {
 44          path = "Inter-Regular.woff2";
 45          cache_control = "public, max-age=15552000, immutable";
 46      } else if path.ends_with("RobotoMono-Regular.woff") {
 47          path = "RobotoMono-Regular.woff";
 48          cache_control = "public, max-age=15552000, immutable";
 49      } else if path.ends_with("RobotoMono-Regular.woff2"){
 50          path = "RobotoMono-Regular.woff2";
 51          cache_control = "public, max-age=15552000, immutable";
 52      } else if path.ends_with("auth") {
 53          path = "auth.html";
 54          cache_control = "private, max-age=3600";
 55      } else {
 56          path = "index.html";
 57          cache_control = "private, max-age=3600";
 58      }
 59  
 60      match Asset::get(&path) {
 61          Some(content) => {
 62              let etag = hex::encode(content.metadata.sha256_hash());
 63              
 64              // Check If-None-Match header
 65              if let Some(if_none_match) = request.headers().get("If-None-Match") {
 66              if let Ok(if_none_match_str) = if_none_match.to_str() {
 67                  if if_none_match_str == etag {
 68                  return StatusCode::NOT_MODIFIED.into_response();
 69                  }
 70              }
 71              }
 72              
 73              (
 74              [
 75                  (
 76                  "Content-Type",
 77                  mime_guess::from_path(path)
 78                      .first_or_octet_stream()
 79                      .essence_str(),
 80                  ),
 81                  ("ETag", etag.as_str()),
 82                  ("Cache-Control", cache_control),
 83              ],
 84              content.data.into_response(),
 85              )
 86              .into_response()
 87          },
 88          None => StatusCode::NOT_FOUND.into_response(),
 89      }
 90  }
 91  
 92  pub async fn req_info(ConnectInfo(addr): ConnectInfo<SocketAddr>, headers: HeaderMap) -> String {
 93      let headers_str = headers
 94          .iter()
 95          .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap()))
 96          .collect::<Vec<String>>()
 97          .join("\n");
 98  
 99      info!("Request info from IP: {}", addr);
100      debug!("Headers: {}", headers_str);
101  
102      format!("IP: {}\n\nHeaders:\n{}", addr, headers_str)
103  }
104  
105  // docker
106  pub async fn ws_handler_d(
107      ws: WebSocketUpgrade,
108      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
109  ) -> impl IntoResponse {
110      debug!("Docker websocket connection requested");
111      ws.on_upgrade(move |socket| handle_socket_d(socket, config.update_interval))
112  }
113  
114  async fn handle_socket_d(mut socket: WebSocket, ws_interval: u64) {
115      debug!("Docker websocket connection established");
116      let mut interval = interval(Duration::from_secs(ws_interval));
117  
118      let mut docker_accessible = true;
119      loop {
120          let json_string = match collect_info::get_docker_containers().await {
121              Some(info) => serde_json::to_string(&info).unwrap(),
122              None => {
123                  warn!("Can't get docker containers info");
124                  docker_accessible = false;
125                  String::from("null")
126              }
127          };
128          if socket
129              .send(Message::Binary({
130                  let mut encoder =
131                      flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
132                  std::io::Write::write_all(&mut encoder, json_string.as_bytes()).unwrap();
133                  encoder.finish().unwrap().into()
134              }))
135              .await
136              .is_err()
137          {
138              debug!("Docker websocket connection closed");
139              break;
140          }
141          if !docker_accessible {
142              break;
143          }
144          interval.tick().await;
145      }
146  }
147  
148  // processes
149  pub async fn ws_handler_p(
150      ws: WebSocketUpgrade,
151      State((sys, config)): State<(Arc<Mutex<System>>, Config)>,
152  ) -> impl IntoResponse {
153      debug!("Processes websocket connection requested");
154      ws.on_upgrade(move |socket| handle_socket_p(socket, sys, config.update_interval))
155  }
156  
157  async fn handle_socket_p(mut socket: WebSocket, sys: Arc<Mutex<System>>, ws_interval: u64) {
158      debug!("Processes websocket connection established");
159      let mut interval = interval(Duration::from_secs(ws_interval));
160      loop {
161          let processes_info = collect_info::collect_processes_info(&sys.lock().unwrap());
162          if socket
163              .send(Message::Binary({
164                  let json_string = serde_json::to_string(&processes_info).unwrap();
165                  let mut encoder =
166                      flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
167                  std::io::Write::write_all(&mut encoder, json_string.as_bytes()).unwrap();
168                  encoder.finish().unwrap().into()
169              }))
170              .await
171              .is_err()
172          {
173              debug!("Processes websocket connection closed");
174              break;
175          }
176          interval.tick().await;
177      }
178  }
179  
180  // general info
181  pub async fn ws_handler_g(
182      ws: WebSocketUpgrade,
183      State((sys, config)): State<(Arc<Mutex<System>>, Config)>,
184  ) -> impl IntoResponse {
185      debug!("General system info websocket connection requested");
186      ws.on_upgrade(move |socket| handle_socket_g(socket, sys, config.update_interval))
187  }
188  
189  async fn handle_socket_g(mut socket: WebSocket, sys: Arc<Mutex<System>>, ws_interval: u64) {
190      debug!("General system info websocket connection established");
191      let mut interval = interval(Duration::from_secs(ws_interval));
192      loop {
193          let general_info = collect_info::collect_general_info(&sys.lock().unwrap());
194          if socket
195              .send(Message::Binary({
196                  let json_string = serde_json::to_string(&general_info).unwrap();
197                  let mut encoder =
198                      flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
199                  std::io::Write::write_all(&mut encoder, json_string.as_bytes()).unwrap();
200                  encoder.finish().unwrap().into()
201              }))
202              .await
203              .is_err()
204          {
205              debug!("General system info websocket connection closed");
206              break;
207          }
208          interval.tick().await;
209      }
210  }
211  
212  pub async fn get_container_logs(Path(container_id): Path<String>) -> impl IntoResponse {
213      debug!("Getting logs for container: {}", container_id);
214      let docker = bollard::Docker::connect_with_local_defaults().unwrap();
215      let options = Some(LogsOptions::<String> {
216          stdout: true,
217          stderr: true,
218          timestamps: true,
219          ..Default::default()
220      });
221  
222      let mut logs = String::new();
223      // Fetch and print logs
224      let mut logs_stream = docker.logs(&container_id, options);
225      while let Some(log_result) = logs_stream.next().await {
226          match log_result {
227              Ok(log_output) => match log_output {
228                  bollard::container::LogOutput::StdOut { message } => {
229                      logs.push_str(format!("STDOUT|{}", String::from_utf8_lossy(&message)).as_str());
230                  }
231                  bollard::container::LogOutput::StdErr { message } => {
232                      logs.push_str(format!("STDERR|{}", String::from_utf8_lossy(&message)).as_str());
233                  }
234                  _ => {}
235              },
236              Err(e) => {
237                  error!("Error getting logs for container {}: {}", container_id, e);
238                  break;
239              }
240          }
241      }
242  
243      Html(logs)
244  }
245  
246  // Historical data endpoint
247  pub async fn historical_data(
248      Query(params): Query<HistoricalQueryOptions>,
249      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
250  ) -> Result<impl IntoResponse, (StatusCode, String)> {
251      debug!("Historical data requested: {:?}", params);
252      // Open database connection
253      let db = match Database::new(&config.db_path) {
254          Ok(db) => db,
255          Err(e) => {
256              error!("Failed to open database: {}", e);
257              return Err((
258                  StatusCode::INTERNAL_SERVER_ERROR,
259                  format!("Failed to open database: {}", e),
260              ));
261          }
262      };
263  
264      // Query historical data
265      match db.query_historical_data(&params) {
266          Ok(data) => {
267              debug!("Historical data query successful: {} records", data.len());
268              Ok(Json(data))
269          }
270          Err(e) => {
271              error!("Failed to query database: {}", e);
272              Err((
273                  StatusCode::INTERNAL_SERVER_ERROR,
274                  format!("Failed to query database: {}", e),
275              ))
276          }
277      }
278  }
279  
280  pub async fn add_notif_method(
281      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
282      body: Result<Json<NotificationMethod>, JsonRejection>,
283  ) -> impl IntoResponse {
284      let mut notification_method = match body {
285          Ok(Json(method)) => method,
286          Err(err) => {
287              error!("Invalid notification method JSON payload: {}", err);
288              return (
289                  StatusCode::BAD_REQUEST,
290                  format!("Invalid JSON payload: {}", err),
291              )
292                  .into_response();
293          }
294      };
295  
296      info!("Adding notification method: {}", notification_method.name);
297      debug!("Notification method details: {:?}", notification_method);
298  
299      let db = match Database::new(&config.db_path) {
300          Ok(db) => db,
301          Err(e) => {
302              error!("Failed to open database: {}", e);
303              return (
304                  StatusCode::INTERNAL_SERVER_ERROR,
305                  format!("Failed to open database: {}", e),
306              )
307                  .into_response();
308          }
309      };
310  
311      let mut methods: Vec<NotificationMethod> =
312          match db.get_kv_str("notification_sources").unwrap_or_default() {
313              Some(methods) => serde_json::from_str(&methods).unwrap(),
314              None => Vec::new(),
315          };
316  
317      if notification_method.id == "-1" {
318          notification_method.id = uuid::Uuid::new_v4().to_string();
319          info!(
320              "Created new notification method with ID: {}",
321              notification_method.id
322          );
323      } else {
324          info!(
325              "Updating notification method with ID: {}",
326              notification_method.id
327          );
328          methods.retain(|method| method.id != notification_method.id);
329      }
330  
331      methods.push(notification_method);
332  
333      db.set_kv_str(
334          "notification_methods",
335          &serde_json::to_string(&methods).unwrap().to_string(),
336      )
337      .unwrap();
338  
339      (StatusCode::CREATED, Json(methods)).into_response()
340  }
341  
342  pub async fn get_notif_methods(
343      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
344  ) -> Result<impl IntoResponse, (StatusCode, String)> {
345      let db = match Database::new(&config.db_path) {
346          Ok(db) => db,
347          Err(e) => {
348              return Err((
349                  StatusCode::INTERNAL_SERVER_ERROR,
350                  format!("Failed to open database: {}", e),
351              ));
352          }
353      };
354  
355      let methods: Vec<NotificationMethod> =
356          match db.get_kv_str("notification_methods").unwrap_or_default() {
357              Some(methods) => serde_json::from_str(&methods).unwrap(),
358              None => Vec::new(),
359          };
360  
361      Ok(Json(methods))
362  }
363  
364  pub async fn delete_notif_method(
365      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
366      Path(id): Path<String>,
367  ) -> Result<impl IntoResponse, (StatusCode, String)> {
368      let db = match Database::new(&config.db_path) {
369          Ok(db) => db,
370          Err(e) => {
371              return Err((
372                  StatusCode::INTERNAL_SERVER_ERROR,
373                  format!("Failed to open database: {}", e),
374              ));
375          }
376      };
377  
378      let mut methods: Vec<NotificationMethod> =
379          match db.get_kv_str("notification_sources").unwrap_or_default() {
380              Some(methods) => serde_json::from_str(&methods).unwrap(),
381              None => Vec::new(),
382          };
383  
384      methods.retain(|source| source.id != id);
385  
386      db.set_kv_str(
387          "notification_methods",
388          &serde_json::to_string(&methods).unwrap().to_string(),
389      )
390      .unwrap();
391  
392      Ok(Json(methods))
393  }
394  
395  pub async fn add_alert(
396      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
397      body: Result<Json<models::Alert>, JsonRejection>,
398  ) -> impl IntoResponse {
399      let mut alert = match body {
400          Ok(Json(alert)) => alert,
401          Err(err) => {
402              error!("Invalid alert JSON payload: {}", err);
403              return (
404                  StatusCode::BAD_REQUEST,
405                  format!("Invalid JSON payload: {}", err),
406              )
407                  .into_response();
408          }
409      };
410      alert.firing = false;
411  
412      info!("Adding alert for {}", alert.var.var);
413      debug!("Alert details: {:?}", alert);
414  
415      let db = match Database::new(&config.db_path) {
416          Ok(db) => db,
417          Err(e) => {
418              error!("Failed to open database: {}", e);
419              return (
420                  StatusCode::INTERNAL_SERVER_ERROR,
421                  format!("Failed to open database: {}", e),
422              )
423                  .into_response();
424          }
425      };
426  
427      let mut alerts: Vec<models::Alert> = match db.get_kv_str("alerts").unwrap_or_default() {
428          Some(alerts) => serde_json::from_str(&alerts).unwrap(),
429          None => Vec::new(),
430      };
431  
432      if alert.id == "-1" {
433          alert.id = uuid::Uuid::new_v4().to_string();
434          info!("Created new alert with ID: {}", alert.id);
435      } else {
436          info!("Updating alert with ID: {}", alert.id);
437          alerts.retain(|a| a.id != alert.id);
438      }
439  
440      alerts.push(alert);
441  
442      db.set_kv_str(
443          "alerts",
444          &serde_json::to_string(&alerts).unwrap().to_string(),
445      )
446      .unwrap();
447  
448      (StatusCode::CREATED, Json(alerts)).into_response()
449  }
450  
451  pub async fn get_alerts(
452      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
453  ) -> Result<impl IntoResponse, (StatusCode, String)> {
454      let db = match Database::new(&config.db_path) {
455          Ok(db) => db,
456          Err(e) => {
457              return Err((
458                  StatusCode::INTERNAL_SERVER_ERROR,
459                  format!("Failed to open database: {}", e),
460              ));
461          }
462      };
463  
464      let alerts: Vec<models::Alert> = match db.get_kv_str("alerts").unwrap_or_default() {
465          Some(alerts) => serde_json::from_str(&alerts).unwrap(),
466          None => Vec::new(),
467      };
468  
469      Ok(Json(alerts))
470  }
471  
472  pub async fn delete_alert(
473      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
474      Path(id): Path<String>,
475  ) -> Result<impl IntoResponse, (StatusCode, String)> {
476      info!("Deleting alert with ID: {}", id);
477  
478      let db = match Database::new(&config.db_path) {
479          Ok(db) => db,
480          Err(e) => {
481              error!("Failed to open database: {}", e);
482              return Err((
483                  StatusCode::INTERNAL_SERVER_ERROR,
484                  format!("Failed to open database: {}", e),
485              ));
486          }
487      };
488  
489      let mut alerts: Vec<models::Alert> = match db.get_kv_str("alerts").unwrap_or_default() {
490          Some(alerts) => serde_json::from_str(&alerts).unwrap(),
491          None => Vec::new(),
492      };
493  
494      alerts.retain(|alert| alert.id != id);
495  
496      db.set_kv_str(
497          "alerts",
498          &serde_json::to_string(&alerts).unwrap().to_string(),
499      )
500      .unwrap();
501  
502      Ok(Json(alerts))
503  }
504  
505  pub async fn get_alert_vars(
506      State((_, config)): State<(Arc<Mutex<System>>, Config)>,
507  ) -> Result<impl IntoResponse, (StatusCode, String)> {
508      let db = match Database::new(&config.db_path) {
509          Ok(db) => db,
510          Err(e) => {
511              return Err((
512                  StatusCode::INTERNAL_SERVER_ERROR,
513                  format!("Failed to open database: {}", e),
514              ));
515          }
516      };
517  
518      let vars: Vec<models::AlertVar> = match db.get_resource_list() {
519          Ok(vars) => vars,
520          Err(e) => {
521              return Err((
522                  StatusCode::INTERNAL_SERVER_ERROR,
523                  format!("Failed to get resource list: {}", e),
524              ));
525          }
526      };
527  
528      Ok(Json(vars))
529  }