# core/metacog/mission_control.py
  1  """
  2  Mission Control for Sovereign OS
  3  
  4  The orchestration layer that:
  5  1. Watches all LIVE-COMPRESSION files
  6  2. Detects cross-thread resonance
  7  3. Produces DAILY-SYNTHESIS
  8  4. Manages temporal compaction
  9  5. Writes RESONANCE-ALERTS
 10  6. Tracks gravity well formation and topology
 11  
 12  Run as a daemon:
 13      python -m core.metacog.mission_control /path/to/sessions
 14  """
 15  
 16  import os
 17  import sys
 18  import json
 19  import logging
 20  import time
 21  import urllib.request
 22  import urllib.error
 23  from pathlib import Path
 24  from datetime import datetime, date
 25  from typing import Dict, List, Optional, Set, Any
 26  from dataclasses import dataclass, field
 27  
 28  from ..sync.file_watcher import FileWatcher, CompressionEvent
 29  from .resonance import (
 30      ResonanceDetector, Resonance, ThreadSnapshot, ResonanceType,
 31      AXIOM_FIELDS, calculate_axiom_resonance, get_dominant_axiom
 32  )
 33  from ..theory_of_mind.cognitive_fingerprint import (
 34      GravityWell, TemporalScope, AltitudeScope, CognitiveFingerprint
 35  )
 36  from ..graph.gravity_topology import (
 37      GravityTopologyRenderer, WellFormationEvent
 38  )
 39  
 40  logger = logging.getLogger(__name__)
 41  
 42  # Mesh network integration (N of X - all instances share Mission Control state)
 43  MESH_HTTP_PORT = 7778
 44  
 45  
 46  def publish_to_mesh(message_type: str, payload: Dict[str, Any]) -> bool:
 47      """Publish a Mission Control event to the sovereign mesh network.
 48  
 49      Args:
 50          message_type: Event type (mc_synthesis, mc_alert, gravity_well_formed, etc.)
 51          payload: Event data to broadcast
 52  
 53      Returns:
 54          True if published successfully, False otherwise
 55      """
 56      try:
 57          msg = json.dumps({
 58              "type": message_type,
 59              "from": "mission-control",
 60              "payload": payload,
 61              "timestamp": datetime.now().isoformat()
 62          }).encode('utf-8')
 63  
 64          req = urllib.request.Request(
 65              f"http://localhost:{MESH_HTTP_PORT}/publish",
 66              data=msg,
 67              headers={"Content-Type": "application/json"},
 68              method="POST"
 69          )
 70          with urllib.request.urlopen(req, timeout=2) as resp:
 71              return resp.status == 200
 72      except (urllib.error.URLError, TimeoutError, ConnectionRefusedError):
 73          # Mesh not available - not critical, continue operation
 74          return False
 75      except Exception:
 76          return False
 77  
 78  
 79  @dataclass
 80  class MissionControlConfig:
 81      """Configuration for Mission Control."""
 82  
 83      sessions_dir: Path
 84      synthesis_interval_checkpoints: int = 5  # Every N FO checkpoints
 85      resonance_check_interval: float = 30.0  # Seconds between resonance checks
 86      daily_synthesis_time: str = "23:00"  # When to run daily synthesis
 87      alert_dir_name: str = "RESONANCE-ALERTS"
 88  
 89      # Gravity well formation thresholds
 90      gravity_well_edge_threshold: int = 10  # Edges needed to form well
 91      gravity_well_cross_session_threshold: int = 2  # Sessions needed for cross-session well
 92  
 93      @classmethod
 94      def from_env(cls, sessions_dir: Path) -> "MissionControlConfig":
 95          """Load config from environment variables."""
 96          return cls(
 97              sessions_dir=sessions_dir,
 98              synthesis_interval_checkpoints=int(
 99                  os.environ.get("MC_SYNTHESIS_INTERVAL", "5")
100              ),
101              resonance_check_interval=float(
102                  os.environ.get("MC_RESONANCE_INTERVAL", "30.0")
103              ),
104          )
105  
106  
class MissionControl:
    """
    Mission Control - Cross-thread synthesis and awareness.

    Hierarchy:
        YOU (super-metacognition)
          ↓
        MISSION CONTROL (this)
          ↓
        FIRST OFFICERS (per-thread)
          ↓
        THREADS (conversations)
    """

    def __init__(self, config: MissionControlConfig):
        self.config = config
        self._watcher: Optional[FileWatcher] = None
        self._detector = ResonanceDetector()
        self._checkpoint_count = 0  # FO checkpoints since last synthesis
        self._last_synthesis: Optional[datetime] = None
        self._running = False

        # Gravity well tracking
        self._gravity_wells: Dict[str, GravityWell] = {}
        self._well_formation_events: List[WellFormationEvent] = []
        self._concept_thread_counts: Dict[str, Set[str]] = {}  # concept -> threads
        self._topology_renderer = GravityTopologyRenderer(config.sessions_dir)
        self._last_topology_update: Optional[datetime] = None

        # Typed resonance tracking (axiom clustering)
        self._thread_axioms: Dict[str, Dict[str, float]] = {}  # thread_id -> {axiom: score}
        self._thread_dominant_axiom: Dict[str, str] = {}  # thread_id -> axiom_id

        # Ensure directories exist
        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Ensure required directories exist."""
        alert_dir = self.config.sessions_dir / self.config.alert_dir_name
        alert_dir.mkdir(parents=True, exist_ok=True)

        synthesis_dir = self.config.sessions_dir / "synthesis"
        for subdir in ["daily", "weekly", "monthly", "quarterly", "yearly"]:
            (synthesis_dir / subdir).mkdir(parents=True, exist_ok=True)

    def start(self) -> None:
        """Start Mission Control: attach the file watcher to the sessions dir."""
        logger.info("Starting Mission Control...")

        self._watcher = FileWatcher(
            sessions_dir=self.config.sessions_dir,
            on_change=self._on_compression_change,
            on_resonance=self._on_resonance_detected
        )

        self._running = True
        self._watcher.start()

        logger.info(f"Mission Control active. Watching: {self.config.sessions_dir}")
        logger.info(f"Synthesis triggers every {self.config.synthesis_interval_checkpoints} FO checkpoints")

    def stop(self) -> None:
        """Stop Mission Control and its file watcher."""
        self._running = False
        if self._watcher:
            self._watcher.stop()
        logger.info("Mission Control stopped.")

    def run_forever(self) -> None:
        """Run Mission Control as a daemon until interrupted."""
        self.start()

        try:
            while self._running:
                # Periodic resonance check
                time.sleep(self.config.resonance_check_interval)

                # Check for daily synthesis trigger
                self._check_daily_synthesis()

        except KeyboardInterrupt:
            logger.info("Interrupt received, shutting down...")
        finally:
            self.stop()

    def _on_compression_change(self, event: CompressionEvent) -> None:
        """Handle a LIVE-COMPRESSION file change (one FO checkpoint)."""
        logger.info(f"FO checkpoint: {event.thread_id}")

        # Update resonance detector
        snapshot = ThreadSnapshot(
            thread_id=event.thread_id,
            patterns=set(event.patterns),
            conclusions=[],  # Would extract from content
            dependencies=self._extract_dependencies(event),
            gravity_wells=event.gravity_wells,
            last_updated=event.timestamp
        )
        self._detector.update_thread(snapshot)

        # Track axiom resonance for this thread (typed resonance)
        self._update_thread_axiom_resonance(event.thread_id, set(event.patterns))

        # Track concepts for gravity well formation
        self._track_concepts_for_wells(event)

        # Check for new gravity well formation
        new_wells = self._check_gravity_well_formation(event)
        if new_wells:
            logger.info(f"New gravity wells formed: {[w.concept for w in new_wells]}")
            self._update_topology()

        # Increment checkpoint counter
        self._checkpoint_count += 1

        # Check if synthesis should trigger
        if self._checkpoint_count >= self.config.synthesis_interval_checkpoints:
            logger.info(f"Synthesis trigger: {self._checkpoint_count} checkpoints reached")
            self._run_synthesis()
            self._checkpoint_count = 0

        # Check for resonance
        resonances = self._detector.detect()
        if resonances:
            self._handle_resonances(resonances)

    def _on_resonance_detected(self, events: List[CompressionEvent]) -> None:
        """Handle resonance detected by file watcher (log only for now)."""
        logger.info(f"Resonance callback: {len(events)} affected threads")

    def _extract_dependencies(self, event: CompressionEvent) -> List[str]:
        """Extract thread dependencies from compression content.

        Scans the raw compression text for "Cross-Thread Flag" /
        "Related thread:" markers and "blocked by" mentions.
        """
        import re  # single local import (was duplicated inline twice)

        dependencies = []

        # Look for cross-thread flags
        if "Cross-Thread Flag" in event.raw_content:
            match = re.search(
                r"Related thread:\s*([^\n]+)",
                event.raw_content
            )
            if match:
                dependencies.append(match.group(1).strip())

        # Look for "blocked by" mentions
        if "blocked by" in event.raw_content.lower():
            matches = re.findall(
                r"blocked by[:\s]+([^\n,]+)",
                event.raw_content,
                re.IGNORECASE
            )
            dependencies.extend([m.strip() for m in matches])

        return dependencies

    def _handle_resonances(self, resonances: List[Resonance]) -> None:
        """Write alert files for resonances that should bubble or highlight."""
        for resonance in resonances:
            if resonance.should_bubble or resonance.should_highlight:
                self._write_resonance_alert(resonance)
                logger.info(
                    f"Resonance alert written: {resonance.type.value} - {resonance.pattern}"
                )

    # === TYPED RESONANCE TRACKING ===

    def _update_thread_axiom_resonance(self, thread_id: str, patterns: Set[str]) -> None:
        """Update axiom resonance tracking for a thread."""
        # Calculate axiom resonance from patterns
        axiom_scores = calculate_axiom_resonance(patterns)
        self._thread_axioms[thread_id] = axiom_scores

        # Find dominant axiom; drop the stale entry if none is dominant anymore
        dominant = get_dominant_axiom(patterns)
        if dominant:
            self._thread_dominant_axiom[thread_id] = dominant[0]
            logger.debug(
                f"Thread {thread_id} dominant axiom: {dominant[0]} ({dominant[1]:.0%})"
            )
        elif thread_id in self._thread_dominant_axiom:
            del self._thread_dominant_axiom[thread_id]

    def _generate_axiom_clustering_section(self, thread_states: Dict) -> str:
        """Generate the axiom clustering markdown table for synthesis.

        NOTE: thread_states is currently unused; kept for interface stability.
        """
        if not self._thread_dominant_axiom:
            return "| (no axiom clustering detected yet) |"

        lines = [
            "| Axiom | Name | Threads | Avg Resonance |",
            "|-------|------|---------|---------------|"
        ]

        # Group threads by dominant axiom
        axiom_threads: Dict[str, List[str]] = {}
        for thread_id, axiom_id in self._thread_dominant_axiom.items():
            if axiom_id not in axiom_threads:
                axiom_threads[axiom_id] = []
            axiom_threads[axiom_id].append(thread_id)

        # Generate rows sorted by thread count
        for axiom_id in sorted(axiom_threads.keys(), key=lambda a: len(axiom_threads[a]), reverse=True):
            threads = axiom_threads[axiom_id]
            axiom_name = AXIOM_FIELDS[axiom_id]["name"]

            # Calculate average resonance
            total = 0.0
            for tid in threads:
                if tid in self._thread_axioms and axiom_id in self._thread_axioms[tid]:
                    total += self._thread_axioms[tid][axiom_id]
            avg = total / len(threads) if threads else 0

            thread_str = ', '.join(threads[:3])
            if len(threads) > 3:
                thread_str += f"... (+{len(threads)-3})"

            lines.append(f"| **{axiom_id}** | {axiom_name} | {thread_str} | {avg:.0%} |")

        # Add insight about convergence
        if len(axiom_threads) >= 2:
            lines.append("")
            lines.append("**Insight:** Threads cluster around different principles - ")
            lines.append("potential for cross-axiom synthesis or pole navigation.")

        return '\n'.join(lines)

    # === GRAVITY WELL TRACKING ===

    def _track_concepts_for_wells(self, event: CompressionEvent) -> None:
        """Track which concepts appear in which threads for cross-session detection."""
        # Extract concepts from patterns and gravity wells
        concepts = set()

        for pattern in event.patterns:
            # Normalize pattern to concept; drop very short noise tokens
            concept = pattern.lower().strip()
            if len(concept) >= 3:
                concepts.add(concept)

        # Also track explicit gravity wells from the compression
        # (only the names matter here; strengths are handled elsewhere)
        for well_name in event.gravity_wells:
            concepts.add(well_name.lower().strip())

        # Update thread counts
        for concept in concepts:
            if concept not in self._concept_thread_counts:
                self._concept_thread_counts[concept] = set()
            self._concept_thread_counts[concept].add(event.thread_id)

    def _check_gravity_well_formation(self, event: CompressionEvent) -> List[GravityWell]:
        """Check if any new gravity wells should form.

        Two triggers: a concept seen in enough concurrent threads
        (cross_session), or a well explicitly reported by a thread's
        compression (thread_report).
        """
        new_wells = []

        # Check for cross-session wells (concept in multiple threads)
        for concept, threads in self._concept_thread_counts.items():
            if concept in self._gravity_wells:
                # Already exists, just activate
                self._gravity_wells[concept].activate()
                continue

            # Cross-session trigger
            if len(threads) >= self.config.gravity_well_cross_session_threshold:
                well = self._create_gravity_well(
                    concept=concept,
                    trigger="cross_session",
                    source_threads=list(threads),
                    context=f"Appeared in {len(threads)} concurrent threads"
                )
                new_wells.append(well)

        # Check gravity wells reported by thread compression
        for well_name, strength in event.gravity_wells.items():
            normalized = well_name.lower().strip()
            if normalized not in self._gravity_wells:
                # New well from thread
                well = self._create_gravity_well(
                    concept=normalized,
                    trigger="thread_report",
                    source_threads=[event.thread_id],
                    initial_mass=strength,
                    context=f"Reported by thread {event.thread_id}"
                )
                new_wells.append(well)
            else:
                # Existing well - update mass if higher (mass only ratchets up)
                existing = self._gravity_wells[normalized]
                if strength > existing.mass:
                    existing.mass = strength
                existing.activate()

        return new_wells

    def _create_gravity_well(
        self,
        concept: str,
        trigger: str,
        source_threads: List[str],
        initial_mass: float = 0.5,
        context: str = ""
    ) -> GravityWell:
        """Create a new gravity well, record its formation event, and broadcast it."""
        # Infer scope from trigger and context
        temporal_scope = self._infer_temporal_scope(concept, trigger)
        altitude_scope = self._infer_altitude_scope(concept)

        well = GravityWell(
            concept=concept,
            mass=initial_mass,
            temporal_scope=temporal_scope,
            altitude_scope=altitude_scope,
            source_nodes=source_threads,
            formation_context=context
        )

        # Record formation event
        formation_event = WellFormationEvent(
            well_concept=concept,
            formed_at=datetime.now(),
            source_nodes=source_threads,
            formation_trigger=trigger,
            initial_mass=initial_mass,
            context=context
        )
        self._well_formation_events.append(formation_event)

        # Store well
        self._gravity_wells[concept] = well

        logger.info(f"Gravity well formed: {concept} (trigger={trigger}, mass={initial_mass:.2f})")

        # Publish to mesh (N of X - all instances see gravity well formation)
        publish_to_mesh("gravity_well_formed", {
            "concept": concept,
            "trigger": trigger,
            "source_threads": source_threads,
            "initial_mass": initial_mass,
            "temporal_scope": temporal_scope.value,
            "altitude_scope": altitude_scope.value,
            "context": context[:200]  # Truncate for mesh
        })

        return well

    def _infer_temporal_scope(self, concept: str, trigger: str) -> TemporalScope:
        """Infer temporal scope from concept keywords and formation trigger."""
        # Permanent concepts (axioms, core principles)
        permanent_markers = ['axiom', 'principle', 'bedrock', 'always', 'never', 'core']
        if any(marker in concept.lower() for marker in permanent_markers):
            return TemporalScope.PERMANENT

        # Seasonal concepts (projects, major features)
        seasonal_markers = ['architecture', 'system', 'framework', 'protocol']
        if any(marker in concept.lower() for marker in seasonal_markers):
            return TemporalScope.SEASONAL

        # Ephemeral if single-thread, contextual if cross-session
        if trigger == "cross_session":
            return TemporalScope.CONTEXTUAL
        else:
            return TemporalScope.EPHEMERAL

    def _infer_altitude_scope(self, concept: str) -> AltitudeScope:
        """Infer altitude scope from keyword markers in the concept name."""
        # Philosophical markers
        if any(m in concept.lower() for m in ['axiom', 'principle', 'meaning', 'purpose', 'why']):
            return AltitudeScope.PHILOSOPHICAL

        # Strategic markers
        if any(m in concept.lower() for m in ['architecture', 'design', 'strategy', 'system']):
            return AltitudeScope.STRATEGIC

        # Tactical markers
        if any(m in concept.lower() for m in ['implementation', 'pattern', 'how', 'approach']):
            return AltitudeScope.TACTICAL

        # Operational markers
        if any(m in concept.lower() for m in ['fix', 'bug', 'task', 'todo', 'build']):
            return AltitudeScope.OPERATIONAL

        return AltitudeScope.ALL

    def _update_topology(self) -> None:
        """Re-render the gravity topology visualization (no-op when no wells)."""
        if not self._gravity_wells:
            return

        wells = list(self._gravity_wells.values())
        output_path = self._topology_renderer.render(
            wells=wells,
            formation_events=self._well_formation_events[-20:],  # Last 20 events
        )
        self._last_topology_update = datetime.now()
        logger.info(f"Gravity topology updated: {output_path}")

    def _generate_gravity_wells_section(self) -> str:
        """Generate the gravity wells markdown table (top 10 by mass) for synthesis."""
        if not self._gravity_wells:
            return "| (no gravity wells detected yet) | - | - | - |"

        lines = [
            "| Well | Mass | Scope | Threads |",
            "|------|------|-------|---------|"
        ]

        # Sort by mass descending
        sorted_wells = sorted(
            self._gravity_wells.values(),
            key=lambda w: w.mass,
            reverse=True
        )

        for well in sorted_wells[:10]:  # Top 10
            temporal = well.temporal_scope.value[:4]  # Abbreviate
            altitude = well.altitude_scope.value[:4]
            scope = f"{temporal}/{altitude}"

            threads = self._concept_thread_counts.get(well.concept, set())
            thread_str = ', '.join(list(threads)[:3])
            if len(threads) > 3:
                thread_str += "..."

            lines.append(
                f"| **{well.concept}** | {well.mass:.2f} | {scope} | {thread_str} |"
            )

        return '\n'.join(lines)

    def _write_resonance_alert(self, resonance: Resonance) -> None:
        """Write a resonance alert file and broadcast it to the mesh."""
        alert_dir = self.config.sessions_dir / self.config.alert_dir_name

        timestamp = resonance.timestamp.strftime("%Y%m%d-%H%M%S")
        filename = f"{timestamp}-{resonance.type.value}.md"
        filepath = alert_dir / filename

        filepath.write_text(resonance.to_alert_markdown(), encoding="utf-8")

        # Publish to mesh (N of X - all instances see MC alerts)
        publish_to_mesh("mc_resonance_alert", {
            "type": resonance.type.value,
            "pattern": resonance.pattern,
            "threads": resonance.threads,
            "strength": resonance.strength,
            "urgency": resonance.urgency,
            "axiom_type": resonance.axiom_type,
            "should_bubble": resonance.should_bubble,
            "file": str(filepath.name)
        })

    def _check_daily_synthesis(self) -> None:
        """Run daily synthesis once per day, at/after the configured time."""
        now = datetime.now()
        target_time = datetime.strptime(
            f"{now.date()} {self.config.daily_synthesis_time}",
            "%Y-%m-%d %H:%M"
        )

        # If past target time and haven't run today
        if now >= target_time:
            if self._last_synthesis is None or self._last_synthesis.date() < now.date():
                logger.info("Daily synthesis trigger (end of day)")
                self._run_daily_synthesis()

    def _run_synthesis(self) -> None:
        """Run Mission Control synthesis and write DAILY-SYNTHESIS.md."""
        logger.info("Running Mission Control synthesis...")

        # Collect all thread states
        thread_states = self._watcher.thread_states if self._watcher else {}

        # Generate synthesis
        synthesis = self._generate_synthesis(thread_states)

        # Write to DAILY-SYNTHESIS.md
        output_path = self.config.sessions_dir / "DAILY-SYNTHESIS.md"
        output_path.write_text(synthesis, encoding="utf-8")

        # Update gravity topology
        self._update_topology()

        self._last_synthesis = datetime.now()
        logger.info(f"Synthesis written to {output_path}")

        # Publish to mesh (N of X - all instances see synthesis events)
        resonances = self._detector.get_all_resonances()
        publish_to_mesh("mc_synthesis", {
            "active_threads": len(thread_states),
            "thread_ids": list(thread_states.keys()),
            "total_resonances": len(resonances),
            "high_urgency_count": len(self._detector.get_high_urgency_resonances()),
            "gravity_wells": len(self._gravity_wells),
            "checkpoint_count": self._checkpoint_count
        })

    def _run_daily_synthesis(self) -> None:
        """Run end-of-day synthesis and archive it to synthesis/daily/."""
        self._run_synthesis()

        # Archive to daily folder
        now = datetime.now()
        archive_path = (
            self.config.sessions_dir /
            "synthesis" /
            "daily" /
            f"{now.strftime('%Y-%m-%d')}-SYNTHESIS.md"
        )

        # Copy current synthesis to archive (pin utf-8 on both sides so the
        # copy is not mangled by a platform-dependent default encoding)
        current = self.config.sessions_dir / "DAILY-SYNTHESIS.md"
        if current.exists():
            archive_path.write_text(current.read_text(encoding="utf-8"), encoding="utf-8")
            logger.info(f"Daily synthesis archived to {archive_path}")

    def _generate_synthesis(self, thread_states: Dict) -> str:
        """Generate the full synthesis markdown document."""
        import zlib  # local import, matching the file's style for narrow deps

        now = datetime.now()

        # Build thread overview table
        thread_rows = []
        for thread_id, state in thread_states.items():
            # crc32 gives a stable FO id across restarts (built-in hash() is
            # salted per-process since PEP 456, so ids would change every run)
            fo_id = zlib.crc32(thread_id.encode("utf-8")) % 1000
            thread_rows.append(
                f"| {thread_id} | FO-{fo_id:03d} | "
                f"{state.last_modified.strftime('%H:%M')} | ACTIVE | "
                f"{len(state.patterns)} patterns |"
            )

        thread_table = "\n".join(thread_rows) if thread_rows else "| (no active threads) | - | - | - | - |"

        # Get resonances
        resonances = self._detector.get_all_resonances()
        high_urgency = self._detector.get_high_urgency_resonances()

        # Build resonance section (with axiom type)
        resonance_section = ""
        if resonances:
            resonance_rows = []
            for r in resonances[:10]:  # Top 10
                axiom_tag = f" [{r.axiom_type}]" if r.axiom_type else ""
                resonance_rows.append(
                    f"| {r.type.value}{axiom_tag} | {r.pattern[:30]} | "
                    f"{', '.join(r.threads[:3])} | {r.urgency:.2f} |"
                )
            resonance_section = """
