watch_manager.rs
1 #[cfg(all(feature = "watch", feature = "async"))] 2 use crate::events::AbundantisEvent; 3 4 #[cfg(all(feature = "watch", feature = "async"))] 5 use crate::source::{EnvSource, FileSource}; 6 7 #[cfg(all(feature = "watch", feature = "async"))] 8 use compact_str::CompactString; 9 10 #[cfg(all(feature = "watch", feature = "async"))] 11 use parking_lot::Mutex; 12 13 #[cfg(all(feature = "watch", feature = "async"))] 14 use std::path::{Path, PathBuf}; 15 16 #[cfg(all(feature = "watch", feature = "async"))] 17 use std::sync::Arc; 18 19 #[cfg(all(feature = "watch", feature = "async"))] 20 use crate::watch::{ChangeKind, FileChanged, FileWatcher}; 21 22 #[cfg(all(feature = "watch", feature = "async"))] 23 pub struct WatchManager { 24 watcher: Arc<FileWatcher>, 25 file_sources: Arc<Mutex<std::collections::HashMap<PathBuf, Arc<FileSource>>>>, 26 event_bus: Arc<crate::events::EventBus>, 27 } 28 29 #[cfg(all(feature = "watch", feature = "async"))] 30 impl WatchManager { 31 pub fn new(event_bus: Arc<crate::events::EventBus>) -> Result<Self, notify::Error> { 32 let watcher = Arc::new(FileWatcher::new()?); 33 34 Ok(Self { 35 watcher, 36 file_sources: Arc::new(Mutex::new(std::collections::HashMap::new())), 37 event_bus, 38 }) 39 } 40 41 pub fn watch_file(&self, source: Arc<FileSource>) { 42 let path = source.path().to_path_buf(); 43 let source_id = source.as_ref().id().as_str(); 44 45 self.watcher.watch(&path, source_id); 46 self.file_sources.lock().insert(path, source); 47 } 48 49 pub fn unwatch_file(&self, path: impl AsRef<Path>) { 50 let path_buf = path.as_ref().to_path_buf(); 51 self.watcher.unwatch(&path_buf); 52 self.file_sources.lock().remove(&path_buf); 53 } 54 55 pub fn start(&self) { 56 let sources = Arc::clone(&self.file_sources); 57 let event_bus = Arc::clone(&self.event_bus); 58 59 self.watcher 60 .register_callback(Arc::new(move |change: FileChanged| { 61 let path = &change.path; 62 63 let source_opt = { 64 let sources = sources.lock(); 65 sources.get(path).cloned() 66 }; 67 68 if let Some(source) = source_opt { 69 match change.kind { 70 ChangeKind::Created => { 71 tracing::debug!("File created: {:?}", path); 72 if let Err(e) = Self::handle_file_create(&source, &event_bus) { 73 tracing::error!( 74 "Failed to handle file create for {:?}: {}", 75 path, 76 e 77 ); 78 } 79 } 80 ChangeKind::Modified => { 81 tracing::debug!("File modified: {:?}", path); 82 if let Err(e) = Self::handle_file_change(&source, &event_bus) { 83 tracing::error!( 84 "Failed to handle file change for {:?}: {}", 85 path, 86 e 87 ); 88 } 89 } 90 ChangeKind::Deleted => { 91 tracing::debug!("File deleted: {:?}", path); 92 if let Err(e) = Self::handle_file_delete(&source, &event_bus) { 93 tracing::error!( 94 "Failed to handle file delete for {:?}: {}", 95 path, 96 e 97 ); 98 } 99 } 100 } 101 } 102 })); 103 } 104 105 fn handle_file_change( 106 source: &Arc<FileSource>, 107 event_bus: &Arc<crate::events::EventBus>, 108 ) -> Result<(), String> { 109 let before_snapshot = source 110 .as_ref() 111 .load() 112 .map_err(|e| format!("Failed to load before reload: {}", e))?; 113 114 source 115 .as_ref() 116 .reload() 117 .map_err(|e| format!("Failed to reload file: {}", e))?; 118 119 let after_snapshot = source 120 .as_ref() 121 .load() 122 .map_err(|e| format!("Failed to load after reload: {}", e))?; 123 124 let before_vars: std::collections::HashSet<CompactString> = before_snapshot 125 .variables 126 .iter() 127 .map(|v| v.key.clone()) 128 .collect(); 129 130 let after_vars: std::collections::HashSet<CompactString> = after_snapshot 131 .variables 132 .iter() 133 .map(|v| v.key.clone()) 134 .collect(); 135 136 let added: Vec<CompactString> = after_vars.difference(&before_vars).cloned().collect(); 137 138 let removed: Vec<CompactString> = before_vars.difference(&after_vars).cloned().collect(); 139 140 event_bus.publish(AbundantisEvent::VariablesChanged { 141 source_id: source.as_ref().id().clone(), 142 added, 143 removed, 144 }); 145 146 event_bus.publish(AbundantisEvent::CacheInvalidated { scope: None }); 147 148 Ok(()) 149 } 150 151 fn handle_file_create( 152 source: &Arc<FileSource>, 153 event_bus: &Arc<crate::events::EventBus>, 154 ) -> Result<(), String> { 155 let snapshot = source 156 .as_ref() 157 .load() 158 .map_err(|e| format!("Failed to load created file: {}", e))?; 159 160 let vars: Vec<CompactString> = snapshot.variables.iter().map(|v| v.key.clone()).collect(); 161 162 event_bus.publish(AbundantisEvent::VariablesChanged { 163 source_id: source.as_ref().id().clone(), 164 added: vars, 165 removed: Vec::new(), 166 }); 167 168 event_bus.publish(AbundantisEvent::CacheInvalidated { scope: None }); 169 170 Ok(()) 171 } 172 173 fn handle_file_delete( 174 source: &Arc<FileSource>, 175 event_bus: &Arc<crate::events::EventBus>, 176 ) -> Result<(), String> { 177 let snapshot = source 178 .as_ref() 179 .load() 180 .map_err(|e| format!("Failed to load deleted file (cached): {}", e))?; 181 182 let vars: Vec<CompactString> = snapshot.variables.iter().map(|v| v.key.clone()).collect(); 183 184 event_bus.publish(AbundantisEvent::VariablesChanged { 185 source_id: source.as_ref().id().clone(), 186 added: Vec::new(), 187 removed: vars, 188 }); 189 190 event_bus.publish(AbundantisEvent::CacheInvalidated { scope: None }); 191 192 Ok(()) 193 } 194 195 pub fn watched_files(&self) -> Vec<PathBuf> { 196 self.watcher.paths() 197 } 198 199 pub fn is_watching(&self, path: impl AsRef<Path>) -> bool { 200 self.watcher.is_watching(path) 201 } 202 } 203 204 #[cfg(all(test, feature = "watch", feature = "async"))] 205 mod tests { 206 use super::*; 207 use std::io::Write; 208 use tempfile::NamedTempFile; 209 210 #[tokio::test] 211 async fn test_watch_manager() { 212 let event_bus = Arc::new(crate::events::EventBus::new(100)); 213 let manager = WatchManager::new(event_bus.clone()).unwrap(); 214 215 let mut file = NamedTempFile::new().unwrap(); 216 writeln!(file, "KEY=value1").unwrap(); 217 218 let source = Arc::new(FileSource::new(file.path()).unwrap()); 219 manager.watch_file(source.clone()); 220 221 assert!(manager.is_watching(file.path())); 222 223 manager.unwatch_file(file.path()); 224 assert!(!manager.is_watching(file.path())); 225 } 226 }