# core/sync/file_watcher.py
  1  """
  2  File Watcher for Sovereign OS Mission Control
  3  
  4  Watches LIVE-COMPRESSION-*.md files and triggers callbacks on changes.
  5  This is the foundation for cross-thread resonance detection.
  6  
  7  Usage:
  8      from core.sync import FileWatcher
  9      
 10      def on_change(event):
 11          print(f"File changed: {event.path}")
 12          print(f"Thread: {event.thread_id}")
 13          print(f"Patterns: {event.patterns}")
 14      
 15      watcher = FileWatcher(
 16          sessions_dir="/path/to/sessions",
 17          on_change=on_change
 18      )
 19      watcher.start()
 20  """
 21  
 22  import os
 23  import re
 24  import time
 25  import logging
 26  from pathlib import Path
 27  from dataclasses import dataclass, field
 28  from typing import Callable, Optional, Dict, List, Set, Union
 29  from datetime import datetime
 30  from threading import Thread, Event
 31  
 32  try:
 33      from watchdog.observers import Observer
 34      from watchdog.events import FileSystemEventHandler, FileModifiedEvent, FileCreatedEvent
 35      WATCHDOG_AVAILABLE = True
 36  except ImportError:
 37      WATCHDOG_AVAILABLE = False
 38      Observer = None
 39      FileSystemEventHandler = object
 40  
 41  logger = logging.getLogger(__name__)
 42  
 43  
@dataclass
class CompressionEvent:
    """Event emitted when a LIVE-COMPRESSION file changes."""
    
    # Filesystem location of the compression file that changed.
    path: Path
    # Thread identifier derived from the filename ("main" for LIVE-COMPRESSION.md,
    # otherwise the suffix of LIVE-COMPRESSION-<suffix>.md).
    thread_id: str
    # When the change was observed (local wall-clock time).
    timestamp: datetime
    # Concept names extracted from the file (gravity-well table rows,
    # [[wiki-links]], `backtick` terms).
    patterns: List[str] = field(default_factory=list)
    # Gravity-well name -> numeric strength, parsed from the markdown table.
    gravity_wells: Dict[str, float] = field(default_factory=dict)
    # Parsed urgency rows, each of the form {"item": str, "urgency": float}.
    urgency_items: List[Dict] = field(default_factory=list)
    # Full file text as read from disk (empty for synthetic events).
    raw_content: str = ""
    
    @property
    def has_cross_thread_flag(self) -> bool:
        """Check if this compression has a cross-thread flag."""
        # Plain substring test against the raw markdown content.
        return "Cross-Thread Flag" in self.raw_content
 60  
 61  
@dataclass 
class ThreadState:
    """Tracks the state of a single thread's compression file."""
    
    # Thread identifier (filename-derived, e.g. "main" or "alignment-protocol").
    thread_id: str
    # Path of the LIVE-COMPRESSION file backing this thread.
    path: Path
    # Timestamp of the most recently observed change for this thread.
    last_modified: datetime
    # Number of change events recorded for this thread since watching began.
    checkpoint_count: int = 0
    # Union of all patterns seen across this thread's checkpoints.
    patterns: Set[str] = field(default_factory=set)
    # Latest-known gravity wells (name -> strength), merged across checkpoints.
    gravity_wells: Dict[str, float] = field(default_factory=dict)
 72  
 73  
 74  class CompressionFileHandler(FileSystemEventHandler):
 75      """Handles file system events for LIVE-COMPRESSION files."""
 76      
 77      def __init__(self, callback: Callable[[CompressionEvent], None]):
 78          self.callback = callback
 79          self._debounce: Dict[str, float] = {}
 80          self._debounce_seconds = 1.0  # Ignore rapid successive changes
 81      
 82      def on_modified(self, event):
 83          if event.is_directory:
 84              return
 85          self._handle_file_event(event.src_path)
 86      
 87      def on_created(self, event):
 88          if event.is_directory:
 89              return
 90          self._handle_file_event(event.src_path)
 91  
 92      def on_moved(self, event):
 93          """Handle file moved/renamed events (atomic saves)."""
 94          if event.is_directory:
 95              return
 96          # Use dest_path for moved events (the final location)
 97          self._handle_file_event(event.dest_path)
 98  
 99      def _handle_file_event(self, path: str):