## Cross-Thread Resonances

| Type | Pattern | Threads | Urgency |
|------|---------|---------|---------|
""" + "\n".join(resonance_rows)

        # Build alerts section
        alerts_section = ""
        if high_urgency:
            alert_items = []
            for r in high_urgency:
                emoji = "!" if r.should_bubble else "?"
                alert_items.append(f"- {emoji} **{r.type.value}**: {r.pattern}")
            alerts_section = """
## High Priority Alerts

""" + "\n".join(alert_items)

        return f"""# Daily Synthesis - {now.strftime('%Y-%m-%d')}

*Mission Control synthesis across all threads*

**Updated:** {now.strftime('%H:%M')}
**FO Checkpoints Since Last Synthesis:** {self._checkpoint_count}
**Active Threads:** {len(thread_states)}
**Total Resonances:** {len(resonances)}

---

## Thread Overview

| Thread | First Officer | Last CP | Status | Focus |
|--------|---------------|---------|--------|-------|
{thread_table}

---

## Axiom Clustering (Typed Resonance)

*Threads grouped by dominant principle - typed resonance reveals meaning*

{self._generate_axiom_clustering_section(thread_states)}

---
{resonance_section}
{alerts_section}
---

## Gravity Wells (System-Wide)

{self._generate_gravity_wells_section()}

