"""
File Watcher for Sovereign OS Mission Control

Watches LIVE-COMPRESSION-*.md files and triggers callbacks on changes.
This is the foundation for cross-thread resonance detection.

Usage:
    from core.sync import FileWatcher

    def on_change(event):
        print(f"File changed: {event.path}")
        print(f"Thread: {event.thread_id}")
        print(f"Patterns: {event.patterns}")

    watcher = FileWatcher(
        sessions_dir="/path/to/sessions",
        on_change=on_change
    )
    watcher.start()
"""

import re
import time
import logging
from pathlib import Path
from dataclasses import dataclass, field
from typing import Callable, Optional, Dict, List, Set, Union
from datetime import datetime
from threading import Event

try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    WATCHDOG_AVAILABLE = True
except ImportError:
    # watchdog is optional at import time: the module stays importable
    # (e.g. for parsing helpers), but FileWatcher.start() will refuse to run.
    WATCHDOG_AVAILABLE = False
    Observer = None
    FileSystemEventHandler = object

logger = logging.getLogger(__name__)


@dataclass
class CompressionEvent:
    """Event emitted when a LIVE-COMPRESSION file changes.

    Attributes:
        path: Location of the compression file that changed.
        thread_id: Thread identifier derived from the filename
            ("main" for LIVE-COMPRESSION.md, suffix otherwise).
        timestamp: When the change was observed (local time).
        patterns: Concept names extracted from the file content.
        gravity_wells: Mapping of well name -> strength parsed from the
            "## Gravity Wells" table.
        urgency_items: Dicts with "item" and "urgency" keys parsed from the
            urgency/pending/decisions table.
        raw_content: Full file text as read at event time.
    """

    path: Path
    thread_id: str
    timestamp: datetime
    patterns: List[str] = field(default_factory=list)
    gravity_wells: Dict[str, float] = field(default_factory=dict)
    urgency_items: List[Dict] = field(default_factory=list)
    raw_content: str = ""

    @property
    def has_cross_thread_flag(self) -> bool:
        """Check if this compression has a cross-thread flag.

        Simple substring test against the raw content; the flag is the
        literal marker text "Cross-Thread Flag".
        """
        return "Cross-Thread Flag" in self.raw_content


@dataclass
class ThreadState:
    """Tracks the state of a single thread's compression file.

    Attributes:
        thread_id: Thread identifier (see CompressionEvent.thread_id).
        path: Compression file path for this thread.
        last_modified: Timestamp of the most recent observed change.
        checkpoint_count: Number of changes seen for this thread.
        patterns: Accumulated union of patterns across all checkpoints.
        gravity_wells: Latest-known strength per well (later values win).
    """

    thread_id: str
    path: Path
    last_modified: datetime
    checkpoint_count: int = 0
    patterns: Set[str] = field(default_factory=set)
    gravity_wells: Dict[str, float] = field(default_factory=dict)