100          """Process a file event if it matches our pattern."""
101          path = Path(path)
102          
103          # Only process LIVE-COMPRESSION files
104          if not self._is_compression_file(path):
105              return
106          
107          # Debounce rapid changes
108          now = time.time()
109          if path.name in self._debounce:
110              if now - self._debounce[path.name] < self._debounce_seconds:
111                  return
112          self._debounce[path.name] = now
113          
114          # Parse and emit event
115          try:
116              event = self._parse_compression_file(path)
117              if event:
118                  self.callback(event)
119          except Exception as e:
120              logger.error(f"Error parsing compression file {path}: {e}")
121      
122      def _is_compression_file(self, path: Path) -> bool:
123          """Check if path is a LIVE-COMPRESSION file."""
124          name = path.name
125          return (
126              name.startswith("LIVE-COMPRESSION") and 
127              name.endswith(".md")
128          )
129      
130      def _parse_compression_file(self, path: Path) -> Optional[CompressionEvent]:
131          """Parse a LIVE-COMPRESSION file and extract key information."""
132          if not path.exists():
133              return None
134          
135          content = path.read_text(encoding="utf-8")
136          
137          # Extract thread ID from filename
138          # LIVE-COMPRESSION.md -> "main"
139          # LIVE-COMPRESSION-thread-a.md -> "thread-a"
140          thread_id = self._extract_thread_id(path.name)
141          
142          # Extract patterns (concepts mentioned)
143          patterns = self._extract_patterns(content)
144          
145          # Extract gravity wells
146          gravity_wells = self._extract_gravity_wells(content)
147          
148          # Extract urgency items
149          urgency_items = self._extract_urgency_items(content)
150          
151          return CompressionEvent(
152              path=path,
153              thread_id=thread_id,
154              timestamp=datetime.now(),
155              patterns=patterns,
156              gravity_wells=gravity_wells,
157              urgency_items=urgency_items,
158              raw_content=content
159          )
160      
161      def _extract_thread_id(self, filename: str) -> str:
162          """Extract thread ID from filename."""
163          # LIVE-COMPRESSION.md -> main
164          # LIVE-COMPRESSION-alignment-protocol.md -> alignment-protocol
165          if filename == "LIVE-COMPRESSION.md":
166              return "main"
167          
168          match = re.match(r"LIVE-COMPRESSION-(.+)\.md", filename)
169          if match:
170              return match.group(1)
171          return "unknown"
172      
173      def _extract_patterns(self, content: str) -> List[str]:
174          """Extract key patterns/concepts from content."""
175          patterns = []
176          
177          # Look for patterns in specific sections
178          # Gravity wells section
179          gravity_match = re.search(
180              r"## Gravity Wells.*?\n(.*?)(?=\n##|\Z)", 
181              content, 
182              re.DOTALL
183          )
184          if gravity_match:
185              # Extract concept names from table rows
186              for line in gravity_match.group(1).split("\n"):
187                  if "|" in line and not line.strip().startswith("|--"):
188                      parts = [p.strip() for p in line.split("|")]
189                      if len(parts) >= 2 and parts[1] and not parts[1].startswith("Well"):
190                          patterns.append(parts[1])
191          
192          # Look for [[wiki-links]]
193          wiki_links = re.findall(r"\[\[([^\]]+)\]\]", content)
194          patterns.extend(wiki_links)
195          
196          # Look for `backtick-concepts`
197          backtick_concepts = re.findall(r"`([^`]+)`", content)
198          # Filter out code-like things
199          for concept in backtick_concepts:
200              if not any(c in concept for c in ["=", "(", ")", "{", "}"]):
201                  patterns.append(concept)
202          
203          # Deduplicate and return
204          return list(set(patterns))
205      
206      def _extract_gravity_wells(self, content: str) -> Dict[str, float]:
207          """Extract gravity wells and their strengths."""
208          wells = {}
209          
210          gravity_match = re.search(
211              r"## Gravity Wells.*?\n(.*?)(?=\n##|\Z)", 
212              content, 
213              re.DOTALL
214          )
215          if gravity_match:
216              for line in gravity_match.group(1).split("\n"):
217                  if "|" in line and not line.strip().startswith("|--"):
218                      parts = [p.strip() for p in line.split("|")]
219                      if len(parts) >= 3:
220                          name = parts[1] if len(parts) > 1 else ""
221                          strength_str = parts[2] if len(parts) > 2 else ""
222                          if name and strength_str:
223                              try:
224                                  strength = float(strength_str)
225                                  wells[name] = strength
226                              except ValueError:
227                                  pass
228          
229          return wells
230      
231      def _extract_urgency_items(self, content: str) -> List[Dict]:
232          """Extract urgency items from content."""
233          items = []
234          
235          urgency_match = re.search(
236              r"## (?:Urgency|Pending|Decisions?).*?\n(.*?)(?=\n##|\Z)", 
237              content, 
238              re.DOTALL | re.IGNORECASE
239          )
240          if urgency_match:
241              for line in urgency_match.group(1).split("\n"):
242                  if "|" in line and not line.strip().startswith("|--"):
243                      parts = [p.strip() for p in line.split("|")]
244                      if len(parts) >= 4:
245                          item = parts[1] if len(parts) > 1 else ""
246                          urgency_str = parts[3] if len(parts) > 3 else ""
247                          if item and urgency_str:
248                              try:
249                                  urgency = float(urgency_str)
250                                  items.append({
251                                      "item": item,
252                                      "urgency": urgency
253                                  })
254                              except ValueError:
255                                  pass
256          
257          return items
258  
259  
class FileWatcher:
    """
    Watches LIVE-COMPRESSION files for changes.

    This is the foundation of Mission Control's awareness.
    When any thread's First Officer updates its compression file,
    this watcher detects it and can trigger resonance detection.
    """

    def __init__(
        self,
        sessions_dir: Union[str, Path],
        on_change: Optional[Callable[[CompressionEvent], None]] = None,
        on_resonance: Optional[Callable[[List[CompressionEvent]], None]] = None,
        synthesis_threshold: int = 5,
    ):
        """
        Args:
            sessions_dir: Directory containing LIVE-COMPRESSION*.md files.
            on_change: Called with a CompressionEvent on every accepted
                change.  Defaults to a logger-only handler.
            on_resonance: Called with the affected threads' events whenever
                a pattern is shared by 2+ threads.  Optional.
            synthesis_threshold: Number of checkpoints after which the
                Mission Control synthesis trigger fires (default 5, the
                previously hard-coded value).
        """
        self.sessions_dir = Path(sessions_dir)
        self.on_change = on_change or self._default_on_change
        self.on_resonance = on_resonance
        self._synthesis_threshold = synthesis_threshold

        self._observer: Optional[Observer] = None
        self._thread_states: Dict[str, ThreadState] = {}
        self._checkpoint_count = 0
        self._stop_event = Event()

        if not WATCHDOG_AVAILABLE:
            logger.warning(
                "watchdog not installed. Install with: pip install watchdog"
            )

    def start(self) -> None:
        """Start watching for file changes.

        Raises:
            RuntimeError: If the watchdog library is not installed.
            FileNotFoundError: If sessions_dir does not exist.
        """
        if not WATCHDOG_AVAILABLE:
            raise RuntimeError(
                "watchdog library required. Install with: pip install watchdog"
            )

        if not self.sessions_dir.exists():
            raise FileNotFoundError(f"Sessions directory not found: {self.sessions_dir}")

        # Allow start() to be called again after a previous stop().
        self._stop_event.clear()

        # Initialize with existing files
        self._scan_existing_files()

        # Set up file system observer
        handler = CompressionFileHandler(self._handle_change)
        self._observer = Observer()
        self._observer.schedule(handler, str(self.sessions_dir), recursive=False)
        self._observer.start()

        logger.info(f"FileWatcher started. Watching: {self.sessions_dir}")

    def stop(self) -> None:
        """Stop watching for file changes (safe to call when not started)."""
        self._stop_event.set()
        if self._observer:
            self._observer.stop()
            self._observer.join()
            # A stopped Observer cannot be restarted; drop it so a later
            # start() builds a fresh one.
            self._observer = None
            logger.info("FileWatcher stopped.")

    def _scan_existing_files(self) -> None:
        """Scan existing LIVE-COMPRESSION files on startup and seed state."""
        # One throwaway handler, reused purely for its parsing logic
        # (previously a new handler was built on every loop iteration).
        parser = CompressionFileHandler(lambda e: None)
        for path in self.sessions_dir.glob("LIVE-COMPRESSION*.md"):
            event = parser._parse_compression_file(path)
            if event:
                self._update_thread_state(event)
                logger.info(f"Found existing compression file: {path.name}")

    def _handle_change(self, event: CompressionEvent) -> None:
        """Handle a compression file change.

        Updates per-thread state, forwards the event to the user callback,
        runs resonance detection when 2+ threads are known, and fires the
        synthesis trigger every `synthesis_threshold` checkpoints.
        """
        logger.info(f"Compression file changed: {event.thread_id}")

        # Update internal state
        self._update_thread_state(event)

        # Increment checkpoint count
        self._checkpoint_count += 1

        # Call user callback
        self.on_change(event)

        # Check for resonance every change
        if self.on_resonance and len(self._thread_states) > 1:
            resonance = self._detect_resonance()
            if resonance:
                self.on_resonance(resonance)

        # Check if Mission Control synthesis should trigger
        if self._checkpoint_count >= self._synthesis_threshold:
            logger.info(
                f"{self._synthesis_threshold} FO checkpoints reached - "
                "Mission Control synthesis trigger"
            )
            self._checkpoint_count = 0

    def _update_thread_state(self, event: CompressionEvent) -> None:
        """Create or update the ThreadState for the event's thread."""
        thread_id = event.thread_id

        if thread_id in self._thread_states:
            state = self._thread_states[thread_id]
            state.last_modified = event.timestamp
            state.checkpoint_count += 1
            state.patterns.update(event.patterns)
            state.gravity_wells.update(event.gravity_wells)
        else:
            self._thread_states[thread_id] = ThreadState(
                thread_id=thread_id,
                path=event.path,
                last_modified=event.timestamp,
                checkpoint_count=1,
                patterns=set(event.patterns),
                # Copy: storing event.gravity_wells directly would alias the
                # event's dict, so later update() calls would mutate it.
                gravity_wells=dict(event.gravity_wells),
            )

    def _detect_resonance(self) -> Optional[List[CompressionEvent]]:
        """
        Detect resonance across threads.

        Resonance = shared patterns appearing in multiple threads.

        Returns:
            Synthetic CompressionEvents (no raw_content) for every affected
            thread, or None when no pattern is shared by 2+ threads.
        """
        if len(self._thread_states) < 2:
            return None

        # Delegate the aggregation to get_shared_patterns() instead of
        # duplicating its case-folding logic here.
        shared = self.get_shared_patterns()
        if not shared:
            return None

        logger.info(f"Resonance detected! Shared patterns: {list(shared.keys())}")

        # Every thread that participates in at least one shared pattern.
        affected_threads: Set[str] = set()
        for threads in shared.values():
            affected_threads.update(threads)

        events = []
        for thread_id in affected_threads:
            state = self._thread_states[thread_id]
            events.append(CompressionEvent(
                path=state.path,
                thread_id=thread_id,
                timestamp=state.last_modified,
                patterns=list(state.patterns),
            ))
        return events

    def _default_on_change(self, event: CompressionEvent) -> None:
        """Default change handler - just logs."""
        logger.info(
            f"[{event.thread_id}] Changed at {event.timestamp}. "
            f"Patterns: {len(event.patterns)}, "
            f"Gravity wells: {len(event.gravity_wells)}"
        )

    @property
    def thread_states(self) -> Dict[str, ThreadState]:
        """Get current state of all watched threads (shallow copy)."""
        return self._thread_states.copy()

    @property
    def total_checkpoints(self) -> int:
        """Get total checkpoint count across all threads."""
        return sum(s.checkpoint_count for s in self._thread_states.values())

    def get_shared_patterns(self) -> Dict[str, Set[str]]:
        """Get patterns that appear in multiple threads.

        Returns:
            Mapping of lower-cased pattern -> set of thread_ids, restricted
            to patterns seen in 2 or more threads.
        """
        all_patterns: Dict[str, Set[str]] = {}

        for thread_id, state in self._thread_states.items():
            for pattern in state.patterns:
                all_patterns.setdefault(pattern.lower(), set()).add(thread_id)

        return {p: t for p, t in all_patterns.items() if len(t) >= 2}