**View full topology:** [GRAVITY-TOPOLOGY.md](GRAVITY-TOPOLOGY.md)

---

## Resurrection Seed

**If starting fresh, bootstrap with:**

Active threads: {list(thread_states.keys()) if thread_states else 'none'}
Checkpoint count: {self._checkpoint_count}
Resonances detected: {len(resonances)}
High urgency items: {len(high_urgency)}

See individual LIVE-COMPRESSION-*.md files for thread details.

---

*Mission Control Synthesis | {now.strftime('%Y-%m-%d %H:%M')}*
"""
721  
722  
def main():
    """CLI entry point: parse arguments, configure logging, launch the daemon."""
    import argparse

    cli = argparse.ArgumentParser(
        description="Mission Control - Cross-thread synthesis daemon"
    )
    cli.add_argument(
        "sessions_dir",
        type=Path,
        help="Path to sessions directory"
    )
    cli.add_argument(
        "--interval",
        type=int,
        default=5,
        help="Synthesis interval in FO checkpoints (default: 5)"
    )
    cli.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging"
    )
    opts = cli.parse_args()

    # Set up logging
    log_level = logging.DEBUG if opts.debug else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    )

    # Create config from CLI options
    config = MissionControlConfig(
        sessions_dir=opts.sessions_dir,
        synthesis_interval_checkpoints=opts.interval
    )

    # Run Mission Control
    mc = MissionControl(config)

    print(f"""
╔════════════════════════════════════════════════════════════╗
║                    MISSION CONTROL                          ║
║              Sovereign OS Cross-Thread Synthesis            ║
╠════════════════════════════════════════════════════════════╣
║  Watching: {str(config.sessions_dir):<47} ║
║  Synthesis every: {config.synthesis_interval_checkpoints} FO checkpoints                        ║
║                                                            ║
║  Press Ctrl+C to stop                                      ║
╚════════════════════════════════════════════════════════════╝
""")

    mc.run_forever()


if __name__ == "__main__":
    main()