class CompressionFileHandler(FileSystemEventHandler):
    """Handles file system events for LIVE-COMPRESSION files."""

    def __init__(self, callback: Callable[[CompressionEvent], None]):
        """
        Args:
            callback: Invoked with a CompressionEvent for each accepted
                (non-debounced, successfully parsed) file change.
        """
        self.callback = callback
        # Debounce keyed by the full path string, not just the filename,
        # so same-named files in different directories never collide.
        self._debounce: Dict[str, float] = {}
        self._debounce_seconds = 1.0  # Ignore rapid successive changes

    def on_modified(self, event):
        """Watchdog hook: file content modified."""
        if event.is_directory:
            return
        self._handle_file_event(event.src_path)

    def on_created(self, event):
        """Watchdog hook: file created."""
        if event.is_directory:
            return
        self._handle_file_event(event.src_path)

    def on_moved(self, event):
        """Handle file moved/renamed events (atomic saves)."""
        if event.is_directory:
            return
        # Use dest_path for moved events (the final location)
        self._handle_file_event(event.dest_path)

    def _handle_file_event(self, path: str):
        """Process a file event if it matches our pattern."""
        path = Path(path)

        # Only process LIVE-COMPRESSION files
        if not self._is_compression_file(path):
            return

        # Debounce rapid changes
        now = time.time()
        key = str(path)
        if key in self._debounce:
            if now - self._debounce[key] < self._debounce_seconds:
                return
        self._debounce[key] = now

        # Parse and emit event; never let a malformed file kill the
        # observer thread.
        try:
            event = self._parse_compression_file(path)
            if event:
                self.callback(event)
        except Exception as e:
            logger.error("Error parsing compression file %s: %s", path, e)

    def _is_compression_file(self, path: Path) -> bool:
        """Check if path is a LIVE-COMPRESSION file."""
        name = path.name
        return (
            name.startswith("LIVE-COMPRESSION") and
            name.endswith(".md")
        )

    def _parse_compression_file(self, path: Path) -> Optional[CompressionEvent]:
        """Parse a LIVE-COMPRESSION file and extract key information.

        Returns:
            A CompressionEvent, or None if the file no longer exists
            (e.g. removed between the event and the read).
        """
        if not path.exists():
            return None

        content = path.read_text(encoding="utf-8")

        # Extract thread ID from filename
        # LIVE-COMPRESSION.md -> "main"
        # LIVE-COMPRESSION-thread-a.md -> "thread-a"
        thread_id = self._extract_thread_id(path.name)

        # Extract patterns (concepts mentioned)
        patterns = self._extract_patterns(content)

        # Extract gravity wells
        gravity_wells = self._extract_gravity_wells(content)

        # Extract urgency items
        urgency_items = self._extract_urgency_items(content)

        return CompressionEvent(
            path=path,
            thread_id=thread_id,
            timestamp=datetime.now(),
            patterns=patterns,
            gravity_wells=gravity_wells,
            urgency_items=urgency_items,
            raw_content=content
        )

    def _extract_thread_id(self, filename: str) -> str:
        """Extract thread ID from filename.

        LIVE-COMPRESSION.md -> "main"
        LIVE-COMPRESSION-alignment-protocol.md -> "alignment-protocol"
        Anything else -> "unknown".
        """
        if filename == "LIVE-COMPRESSION.md":
            return "main"

        match = re.match(r"LIVE-COMPRESSION-(.+)\.md", filename)
        if match:
            return match.group(1)
        return "unknown"

    def _extract_patterns(self, content: str) -> List[str]:
        """Extract key patterns/concepts from content.

        Sources, in order: names in the "## Gravity Wells" table,
        [[wiki-links]], and `backtick` spans that don't look like code.
        The result is deduplicated (order not guaranteed).
        """
        patterns = []

        # Gravity wells section: capture everything up to the next heading.
        gravity_match = re.search(
            r"## Gravity Wells.*?\n(.*?)(?=\n##|\Z)",
            content,
            re.DOTALL
        )
        if gravity_match:
            # Extract concept names from table rows; skip the separator
            # ("|--...") and the header row (first cell starts with "Well").
            for line in gravity_match.group(1).split("\n"):
                if "|" in line and not line.strip().startswith("|--"):
                    parts = [p.strip() for p in line.split("|")]
                    if len(parts) >= 2 and parts[1] and not parts[1].startswith("Well"):
                        patterns.append(parts[1])

        # Look for [[wiki-links]]
        wiki_links = re.findall(r"\[\[([^\]]+)\]\]", content)
        patterns.extend(wiki_links)

        # Look for `backtick-concepts`, filtering out code-like spans.
        backtick_concepts = re.findall(r"`([^`]+)`", content)
        for concept in backtick_concepts:
            if not any(c in concept for c in ["=", "(", ")", "{", "}"]):
                patterns.append(concept)

        # Deduplicate and return
        return list(set(patterns))

    def _extract_gravity_wells(self, content: str) -> Dict[str, float]:
        """Extract gravity wells and their strengths.

        Expects a markdown table under "## Gravity Wells" with the well
        name in column 1 and a float strength in column 2. Rows whose
        strength cell isn't a valid float (e.g. the header) are skipped.
        """
        wells = {}

        gravity_match = re.search(
            r"## Gravity Wells.*?\n(.*?)(?=\n##|\Z)",
            content,
            re.DOTALL
        )
        if gravity_match:
            for line in gravity_match.group(1).split("\n"):
                if "|" in line and not line.strip().startswith("|--"):
                    parts = [p.strip() for p in line.split("|")]
                    if len(parts) >= 3:
                        name = parts[1] if len(parts) > 1 else ""
                        strength_str = parts[2] if len(parts) > 2 else ""
                        if name and strength_str:
                            try:
                                strength = float(strength_str)
                                wells[name] = strength
                            except ValueError:
                                pass  # non-numeric cell (header or prose)

        return wells

    def _extract_urgency_items(self, content: str) -> List[Dict]:
        """Extract urgency items from content.

        Looks for a markdown table under a "## Urgency" / "## Pending" /
        "## Decision(s)" heading (case-insensitive); the item is column 1
        and the float urgency is column 3. Non-numeric rows are skipped.
        """
        items = []

        urgency_match = re.search(
            r"## (?:Urgency|Pending|Decisions?).*?\n(.*?)(?=\n##|\Z)",
            content,
            re.DOTALL | re.IGNORECASE
        )
        if urgency_match:
            for line in urgency_match.group(1).split("\n"):
                if "|" in line and not line.strip().startswith("|--"):
                    parts = [p.strip() for p in line.split("|")]
                    if len(parts) >= 4:
                        item = parts[1] if len(parts) > 1 else ""
                        urgency_str = parts[3] if len(parts) > 3 else ""
                        if item and urgency_str:
                            try:
                                urgency = float(urgency_str)
                                items.append({
                                    "item": item,
                                    "urgency": urgency
                                })
                            except ValueError:
                                pass  # non-numeric cell (header or prose)

        return items


