/ src / watch_manager.rs
watch_manager.rs
  1  #[cfg(all(feature = "watch", feature = "async"))]
  2  use crate::events::AbundantisEvent;
  3  
  4  #[cfg(all(feature = "watch", feature = "async"))]
  5  use crate::source::{EnvSource, FileSource};
  6  
  7  #[cfg(all(feature = "watch", feature = "async"))]
  8  use compact_str::CompactString;
  9  
 10  #[cfg(all(feature = "watch", feature = "async"))]
 11  use parking_lot::Mutex;
 12  
 13  #[cfg(all(feature = "watch", feature = "async"))]
 14  use std::path::{Path, PathBuf};
 15  
 16  #[cfg(all(feature = "watch", feature = "async"))]
 17  use std::sync::Arc;
 18  
 19  #[cfg(all(feature = "watch", feature = "async"))]
 20  use crate::watch::{ChangeKind, FileChanged, FileWatcher};
 21  
 22  #[cfg(all(feature = "watch", feature = "async"))]
 23  pub struct WatchManager {
 24      watcher: Arc<FileWatcher>,
 25      file_sources: Arc<Mutex<std::collections::HashMap<PathBuf, Arc<FileSource>>>>,
 26      event_bus: Arc<crate::events::EventBus>,
 27  }
 28  
 29  #[cfg(all(feature = "watch", feature = "async"))]
 30  impl WatchManager {
 31      pub fn new(event_bus: Arc<crate::events::EventBus>) -> Result<Self, notify::Error> {
 32          let watcher = Arc::new(FileWatcher::new()?);
 33  
 34          Ok(Self {
 35              watcher,
 36              file_sources: Arc::new(Mutex::new(std::collections::HashMap::new())),
 37              event_bus,
 38          })
 39      }
 40  
 41      pub fn watch_file(&self, source: Arc<FileSource>) {
 42          let path = source.path().to_path_buf();
 43          let source_id = source.as_ref().id().as_str();
 44  
 45          self.watcher.watch(&path, source_id);
 46          self.file_sources.lock().insert(path, source);
 47      }
 48  
 49      pub fn unwatch_file(&self, path: impl AsRef<Path>) {
 50          let path_buf = path.as_ref().to_path_buf();
 51          self.watcher.unwatch(&path_buf);
 52          self.file_sources.lock().remove(&path_buf);
 53      }
 54  
 55      pub fn start(&self) {
 56          let sources = Arc::clone(&self.file_sources);
 57          let event_bus = Arc::clone(&self.event_bus);
 58  
 59          self.watcher
 60              .register_callback(Arc::new(move |change: FileChanged| {
 61                  let path = &change.path;
 62  
 63                  let source_opt = {
 64                      let sources = sources.lock();
 65                      sources.get(path).cloned()
 66                  };
 67  
 68                  if let Some(source) = source_opt {
 69                      match change.kind {
 70                          ChangeKind::Created => {
 71                              tracing::debug!("File created: {:?}", path);
 72                              if let Err(e) = Self::handle_file_create(&source, &event_bus) {
 73                                  tracing::error!(
 74                                      "Failed to handle file create for {:?}: {}",
 75                                      path,
 76                                      e
 77                                  );
 78                              }
 79                          }
 80                          ChangeKind::Modified => {
 81                              tracing::debug!("File modified: {:?}", path);
 82                              if let Err(e) = Self::handle_file_change(&source, &event_bus) {
 83                                  tracing::error!(
 84                                      "Failed to handle file change for {:?}: {}",
 85                                      path,
 86                                      e
 87                                  );
 88                              }
 89                          }
 90                          ChangeKind::Deleted => {
 91                              tracing::debug!("File deleted: {:?}", path);
 92                              if let Err(e) = Self::handle_file_delete(&source, &event_bus) {
 93                                  tracing::error!(
 94                                      "Failed to handle file delete for {:?}: {}",
 95                                      path,
 96                                      e
 97                                  );
 98                              }
 99                          }
100                      }
101                  }
102              }));
103      }
104  
105      fn handle_file_change(
106          source: &Arc<FileSource>,
107          event_bus: &Arc<crate::events::EventBus>,
108      ) -> Result<(), String> {
109          let before_snapshot = source
110              .as_ref()
111              .load()
112              .map_err(|e| format!("Failed to load before reload: {}", e))?;
113  
114          source
115              .as_ref()
116              .reload()
117              .map_err(|e| format!("Failed to reload file: {}", e))?;
118  
119          let after_snapshot = source
120              .as_ref()
121              .load()
122              .map_err(|e| format!("Failed to load after reload: {}", e))?;
123  
124          let before_vars: std::collections::HashSet<CompactString> = before_snapshot
125              .variables
126              .iter()
127              .map(|v| v.key.clone())
128              .collect();
129  
130          let after_vars: std::collections::HashSet<CompactString> = after_snapshot
131              .variables
132              .iter()
133              .map(|v| v.key.clone())
134              .collect();
135  
136          let added: Vec<CompactString> = after_vars.difference(&before_vars).cloned().collect();
137  
138          let removed: Vec<CompactString> = before_vars.difference(&after_vars).cloned().collect();
139  
140          event_bus.publish(AbundantisEvent::VariablesChanged {
141              source_id: source.as_ref().id().clone(),
142              added,
143              removed,
144          });
145  
146          event_bus.publish(AbundantisEvent::CacheInvalidated { scope: None });
147  
148          Ok(())
149      }
150  
151      fn handle_file_create(
152          source: &Arc<FileSource>,
153          event_bus: &Arc<crate::events::EventBus>,
154      ) -> Result<(), String> {
155          let snapshot = source
156              .as_ref()
157              .load()
158              .map_err(|e| format!("Failed to load created file: {}", e))?;
159  
160          let vars: Vec<CompactString> = snapshot.variables.iter().map(|v| v.key.clone()).collect();
161  
162          event_bus.publish(AbundantisEvent::VariablesChanged {
163              source_id: source.as_ref().id().clone(),
164              added: vars,
165              removed: Vec::new(),
166          });
167  
168          event_bus.publish(AbundantisEvent::CacheInvalidated { scope: None });
169  
170          Ok(())
171      }
172  
173      fn handle_file_delete(
174          source: &Arc<FileSource>,
175          event_bus: &Arc<crate::events::EventBus>,
176      ) -> Result<(), String> {
177          let snapshot = source
178              .as_ref()
179              .load()
180              .map_err(|e| format!("Failed to load deleted file (cached): {}", e))?;
181  
182          let vars: Vec<CompactString> = snapshot.variables.iter().map(|v| v.key.clone()).collect();
183  
184          event_bus.publish(AbundantisEvent::VariablesChanged {
185              source_id: source.as_ref().id().clone(),
186              added: Vec::new(),
187              removed: vars,
188          });
189  
190          event_bus.publish(AbundantisEvent::CacheInvalidated { scope: None });
191  
192          Ok(())
193      }
194  
195      pub fn watched_files(&self) -> Vec<PathBuf> {
196          self.watcher.paths()
197      }
198  
199      pub fn is_watching(&self, path: impl AsRef<Path>) -> bool {
200          self.watcher.is_watching(path)
201      }
202  }
203  
#[cfg(all(test, feature = "watch", feature = "async"))]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Watching a file makes `is_watching` true; unwatching makes it false again.
    #[tokio::test]
    async fn test_watch_manager() {
        let bus = Arc::new(crate::events::EventBus::new(100));
        let manager = WatchManager::new(Arc::clone(&bus)).unwrap();

        // Back the source with a real temporary file containing one variable.
        let mut temp = NamedTempFile::new().unwrap();
        writeln!(temp, "KEY=value1").unwrap();
        let watched_path = temp.path();

        let source = Arc::new(FileSource::new(watched_path).unwrap());
        manager.watch_file(Arc::clone(&source));

        assert!(manager.is_watching(watched_path));

        manager.unwatch_file(watched_path);
        assert!(!manager.is_watching(watched_path));
    }
}
226  }