/ src / collect_info.rs
collect_info.rs
  1  use crate::models::*;
  2  use bollard::{
  3      Docker,
  4      container::{ListContainersOptions, StatsOptions},
  5  };
  6  use futures::StreamExt;
  7  use log::{debug, trace, warn};
  8  use sysinfo::{Disks, Networks, System};
  9  
 10  pub fn collect_general_info(sys: &System) -> GeneralInfo {
 11      debug!("Collecting general system information");
 12      // CPU info
 13      let cores_usage: Vec<f32> = sys.cpus().iter().map(|cpu| cpu.cpu_usage()).collect();
 14      let average_usage = cores_usage.iter().sum::<f32>() / cores_usage.len() as f32;
 15      let cpu_info = CpuInfo {
 16          count: sys.cpus().len(),
 17          avg_usage: average_usage,
 18          usage: cores_usage,
 19      };
 20      trace!(
 21          "CPU info collected: {} cores, {:.2}% avg usage",
 22          cpu_info.count, cpu_info.avg_usage
 23      );
 24  
 25      // Memory info
 26      let memory_info = MemoryInfo {
 27          total_mem: sys.total_memory(),
 28          used_mem: sys.used_memory(),
 29          total_swap: sys.total_swap(),
 30          used_swap: sys.used_swap(),
 31      };
 32  
 33      // System load info
 34      let system_info = SystemInfo {
 35          name: System::name().unwrap_or_default(),
 36          kernel_ver: System::kernel_version().unwrap_or_default(),
 37          os_ver: System::os_version().unwrap_or_default(),
 38          os_name: System::long_os_version().unwrap_or_default(),
 39          host_name: System::host_name().unwrap_or_default(),
 40          load_avg: vec![
 41              System::load_average().one,
 42              System::load_average().five,
 43              System::load_average().fifteen,
 44          ],
 45          uptime: System::uptime(),
 46      };
 47  
 48      // Network info
 49      let networks = Networks::new_with_refreshed_list();
 50      let interfaces = networks
 51          .iter()
 52          .map(|(name, data)| NetworkInterface {
 53              name: name.to_string(),
 54              rx: data.total_received(),
 55              tx: data.total_transmitted(),
 56          })
 57          .collect();
 58      let network_info = NetworkInfo { interfaces };
 59  
 60      // Disk info
 61      let disks = Disks::new_with_refreshed_list();
 62  
 63      let disk_info = DisksInfo {
 64          disks: disks
 65              .iter()
 66              .filter(|disk| {
 67                  let fs = disk.file_system().to_str().unwrap_or_default();
 68                  let mount_point = disk.mount_point().to_str().unwrap_or_default();
 69                  // Skip non-filesystems and system partitions
 70                  if fs.is_empty()
 71                      || mount_point.starts_with("/sys")
 72                      || mount_point.starts_with("/proc")
 73                      || mount_point.starts_with("/etc")
 74                      || mount_point.starts_with("/app")
 75                  {
 76                      return false;
 77                  }
 78                  // Common filesystem types
 79                  matches!(
 80                      fs.to_lowercase().as_str(),
 81                      "ext2"
 82                          | "ext3"
 83                          | "ext4"
 84                          | "btrfs"
 85                          | "xfs"
 86                          | "zfs"
 87                          | "ntfs"
 88                          | "fat"
 89                          | "fat32"
 90                          | "exfat"
 91                          | "hfs"
 92                          | "hfs+"
 93                          | "apfs"
 94                          | "jfs"
 95                          | "reiserfs"
 96                          | "ufs"
 97                          | "f2fs"
 98                          | "nilfs2"
 99                          | "hpfs"
100                          | "minix"
101                          | "qnx4"
102                          | "ocfs2"
103                          | "udf"
104                          | "vfat"
105                          | "msdos"
106                  )
107              })
108              .map(|disk| DiskInfo {
109                  fs: disk.file_system().to_str().unwrap_or_default().to_string(),
110                  kind: disk.kind().to_string(),
111                  total_space: disk.total_space(),
112                  free_space: disk.available_space(),
113                  mount_point: disk.mount_point().to_str().unwrap_or_default().to_string(),
114                  removable: disk.is_removable(),
115                  io: [
116                      disk.usage().read_bytes,
117                      disk.usage().written_bytes,
118                      disk.usage().total_read_bytes,
119                      disk.usage().total_written_bytes,
120                  ],
121              })
122              .collect(),
123      };
124  
125      debug!("General system information collection completed");
126      GeneralInfo {
127          t: chrono::Utc::now().timestamp(),
128          sys: system_info,
129          mem: memory_info,
130          cpu: cpu_info,
131          net: network_info,
132          disk: disk_info,
133      }
134  }
135  
136  pub fn collect_processes_info(sys: &System) -> ProcessesInfo {
137      debug!("Collecting processes information");
138      let processes = sys
139          .processes()
140          .values()
141          .map(|process| ProcessInfo {
142              pid: process.pid().as_u32(),
143              name: process.name().to_str().unwrap_or_default().to_string(),
144              runtime: process.run_time(),
145              cpu: process.cpu_usage(),
146              mem: process.memory(),
147              stat: process.status().to_string(),
148              cmd: process
149                  .cmd()
150                  .iter()
151                  .map(|x| x.to_str().unwrap_or_default())
152                  .collect::<Vec<&str>>()
153                  .join(" "),
154              env: process
155                  .environ()
156                  .iter()
157                  .map(|x| x.to_str().unwrap_or_default())
158                  .collect::<Vec<&str>>()
159                  .join("; "),
160          })
161          .collect();
162  
163      debug!(
164          "Collected information for {} processes",
165          sys.processes().len()
166      );
167      ProcessesInfo {
168          t: chrono::Utc::now().timestamp(),
169          processes,
170      }
171  }
172  
173  pub async fn get_docker_containers() -> Option<DockerInfo> {
174      debug!("Attempting to connect to Docker daemon");
175      let docker = match Docker::connect_with_local_defaults() {
176          Ok(docker) => {
177              debug!("Successfully connected to Docker daemon");
178              docker
179          }
180          Err(e) => {
181              warn!("Failed to connect to Docker daemon: {}", e);
182              return None;
183          }
184      };
185  
186      // List all containers
187      debug!("Listing Docker containers");
188      let containers = match docker
189          .list_containers(Some(ListContainersOptions::<String> {
190              all: true,
191              ..Default::default()
192          }))
193          .await
194      {
195          Ok(containers) => {
196              debug!("Found {} Docker containers", containers.len());
197              containers
198          }
199          Err(e) => {
200              warn!("Failed to list Docker containers: {}", e);
201              return None;
202          }
203      };
204  
205      let mut result = Vec::new();
206  
207      // Pre-collect all stats futures
208      debug!("Gathering stats for {} containers", containers.len());
209      let stats_futures = containers
210          .iter()
211          .map(|container| {
212              let container_id = container.id.clone().unwrap_or_default();
213              trace!("Requesting stats for container {}", container_id);
214              let mut stats_stream = docker.stats(
215                  &container_id,
216                  Some(StatsOptions {
217                      stream: false,
218                      ..Default::default()
219                  }),
220              );
221              async move { (container.clone(), stats_stream.next().await) }
222          })
223          .collect::<Vec<_>>();
224  
225      // Resolve all futures in parallel
226      let results = futures::future::join_all(stats_futures).await;
227  
228      for (container, stats_result) in results {
229          let container_id = container.id.clone().unwrap_or_default();
230  
231          // Get container stats
232          let stats = match stats_result {
233              Some(Ok(stats)) => stats,
234              Some(Err(e)) => {
235                  warn!("Failed to get stats for container {}: {}", container_id, e);
236                  continue;
237              }
238              None => {
239                  warn!("No stats available for container {}", container_id);
240                  continue;
241              }
242          };
243  
244          trace!("Processing stats for container {}", container_id);
245  
246          // Calculate CPU usage
247          // https://github.com/moby/moby/blob/eb131c5383db8cac633919f82abad86c99bffbe5/cli/command/container/stats_helpers.go#L175-L188
248          let cpu_delta = stats.cpu_stats.cpu_usage.total_usage as f64
249              - stats.precpu_stats.cpu_usage.total_usage as f64;
250          let system_delta = stats.cpu_stats.system_cpu_usage.unwrap_or(0) as f64
251              - stats.precpu_stats.system_cpu_usage.unwrap_or(0) as f64;
252          let cpu_usage = 100.0 * if system_delta > 0.0 && cpu_delta > 0.0 {
253              (cpu_delta / system_delta)
254                  * (stats.cpu_stats.online_cpus.unwrap_or(1) as f64)
255          } else {
256              0.0
257          };
258  
259          // Parse ports
260          let ports = container
261              .ports
262              .unwrap_or_default()
263              .iter()
264              .map(|port| DockerPort {
265                  ip: port.ip.clone(),
266                  priv_port: port.private_port,
267                  pub_port: port.public_port,
268                  protocol: port.typ.clone().unwrap().to_string(),
269              })
270              .collect();
271  
272          // Create container info
273          result.push(DockerContainer {
274              id: container_id.clone(),
275              name: container.names.unwrap_or_default().join(", "),
276              image: container.image.unwrap_or_default(),
277              status: container.status.unwrap_or_default(),
278              state: container.state.unwrap_or_default(),
279              created: container.created.unwrap_or(0),
280              ports,
281              cpu_usage,
282              mem_usage: stats.memory_stats.usage.unwrap_or(0),
283              mem_limit: stats.memory_stats.limit.unwrap_or(0),
284              net_io: match stats.network {
285                  Some(network) => [network.rx_bytes, network.tx_bytes],
286                  None => {
287                      trace!("No network stats for container {}", container_id);
288                      [0, 0]
289                  }
290              },
291              disk_io: [
292                  stats.storage_stats.read_size_bytes.unwrap_or(0),
293                  stats.storage_stats.write_size_bytes.unwrap_or(0),
294              ],
295          });
296      }
297  
298      debug!(
299          "Successfully collected data for {} Docker containers",
300          result.len()
301      );
302      Some(DockerInfo {
303          t: chrono::Utc::now().timestamp(),
304          containers: result,
305      })
306  }
307  
308  #[cfg(test)]
309  mod tests {
310      use super::*;
311  
312      #[tokio::test]
313      async fn gather_data() {
314          use std::time::Instant;
315          let mut now = Instant::now();
316          let mut sys = System::new_all();
317          sys.refresh_all();
318          std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
319          sys.refresh_all();
320          println!("Elapsed: {:.2?}", now.elapsed());
321  
322          now = Instant::now();
323          println!("{}", serde_json::json!(collect_general_info(&sys)));
324          println!("Elapsed: {:.2?}", now.elapsed());
325  
326          now = Instant::now();
327          // collect_processes_info(&sys);
328          println!("{}", serde_json::json!(collect_processes_info(&sys)));
329          println!("Elapsed: {:.2?}", now.elapsed());
330  
331          now = Instant::now();
332          println!("{}", serde_json::json!(get_docker_containers().await));
333          println!("Elapsed: {:.2?}", now.elapsed());
334      }
335  }