class FileWatcher:
    """
    Watches LIVE-COMPRESSION files for changes.

    This is the foundation of Mission Control's awareness.
    When any thread's First Officer updates its compression file,
    this watcher detects it and can trigger resonance detection.
    """

    def __init__(
        self,
        sessions_dir: Union[str, Path],
        on_change: Optional[Callable[[CompressionEvent], None]] = None,
        on_resonance: Optional[Callable[[List[CompressionEvent]], None]] = None,
    ):
        """
        Args:
            sessions_dir: Directory containing LIVE-COMPRESSION*.md files.
            on_change: Called for every accepted file change. Defaults to a
                logging-only handler.
            on_resonance: Called with events for all affected threads when
                shared patterns are detected across 2+ threads. Optional.
        """
        self.sessions_dir = Path(sessions_dir)
        self.on_change = on_change or self._default_on_change
        self.on_resonance = on_resonance

        # Quoted annotation: Observer is None when watchdog isn't installed.
        self._observer: Optional["Observer"] = None
        self._thread_states: Dict[str, ThreadState] = {}
        self._checkpoint_count = 0
        self._stop_event = Event()

        if not WATCHDOG_AVAILABLE:
            logger.warning(
                "watchdog not installed. Install with: pip install watchdog"
            )

    def start(self) -> None:
        """Start watching for file changes.

        Raises:
            RuntimeError: If the watchdog library is not installed.
            FileNotFoundError: If sessions_dir does not exist.
        """
        if not WATCHDOG_AVAILABLE:
            raise RuntimeError(
                "watchdog library required. Install with: pip install watchdog"
            )

        if not self.sessions_dir.exists():
            raise FileNotFoundError(f"Sessions directory not found: {self.sessions_dir}")

        # Initialize with existing files
        self._scan_existing_files()

        # Set up file system observer (non-recursive: only the top-level
        # sessions directory is watched).
        handler = CompressionFileHandler(self._handle_change)
        self._observer = Observer()
        self._observer.schedule(handler, str(self.sessions_dir), recursive=False)
        self._observer.start()

        logger.info("FileWatcher started. Watching: %s", self.sessions_dir)

    def stop(self) -> None:
        """Stop watching for file changes."""
        self._stop_event.set()
        if self._observer:
            self._observer.stop()
            self._observer.join()
        logger.info("FileWatcher stopped.")

    def _scan_existing_files(self) -> None:
        """Scan existing LIVE-COMPRESSION files on startup.

        Parsing is best-effort: a single unreadable/malformed file is
        logged and skipped rather than aborting startup.
        """
        # One parser instance for the whole scan; the callback is unused
        # here because we update state directly.
        parser = CompressionFileHandler(lambda e: None)
        for path in self.sessions_dir.glob("LIVE-COMPRESSION*.md"):
            try:
                event = parser._parse_compression_file(path)
            except Exception as e:
                logger.error("Error parsing existing file %s: %s", path, e)
                continue
            if event:
                self._update_thread_state(event)
                logger.info("Found existing compression file: %s", path.name)

    def _handle_change(self, event: CompressionEvent) -> None:
        """Handle a compression file change."""
        logger.info("Compression file changed: %s", event.thread_id)

        # Update internal state
        self._update_thread_state(event)

        # Increment checkpoint count
        self._checkpoint_count += 1

        # Call user callback
        self.on_change(event)

        # Check for resonance on every change (needs at least 2 threads)
        if self.on_resonance and len(self._thread_states) > 1:
            resonance = self._detect_resonance()
            if resonance:
                self.on_resonance(resonance)

        # Check if Mission Control synthesis should trigger
        if self._checkpoint_count >= 5:
            logger.info("5 FO checkpoints reached - Mission Control synthesis trigger")
            self._checkpoint_count = 0

    def _update_thread_state(self, event: CompressionEvent) -> None:
        """Update internal state for a thread."""
        thread_id = event.thread_id

        if thread_id in self._thread_states:
            state = self._thread_states[thread_id]
            state.last_modified = event.timestamp
            state.checkpoint_count += 1
            state.patterns.update(event.patterns)
            state.gravity_wells.update(event.gravity_wells)
        else:
            # Copy the event's dict so later in-place updates to the state
            # never mutate the CompressionEvent handed to user callbacks.
            self._thread_states[thread_id] = ThreadState(
                thread_id=thread_id,
                path=event.path,
                last_modified=event.timestamp,
                checkpoint_count=1,
                patterns=set(event.patterns),
                gravity_wells=dict(event.gravity_wells)
            )

    def _detect_resonance(self) -> Optional[List[CompressionEvent]]:
        """
        Detect resonance across threads.

        Resonance = shared patterns appearing in multiple threads.

        Returns:
            Synthetic CompressionEvents (current state snapshot) for every
            thread sharing at least one pattern with another thread, or
            None when there is no resonance.
        """
        if len(self._thread_states) < 2:
            return None

        # Find patterns that appear in multiple threads
        # (case-insensitive: pattern -> set of thread_ids)
        all_patterns: Dict[str, Set[str]] = {}

        for thread_id, state in self._thread_states.items():
            for pattern in state.patterns:
                pattern_lower = pattern.lower()
                if pattern_lower not in all_patterns:
                    all_patterns[pattern_lower] = set()
                all_patterns[pattern_lower].add(thread_id)

        # Find shared patterns (in 2+ threads)
        shared = {
            pattern: threads
            for pattern, threads in all_patterns.items()
            if len(threads) >= 2
        }

        if shared:
            logger.info("Resonance detected! Shared patterns: %s", list(shared.keys()))
            # Return events for all affected threads
            affected_threads = set()
            for threads in shared.values():
                affected_threads.update(threads)

            # Create events for affected threads
            events = []
            for thread_id in affected_threads:
                state = self._thread_states[thread_id]
                events.append(CompressionEvent(
                    path=state.path,
                    thread_id=thread_id,
                    timestamp=state.last_modified,
                    patterns=list(state.patterns)
                ))
            return events

        return None

    def _default_on_change(self, event: CompressionEvent) -> None:
        """Default change handler - just logs."""
        logger.info(
            "[%s] Changed at %s. Patterns: %d, Gravity wells: %d",
            event.thread_id,
            event.timestamp,
            len(event.patterns),
            len(event.gravity_wells),
        )

    @property
    def thread_states(self) -> Dict[str, ThreadState]:
        """Get current state of all watched threads (shallow copy)."""
        return self._thread_states.copy()

    @property
    def total_checkpoints(self) -> int:
        """Get total checkpoint count across all threads."""
        return sum(s.checkpoint_count for s in self._thread_states.values())

    def get_shared_patterns(self) -> Dict[str, Set[str]]:
        """Get patterns that appear in multiple threads.

        Returns:
            Mapping of lowercased pattern -> set of thread_ids, restricted
            to patterns present in 2+ threads.
        """
        all_patterns: Dict[str, Set[str]] = {}

        for thread_id, state in self._thread_states.items():
            for pattern in state.patterns:
                pattern_lower = pattern.lower()
                if pattern_lower not in all_patterns:
                    all_patterns[pattern_lower] = set()
                all_patterns[pattern_lower].add(thread_id)

        return {p: t for p, t in all_patterns.items() if len(t) >= 2}


# CLI for testing
if __name__ == "__main__":
    import sys

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s"
    )

    sessions_dir = sys.argv[1] if len(sys.argv) > 1 else "./sessions"

    def on_change(event: CompressionEvent):
        print(f"\n{'='*60}")
        print(f"CHANGE DETECTED: {event.thread_id}")
        print(f"Path: {event.path}")
        print(f"Patterns: {event.patterns[:5]}...")  # First 5
        print(f"Gravity wells: {event.gravity_wells}")
        print(f"Has cross-thread flag: {event.has_cross_thread_flag}")
        print(f"{'='*60}\n")

    def on_resonance(events: List[CompressionEvent]):
        print(f"\n{'*'*60}")
        print("RESONANCE DETECTED!")
        print(f"Affected threads: {[e.thread_id for e in events]}")
        print(f"{'*'*60}\n")

    watcher = FileWatcher(
        sessions_dir=sessions_dir,
        on_change=on_change,
        on_resonance=on_resonance
    )

    print(f"Starting FileWatcher on {sessions_dir}")
    print("Press Ctrl+C to stop\n")

    try:
        watcher.start()
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping...")
        watcher.stop()