collect_info.rs
1 use crate::models::*; 2 use bollard::{ 3 Docker, 4 container::{ListContainersOptions, StatsOptions}, 5 }; 6 use futures::StreamExt; 7 use log::{debug, trace, warn}; 8 use sysinfo::{Disks, Networks, System}; 9 10 pub fn collect_general_info(sys: &System) -> GeneralInfo { 11 debug!("Collecting general system information"); 12 // CPU info 13 let cores_usage: Vec<f32> = sys.cpus().iter().map(|cpu| cpu.cpu_usage()).collect(); 14 let average_usage = cores_usage.iter().sum::<f32>() / cores_usage.len() as f32; 15 let cpu_info = CpuInfo { 16 count: sys.cpus().len(), 17 avg_usage: average_usage, 18 usage: cores_usage, 19 }; 20 trace!( 21 "CPU info collected: {} cores, {:.2}% avg usage", 22 cpu_info.count, cpu_info.avg_usage 23 ); 24 25 // Memory info 26 let memory_info = MemoryInfo { 27 total_mem: sys.total_memory(), 28 used_mem: sys.used_memory(), 29 total_swap: sys.total_swap(), 30 used_swap: sys.used_swap(), 31 }; 32 33 // System load info 34 let system_info = SystemInfo { 35 name: System::name().unwrap_or_default(), 36 kernel_ver: System::kernel_version().unwrap_or_default(), 37 os_ver: System::os_version().unwrap_or_default(), 38 os_name: System::long_os_version().unwrap_or_default(), 39 host_name: System::host_name().unwrap_or_default(), 40 load_avg: vec![ 41 System::load_average().one, 42 System::load_average().five, 43 System::load_average().fifteen, 44 ], 45 uptime: System::uptime(), 46 }; 47 48 // Network info 49 let networks = Networks::new_with_refreshed_list(); 50 let interfaces = networks 51 .iter() 52 .map(|(name, data)| NetworkInterface { 53 name: name.to_string(), 54 rx: data.total_received(), 55 tx: data.total_transmitted(), 56 }) 57 .collect(); 58 let network_info = NetworkInfo { interfaces }; 59 60 // Disk info 61 let disks = Disks::new_with_refreshed_list(); 62 63 let disk_info = DisksInfo { 64 disks: disks 65 .iter() 66 .filter(|disk| { 67 let fs = disk.file_system().to_str().unwrap_or_default(); 68 let mount_point = disk.mount_point().to_str().unwrap_or_default(); 69 // Skip non-filesystems and system partitions 70 if fs.is_empty() 71 || mount_point.starts_with("/sys") 72 || mount_point.starts_with("/proc") 73 || mount_point.starts_with("/etc") 74 || mount_point.starts_with("/app") 75 { 76 return false; 77 } 78 // Common filesystem types 79 matches!( 80 fs.to_lowercase().as_str(), 81 "ext2" 82 | "ext3" 83 | "ext4" 84 | "btrfs" 85 | "xfs" 86 | "zfs" 87 | "ntfs" 88 | "fat" 89 | "fat32" 90 | "exfat" 91 | "hfs" 92 | "hfs+" 93 | "apfs" 94 | "jfs" 95 | "reiserfs" 96 | "ufs" 97 | "f2fs" 98 | "nilfs2" 99 | "hpfs" 100 | "minix" 101 | "qnx4" 102 | "ocfs2" 103 | "udf" 104 | "vfat" 105 | "msdos" 106 ) 107 }) 108 .map(|disk| DiskInfo { 109 fs: disk.file_system().to_str().unwrap_or_default().to_string(), 110 kind: disk.kind().to_string(), 111 total_space: disk.total_space(), 112 free_space: disk.available_space(), 113 mount_point: disk.mount_point().to_str().unwrap_or_default().to_string(), 114 removable: disk.is_removable(), 115 io: [ 116 disk.usage().read_bytes, 117 disk.usage().written_bytes, 118 disk.usage().total_read_bytes, 119 disk.usage().total_written_bytes, 120 ], 121 }) 122 .collect(), 123 }; 124 125 debug!("General system information collection completed"); 126 GeneralInfo { 127 t: chrono::Utc::now().timestamp(), 128 sys: system_info, 129 mem: memory_info, 130 cpu: cpu_info, 131 net: network_info, 132 disk: disk_info, 133 } 134 } 135 136 pub fn collect_processes_info(sys: &System) -> ProcessesInfo { 137 debug!("Collecting processes information"); 138 let processes = sys 139 .processes() 140 .values() 141 .map(|process| ProcessInfo { 142 pid: process.pid().as_u32(), 143 name: process.name().to_str().unwrap_or_default().to_string(), 144 runtime: process.run_time(), 145 cpu: process.cpu_usage(), 146 mem: process.memory(), 147 stat: process.status().to_string(), 148 cmd: process 149 .cmd() 150 .iter() 151 .map(|x| x.to_str().unwrap_or_default()) 152 .collect::<Vec<&str>>() 153 .join(" "), 154 env: process 155 .environ() 156 .iter() 157 .map(|x| x.to_str().unwrap_or_default()) 158 .collect::<Vec<&str>>() 159 .join("; "), 160 }) 161 .collect(); 162 163 debug!( 164 "Collected information for {} processes", 165 sys.processes().len() 166 ); 167 ProcessesInfo { 168 t: chrono::Utc::now().timestamp(), 169 processes, 170 } 171 } 172 173 pub async fn get_docker_containers() -> Option<DockerInfo> { 174 debug!("Attempting to connect to Docker daemon"); 175 let docker = match Docker::connect_with_local_defaults() { 176 Ok(docker) => { 177 debug!("Successfully connected to Docker daemon"); 178 docker 179 } 180 Err(e) => { 181 warn!("Failed to connect to Docker daemon: {}", e); 182 return None; 183 } 184 }; 185 186 // List all containers 187 debug!("Listing Docker containers"); 188 let containers = match docker 189 .list_containers(Some(ListContainersOptions::<String> { 190 all: true, 191 ..Default::default() 192 })) 193 .await 194 { 195 Ok(containers) => { 196 debug!("Found {} Docker containers", containers.len()); 197 containers 198 } 199 Err(e) => { 200 warn!("Failed to list Docker containers: {}", e); 201 return None; 202 } 203 }; 204 205 let mut result = Vec::new(); 206 207 // Pre-collect all stats futures 208 debug!("Gathering stats for {} containers", containers.len()); 209 let stats_futures = containers 210 .iter() 211 .map(|container| { 212 let container_id = container.id.clone().unwrap_or_default(); 213 trace!("Requesting stats for container {}", container_id); 214 let mut stats_stream = docker.stats( 215 &container_id, 216 Some(StatsOptions { 217 stream: false, 218 ..Default::default() 219 }), 220 ); 221 async move { (container.clone(), stats_stream.next().await) } 222 }) 223 .collect::<Vec<_>>(); 224 225 // Resolve all futures in parallel 226 let results = futures::future::join_all(stats_futures).await; 227 228 for (container, stats_result) in results { 229 let container_id = container.id.clone().unwrap_or_default(); 230 231 // Get container stats 232 let stats = match stats_result { 233 Some(Ok(stats)) => stats, 234 Some(Err(e)) => { 235 warn!("Failed to get stats for container {}: {}", container_id, e); 236 continue; 237 } 238 None => { 239 warn!("No stats available for container {}", container_id); 240 continue; 241 } 242 }; 243 244 trace!("Processing stats for container {}", container_id); 245 246 // Calculate CPU usage 247 // https://github.com/moby/moby/blob/eb131c5383db8cac633919f82abad86c99bffbe5/cli/command/container/stats_helpers.go#L175-L188 248 let cpu_delta = stats.cpu_stats.cpu_usage.total_usage as f64 249 - stats.precpu_stats.cpu_usage.total_usage as f64; 250 let system_delta = stats.cpu_stats.system_cpu_usage.unwrap_or(0) as f64 251 - stats.precpu_stats.system_cpu_usage.unwrap_or(0) as f64; 252 let cpu_usage = 100.0 * if system_delta > 0.0 && cpu_delta > 0.0 { 253 (cpu_delta / system_delta) 254 * (stats.cpu_stats.online_cpus.unwrap_or(1) as f64) 255 } else { 256 0.0 257 }; 258 259 // Parse ports 260 let ports = container 261 .ports 262 .unwrap_or_default() 263 .iter() 264 .map(|port| DockerPort { 265 ip: port.ip.clone(), 266 priv_port: port.private_port, 267 pub_port: port.public_port, 268 protocol: port.typ.clone().unwrap().to_string(), 269 }) 270 .collect(); 271 272 // Create container info 273 result.push(DockerContainer { 274 id: container_id.clone(), 275 name: container.names.unwrap_or_default().join(", "), 276 image: container.image.unwrap_or_default(), 277 status: container.status.unwrap_or_default(), 278 state: container.state.unwrap_or_default(), 279 created: container.created.unwrap_or(0), 280 ports, 281 cpu_usage, 282 mem_usage: stats.memory_stats.usage.unwrap_or(0), 283 mem_limit: stats.memory_stats.limit.unwrap_or(0), 284 net_io: match stats.network { 285 Some(network) => [network.rx_bytes, network.tx_bytes], 286 None => { 287 trace!("No network stats for container {}", container_id); 288 [0, 0] 289 } 290 }, 291 disk_io: [ 292 stats.storage_stats.read_size_bytes.unwrap_or(0), 293 stats.storage_stats.write_size_bytes.unwrap_or(0), 294 ], 295 }); 296 } 297 298 debug!( 299 "Successfully collected data for {} Docker containers", 300 result.len() 301 ); 302 Some(DockerInfo { 303 t: chrono::Utc::now().timestamp(), 304 containers: result, 305 }) 306 } 307 308 #[cfg(test)] 309 mod tests { 310 use super::*; 311 312 #[tokio::test] 313 async fn gather_data() { 314 use std::time::Instant; 315 let mut now = Instant::now(); 316 let mut sys = System::new_all(); 317 sys.refresh_all(); 318 std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); 319 sys.refresh_all(); 320 println!("Elapsed: {:.2?}", now.elapsed()); 321 322 now = Instant::now(); 323 println!("{}", serde_json::json!(collect_general_info(&sys))); 324 println!("Elapsed: {:.2?}", now.elapsed()); 325 326 now = Instant::now(); 327 // collect_processes_info(&sys); 328 println!("{}", serde_json::json!(collect_processes_info(&sys))); 329 println!("Elapsed: {:.2?}", now.elapsed()); 330 331 now = Instant::now(); 332 println!("{}", serde_json::json!(get_docker_containers().await)); 333 println!("Elapsed: {:.2?}", now.elapsed()); 334 } 335 }