448  
449  
# CLI for testing
if __name__ == "__main__":
    import sys

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    # Optional positional argument: directory to watch.
    target_dir = sys.argv[1] if len(sys.argv) > 1 else "./sessions"

    def _print_change(event: CompressionEvent) -> None:
        """Dump a summary of a single compression-file change to stdout."""
        banner = "=" * 60
        print(f"\n{banner}")
        print(f"CHANGE DETECTED: {event.thread_id}")
        print(f"Path: {event.path}")
        print(f"Patterns: {event.patterns[:5]}...")  # First 5
        print(f"Gravity wells: {event.gravity_wells}")
        print(f"Has cross-thread flag: {event.has_cross_thread_flag}")
        print(f"{banner}\n")

    def _print_resonance(events: List[CompressionEvent]) -> None:
        """Announce detected cross-thread resonance on stdout."""
        banner = "*" * 60
        print(f"\n{banner}")
        print("RESONANCE DETECTED!")
        print(f"Affected threads: {[e.thread_id for e in events]}")
        print(f"{banner}\n")

    watcher = FileWatcher(
        sessions_dir=target_dir,
        on_change=_print_change,
        on_resonance=_print_resonance,
    )

    print(f"Starting FileWatcher on {target_dir}")
    print("Press Ctrl+C to stop\n")

    try:
        watcher.start()
        # Keep the main thread alive; watchdog does its work on a
        # background observer thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping...")
        watcher.stop()