#!/usr/bin/env python3
"""
Meta First Officer - Cross-Instance Coordinator

The mesh is the bus. Meta FO is the brain.

Responsibilities:
1. Subscribe to all mesh traffic
2. Aggregate state from Claude instances
3. Detect conflicts (file edit collisions, divergent work)
4. Maintain global graph state
5. Publish coordination messages

Architecture:
    ┌─────────────────────────────────────────┐
    │           META FIRST OFFICER            │
    │  (subscribes, aggregates, coordinates)  │
    └───────────────────┬─────────────────────┘
                        │
                ┌───────▼───────┐
                │   SOVEREIGN   │
                │     MESH      │
                └───────┬───────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
   [Claude 1]      [Claude 2]      [Claude N]

Usage:
    python3 scripts/meta_fo.py           # Run once
    python3 scripts/meta_fo.py --watch   # Continuous monitoring
    python3 scripts/meta_fo.py --status  # Show cross-instance status
"""

import json
import sys
import time
import os
import re
import hashlib
import urllib.request
import urllib.error
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field, asdict
from collections import defaultdict

# Paths
SOVEREIGN_OS = Path(__file__).parent.parent
SESSIONS_DIR = SOVEREIGN_OS / "sessions"
CONFIG_DIR = Path.home() / ".sovereign"
MESH_LOG = Path.home() / "repos" / "Sovereign_Estate" / "logs" / "sovereign-mesh.log"

# Output files
META_FO_STATE = SESSIONS_DIR / "META-FO-STATE.json"
CROSS_INSTANCE_MD = SESSIONS_DIR / "CROSS-INSTANCE.md"

# Mesh config
MESH_PORT = 7778
MESH_URL = f"http://localhost:{MESH_PORT}"

# Cap on per-instance message history kept in memory/persisted state,
# so long-running --watch sessions don't grow without bound.
MESSAGE_HISTORY_LIMIT = 50


@dataclass
class InstanceState:
    """State of a single Claude instance."""
    instance_id: str
    name: str = ""
    last_seen: str = ""            # ISO-8601 timestamp of last mesh message
    current_work: str = ""         # free-text description of current task
    files_touched: List[str] = field(default_factory=list)
    f_score: float = 0.0
    insights_count: int = 0
    messages: List[Dict] = field(default_factory=list)


@dataclass
class ConflictAlert:
    """A detected conflict between instances."""
    conflict_type: str             # 'file_collision', 'work_overlap', 'f_divergence'
    instances: List[str]
    details: str
    severity: str                  # 'warning', 'critical'
    timestamp: str = ""
    resolved: bool = False


@dataclass
class PhoenixState:
    """
    Phoenix state for cross-instance resurrection.

    Borrowed from handoff-protocol.md - this is what lets
    any Claude instance resurrect the full system context.
    """
    # Current focus across all instances (deepest gravity well)
    global_focus: str = ""
    thread_anchor: str = ""

    # Session lineage - all active sessions and their branches
    session_lineage: List[Dict] = field(default_factory=list)

    # Operator altitude (strategic/tactical/operational)
    operator_altitude: str = "tactical"

    # Momentum (rising/stable/falling)
    momentum: str = "stable"

    # Last handoff beacon (for crash recovery)
    recovery_beacon: Dict = field(default_factory=dict)


@dataclass
class SalienceContext:
    """
    Dynamic context to combat context compaction.

    This is the "what matters right now" for the entire system.
    When a Claude instance compacts or resurrects, it reads this
    to understand current priorities and hot threads.
    """
    # Active gravity wells - where attention is concentrated
    gravity_wells: List[Dict] = field(default_factory=list)

    # Hot threads - currently active work across instances
    hot_threads: List[Dict] = field(default_factory=list)

    # Pending decisions - things waiting for human input
    pending_decisions: List[Dict] = field(default_factory=list)

    # Recent insights - high-value discoveries (last hour)
    recent_insights: List[Dict] = field(default_factory=list)

    # System alerts - things that need attention
    alerts: List[Dict] = field(default_factory=list)

    # Resurrection seeds - compressed state per instance
    resurrection_seeds: Dict[str, str] = field(default_factory=dict)

    # Phoenix state for system-wide resurrection
    phoenix: PhoenixState = field(default_factory=PhoenixState)


@dataclass
class MetaFOState:
    """Global state maintained by Meta FO."""
    instances: Dict[str, InstanceState] = field(default_factory=dict)
    conflicts: List[ConflictAlert] = field(default_factory=list)
    global_f_score: float = 0.0
    total_insights: int = 0
    graph_node_count: int = 0
    graph_edge_count: int = 0
    last_update: str = ""
    coordination_messages: List[Dict] = field(default_factory=list)

    # Dynamic context for compaction resistance
    salience: SalienceContext = field(default_factory=SalienceContext)


class MetaFirstOfficer:
    """
    Meta First Officer - coordinates across all Claude instances.

    The mesh is the nervous system.
    Meta FO is the brain.
    """

    def __init__(self):
        self.state = MetaFOState()
        self.log_position = 0  # Byte offset already consumed in the mesh log
        self.seen_messages: Set[str] = set()
        self.load_state()

    def load_state(self):
        """Load existing state from disk (best-effort; missing file is fine)."""
        if META_FO_STATE.exists():
            try:
                with open(META_FO_STATE) as f:
                    data = json.load(f)
                # Reconstruct state from dict
                self.state.global_f_score = data.get('global_f_score', 0.0)
                self.state.total_insights = data.get('total_insights', 0)
                self.state.graph_node_count = data.get('graph_node_count', 0)
                self.state.graph_edge_count = data.get('graph_edge_count', 0)

                for iid, idata in data.get('instances', {}).items():
                    self.state.instances[iid] = InstanceState(
                        instance_id=iid,
                        name=idata.get('name', ''),
                        last_seen=idata.get('last_seen', ''),
                        current_work=idata.get('current_work', ''),
                        files_touched=idata.get('files_touched', []),
                        f_score=idata.get('f_score', 0.0),
                        # Fixed: insights_count is persisted by save_state but
                        # was previously dropped on reload.
                        insights_count=idata.get('insights_count', 0)
                    )
            except Exception as e:
                print(f"[meta-fo] Warning: Could not load state: {e}")

    def save_state(self):
        """Save state to disk as JSON (creates parent dirs as needed)."""
        self.state.last_update = datetime.now().isoformat()

        # Convert to dict for JSON
        data = {
            'instances': {k: asdict(v) for k, v in self.state.instances.items()},
            'conflicts': [asdict(c) for c in self.state.conflicts if not c.resolved],
            'global_f_score': self.state.global_f_score,
            'total_insights': self.state.total_insights,
            'graph_node_count': self.state.graph_node_count,
            'graph_edge_count': self.state.graph_edge_count,
            'last_update': self.state.last_update,
            'coordination_messages': self.state.coordination_messages[-20:]  # Keep last 20
        }

        META_FO_STATE.parent.mkdir(parents=True, exist_ok=True)
        with open(META_FO_STATE, 'w') as f:
            json.dump(data, f, indent=2)

    def poll_mesh_log(self) -> List[Dict]:
        """
        Poll mesh log for new messages.

        This is the simplest subscription mechanism - watching the log file.
        Future: direct Hyperswarm connection or WebSocket.

        Returns the list of newly seen (deduplicated) messages.
        """
        messages = []

        if not MESH_LOG.exists():
            return messages

        try:
            with open(MESH_LOG, 'r') as f:
                # Resume from where the previous poll stopped.
                f.seek(self.log_position)
                new_content = f.read()
                self.log_position = f.tell()

            # Parse HTTP messages from log
            # Format: [http] ← phone: {"type":"...", ...}
            for line in new_content.split('\n'):
                if '[http] ←' in line or '[mesh] ←' in line:
                    # Extract JSON from line
                    match = re.search(r'\{.*', line)
                    if match:
                        try:
                            # Handle truncated JSON (log may cut off)
                            json_str = match.group()
                            # Try to complete truncated JSON
                            if not json_str.endswith('}'):
                                json_str = json_str + '}'
                            msg = json.loads(json_str)

                            # Deduplicate by content hash (md5 used as a
                            # cheap fingerprint, not for security).
                            msg_hash = hashlib.md5(
                                json.dumps(msg, sort_keys=True).encode()
                            ).hexdigest()[:12]
                            if msg_hash not in self.seen_messages:
                                self.seen_messages.add(msg_hash)
                                messages.append(msg)
                        except json.JSONDecodeError:
                            pass
        except Exception as e:
            print(f"[meta-fo] Log poll error: {e}")

        return messages

    def process_message(self, msg: Dict):
        """Process a single message from the mesh, dispatching on its type."""
        msg_type = msg.get('type', 'unknown')
        source = msg.get('from', msg.get('source', 'unknown'))
        timestamp = msg.get('timestamp', datetime.now().isoformat())

        # Ensure instance exists
        if source not in self.state.instances:
            self.state.instances[source] = InstanceState(
                instance_id=source,
                name=source,
                last_seen=timestamp
            )

        instance = self.state.instances[source]
        instance.last_seen = timestamp
        instance.messages.append(msg)
        # Bound per-instance history so watch mode doesn't grow unbounded.
        if len(instance.messages) > MESSAGE_HISTORY_LIMIT:
            instance.messages = instance.messages[-MESSAGE_HISTORY_LIMIT:]

        # Process by type
        if msg_type == 'graph_update':
            payload = msg.get('payload', {})
            self.state.graph_node_count = max(self.state.graph_node_count,
                                              payload.get('node_count', 0))
            self.state.graph_edge_count = max(self.state.graph_edge_count,
                                              payload.get('edge_count', 0))

        elif msg_type == 'work_start':
            instance.current_work = msg.get('content', msg.get('work', ''))
            self.check_work_conflicts(source, instance.current_work)

        elif msg_type == 'file_edit':
            file_path = msg.get('file', msg.get('path', ''))
            if file_path and file_path not in instance.files_touched:
                instance.files_touched.append(file_path)
            self.check_file_conflicts(source, file_path)

        elif msg_type == 'f_score':
            instance.f_score = msg.get('f', msg.get('score', 0.0))
            self.update_global_f()

        elif msg_type == 'insight':
            self.state.total_insights += 1
            instance.insights_count += 1

        elif msg_type == 'claude_message':
            # General message from Claude instance
            content = msg.get('content', '')
            if 'working on' in content.lower():
                instance.current_work = content

    def check_file_conflicts(self, source: str, file_path: str):
        """Check if multiple instances are editing the same file recently."""
        if not file_path:
            return

        conflicting = []
        for iid, inst in self.state.instances.items():
            if iid != source and file_path in inst.files_touched:
                # Check if recent (within 5 minutes)
                if inst.last_seen:
                    try:
                        last = datetime.fromisoformat(inst.last_seen.replace('Z', '+00:00'))
                        if datetime.now(last.tzinfo) - last < timedelta(minutes=5):
                            conflicting.append(iid)
                    except Exception:
                        # Unparseable timestamp: err on the side of flagging.
                        conflicting.append(iid)

        if conflicting:
            conflict = ConflictAlert(
                conflict_type='file_collision',
                instances=[source] + conflicting,
                details=f"Multiple instances editing: {file_path}",
                severity='critical',
                timestamp=datetime.now().isoformat()
            )
            self.state.conflicts.append(conflict)
            self.broadcast_conflict(conflict)

    def check_work_conflicts(self, source: str, work: str):
        """Check if instances are doing overlapping work (word-overlap heuristic)."""
        if not work:
            return

        work_lower = work.lower()
        overlapping = []

        for iid, inst in self.state.instances.items():
            if iid != source and inst.current_work:
                # Simple overlap detection - shared significant words
                other_work = inst.current_work.lower()
                words1 = set(w for w in work_lower.split() if len(w) > 4)
                words2 = set(w for w in other_work.split() if len(w) > 4)
                overlap = words1 & words2

                if len(overlap) >= 2:  # At least 2 significant words in common
                    overlapping.append((iid, overlap))

        if overlapping:
            conflict = ConflictAlert(
                conflict_type='work_overlap',
                instances=[source] + [i[0] for i in overlapping],
                details=f"Possible work overlap detected",
                severity='warning',
                timestamp=datetime.now().isoformat()
            )
            self.state.conflicts.append(conflict)

    def update_global_f(self):
        """Update global F-score as the mean of instances with a positive score."""
        scores = [i.f_score for i in self.state.instances.values() if i.f_score > 0]
        if scores:
            self.state.global_f_score = sum(scores) / len(scores)

    def update_salience(self):
        """
        Update dynamic salience context from all sources.

        This is what a new/resurrecting Claude instance needs to know.
        """
        now = datetime.now()
        salience = self.state.salience

        # 1. Hot threads from active instances
        salience.hot_threads = []
        for iid, inst in self.state.instances.items():
            if inst.current_work and inst.last_seen:
                try:
                    last = datetime.fromisoformat(inst.last_seen.replace('Z', '+00:00'))
                    # total_seconds (not .seconds) so ages past 24h are correct
                    age_minutes = int((now - last.replace(tzinfo=None)).total_seconds() // 60)
                    if age_minutes < 30:  # Active in last 30 min
                        salience.hot_threads.append({
                            "instance": iid,
                            "work": inst.current_work,
                            "age_minutes": age_minutes
                        })
                except Exception:
                    pass

        # 2. Read gravity wells from LIVE-COMPRESSION
        compression_file = SESSIONS_DIR / "LIVE-COMPRESSION.md"
        if compression_file.exists():
            try:
                content = compression_file.read_text()
                # Extract gravity wells section
                if "## Gravity Wells" in content or "## Active Threads" in content:
                    # Simple extraction - find bullet points after gravity wells
                    lines = content.split('\n')
                    in_wells = False
                    wells = []
                    for line in lines:
                        if 'gravity' in line.lower() or 'active thread' in line.lower():
                            in_wells = True
                        elif in_wells and line.startswith('- '):
                            wells.append({"topic": line[2:].strip()})
                        elif in_wells and line.startswith('#'):
                            break
                    salience.gravity_wells = wells[:5]  # Top 5
            except Exception:
                pass

        # 3. Extract resurrection seed from LIVE-COMPRESSION
        if compression_file.exists():
            try:
                content = compression_file.read_text()
                # Look for resurrection seed section
                if "## Resurrection Seed" in content:
                    idx = content.find("## Resurrection Seed")
                    end_idx = content.find("\n## ", idx + 1)
                    if end_idx == -1:
                        end_idx = len(content)
                    seed = content[idx:end_idx].strip()
                    salience.resurrection_seeds["main"] = seed[:2000]  # Limit size
            except Exception:
                pass

        # 4. Recent insights from FO state
        fo_state_file = SESSIONS_DIR / "FO-STATE.json"
        if fo_state_file.exists():
            try:
                with open(fo_state_file) as f:
                    fo_data = json.load(f)
                findings = fo_data.get('findings', [])
                # Get high-importance recent findings
                recent = [f for f in findings if f.get('importance', 0) > 0.7][-5:]
                salience.recent_insights = recent
            except Exception:
                pass

        # 5. Active conflicts become alerts
        active_conflicts = [c for c in self.state.conflicts if not c.resolved]
        salience.alerts = [
            {"type": c.conflict_type, "details": c.details, "severity": c.severity}
            for c in active_conflicts
        ]

        # 6. Update Phoenix state for system-wide resurrection
        self.update_phoenix_state()

    def update_phoenix_state(self):
        """
        Update Phoenix state from handoff files and instance activity.

        This enables any new Claude instance to resurrect the full system context
        by reading META-FO-STATE.json.
        """
        phoenix = self.state.salience.phoenix

        # 1. Compute global focus from all instances' current work
        all_work = [inst.current_work for inst in self.state.instances.values() if inst.current_work]
        if all_work:
            # Simple heuristic: most common significant words
            word_counts = defaultdict(int)
            for work in all_work:
                for word in work.lower().split():
                    if len(word) > 4:
                        word_counts[word] += 1
            if word_counts:
                top_word = max(word_counts.items(), key=lambda x: x[1])[0]
                phoenix.global_focus = top_word.title()

        # 2. Read thread anchor from LIVE-COMPRESSION
        compression_file = SESSIONS_DIR / "LIVE-COMPRESSION.md"
        if compression_file.exists():
            try:
                content = compression_file.read_text()
                # Look for focus line: focus:: #something | #anchor
                for line in content.split('\n'):
                    if 'focus::' in line.lower():
                        parts = line.split('|')
                        if len(parts) >= 2:
                            phoenix.thread_anchor = parts[-1].strip().replace('#', '')
                        break
            except Exception:
                pass

        # 3. Build session lineage from active instances
        phoenix.session_lineage = []
        for iid, inst in sorted(self.state.instances.items(),
                                key=lambda x: x[1].last_seen, reverse=True):
            phoenix.session_lineage.append({
                "instance": iid,
                "work": inst.current_work or "idle",
                "last_seen": inst.last_seen,
                "f_score": inst.f_score
            })

        # 4. Read operator altitude from handoff files (most recent only)
        for hf in sorted(SESSIONS_DIR.glob("HANDOFF-*.md"), reverse=True)[:1]:
            try:
                content = hf.read_text()
                if 'operator_altitude:' in content.lower():
                    for line in content.split('\n'):
                        if 'operator_altitude:' in line.lower():
                            phoenix.operator_altitude = line.split(':')[-1].strip()
                            break
                if 'momentum:' in content.lower():
                    for line in content.split('\n'):
                        if 'momentum:' in line.lower():
                            phoenix.momentum = line.split(':')[-1].strip()
                            break
            except Exception:
                pass

        # 5. Construct recovery beacon for crash recovery
        phoenix.recovery_beacon = {
            "focus": phoenix.global_focus,
            "anchor": phoenix.thread_anchor,
            "instances": len(self.state.instances),
            "conflicts": len([c for c in self.state.conflicts if not c.resolved]),
            "global_f": self.state.global_f_score,
            "timestamp": datetime.now().isoformat()
        }

    def publish_recovery_beacon(self):
        """Publish recovery beacon to mesh for crash recovery."""
        beacon = self.state.salience.phoenix.recovery_beacon
        if beacon:
            self.publish_to_mesh({
                "type": "recovery_beacon",
                "from": "meta-fo",
                "beacon": beacon,
                "salience": {
                    "hot_threads": self.state.salience.hot_threads[:3],
                    "gravity_wells": self.state.salience.gravity_wells[:3],
                    "alerts": self.state.salience.alerts[:3]
                },
                "timestamp": datetime.now().isoformat()
            })

    def broadcast_conflict(self, conflict: ConflictAlert):
        """Broadcast conflict alert to mesh."""
        msg = {
            "type": "conflict_alert",
            "from": "meta-fo",
            "conflict_type": conflict.conflict_type,
            "instances": conflict.instances,
            "details": conflict.details,
            "severity": conflict.severity,
            "timestamp": conflict.timestamp
        }
        self.publish_to_mesh(msg)

    def publish_to_mesh(self, msg: Dict) -> bool:
        """Publish a coordination message to the mesh. Returns True on HTTP 200."""
        try:
            data = json.dumps(msg).encode()
            req = urllib.request.Request(
                f"{MESH_URL}/publish",
                data=data,
                headers={"Content-Type": "application/json"},
                method="POST"
            )
            with urllib.request.urlopen(req, timeout=2) as resp:
                self.state.coordination_messages.append(msg)
                return resp.status == 200
        except Exception:
            # Mesh unreachable is expected when running standalone.
            return False

    def generate_dashboard(self) -> str:
        """Generate human-readable cross-instance dashboard as Markdown."""
        lines = []
        now = datetime.now()

        lines.append("# Cross-Instance Dashboard")
        lines.append("")
        lines.append(f"*Updated: {now.isoformat()}*")
        lines.append("")
        lines.append("---")
        lines.append("")

        # Global metrics
        lines.append("## Global State")
        lines.append("")
        lines.append(f"| Metric | Value |")
        lines.append(f"|--------|-------|")
        lines.append(f"| Active Instances | {len(self.state.instances)} |")
        lines.append(f"| Global F-Score | {self.state.global_f_score:.2f} |")
        lines.append(f"| Total Insights | {self.state.total_insights} |")
        lines.append(f"| Graph Nodes | {self.state.graph_node_count} |")
        lines.append(f"| Graph Edges | {self.state.graph_edge_count} |")
        lines.append("")

        # Active conflicts
        active_conflicts = [c for c in self.state.conflicts if not c.resolved]
        if active_conflicts:
            lines.append("## Active Conflicts")
            lines.append("")
            for c in active_conflicts[-5:]:  # Last 5
                severity_emoji = "🔴" if c.severity == 'critical' else "🟡"
                lines.append(f"- {severity_emoji} **{c.conflict_type}**: {c.details}")
                lines.append(f"  - Instances: {', '.join(c.instances)}")
            lines.append("")

        # Instance status
        lines.append("## Instance Status")
        lines.append("")
        lines.append("| Instance | Last Seen | Current Work | F |")
        lines.append("|----------|-----------|--------------|---|")

        for iid, inst in sorted(self.state.instances.items(),
                                key=lambda x: x[1].last_seen, reverse=True):
            # Format last seen
            last_seen = "unknown"
            if inst.last_seen:
                try:
                    ts = datetime.fromisoformat(inst.last_seen.replace('Z', '+00:00'))
                    delta = now - ts.replace(tzinfo=None)
                    # total_seconds (not .seconds) so ages past 24h are correct
                    secs = int(delta.total_seconds())
                    if secs < 60:
                        last_seen = "just now"
                    elif secs < 3600:
                        last_seen = f"{secs // 60}m ago"
                    else:
                        last_seen = f"{secs // 3600}h ago"
                except Exception:
                    last_seen = inst.last_seen[:16]

            work = (inst.current_work or "-")[:30]
            f_score = f"{inst.f_score:.2f}" if inst.f_score else "-"
            lines.append(f"| {iid[:20]} | {last_seen} | {work} | {f_score} |")

        lines.append("")

        # Recent coordination messages
        if self.state.coordination_messages:
            lines.append("## Recent Coordination")
            lines.append("")
            for msg in self.state.coordination_messages[-5:]:
                lines.append(f"- [{msg.get('type')}] {msg.get('details', msg.get('content', ''))[:50]}")
            lines.append("")

        lines.append("---")
        lines.append("")
        lines.append("*Meta First Officer - The mesh is the bus, this is the brain.*")

        return '\n'.join(lines)

    def update_dashboard(self):
        """Write dashboard to file."""
        dashboard = self.generate_dashboard()
        with open(CROSS_INSTANCE_MD, 'w') as f:
            f.write(dashboard)

    def run_once(self, publish_beacon: bool = False):
        """Run a single update cycle. Returns the number of mesh messages processed."""
        # Poll for new messages from mesh
        messages = self.poll_mesh_log()

        for msg in messages:
            self.process_message(msg)

        # Also check local FO state
        fo_state_file = SESSIONS_DIR / "FO-STATE.json"
        if fo_state_file.exists():
            try:
                with open(fo_state_file) as f:
                    fo_data = json.load(f)
                # Extract insights count
                findings = fo_data.get('findings', [])
                self.state.total_insights = max(self.state.total_insights, len(findings))
            except Exception:
                pass

        # Check graph state
        graph_state = CONFIG_DIR / "graph-data.json"
        if graph_state.exists():
            try:
                with open(graph_state) as f:
                    data = json.load(f)
                self.state.graph_node_count = len(data.get('nodes', []))
                self.state.graph_edge_count = len(data.get('edges', []))
            except Exception:
                pass

        # Update salience context (dynamic context for compaction resistance)
        self.update_salience()

        # Publish recovery beacon periodically (every 5th call when in watch mode)
        if publish_beacon:
            self.publish_recovery_beacon()

        # Save and update
        self.save_state()
        self.update_dashboard()

        return len(messages)

    def run_watch(self, interval: int = 5):
        """
        Continuous monitoring mode.

        The mesh is the nervous system. Meta FO is the brain.
        Everything streams through here - all subsystems report to mesh,
        Meta FO aggregates, coordinates, and publishes recovery beacons.
        """
        print("[meta-fo] Starting continuous monitoring...")
        print(f"[meta-fo] Polling every {interval}s")
        print("[meta-fo] Dashboard: sessions/CROSS-INSTANCE.md")
        print("[meta-fo] Recovery beacons every 30s")
        print("[meta-fo] Ctrl+C to stop")
        print()

        # Announce presence on mesh
        self.publish_to_mesh({
            "type": "meta_fo_online",
            "from": "meta-fo",
            "content": "Meta First Officer now monitoring all instances",
            "subsystems": ["resonance", "monologue", "biometric", "graph", "attention"],
            "timestamp": datetime.now().isoformat()
        })

        iteration = 0
        try:
            while True:
                iteration += 1

                # Publish recovery beacon every 6th iteration (~30s)
                publish_beacon = (iteration % 6 == 0)

                msg_count = self.run_once(publish_beacon=publish_beacon)

                if msg_count > 0 or publish_beacon:
                    status = f"[meta-fo] "
                    if msg_count > 0:
                        status += f"Processed {msg_count} msgs | "
                    status += f"Inst: {len(self.state.instances)} | "
                    status += f"F: {self.state.global_f_score:.2f} | "
                    status += f"Nodes: {self.state.graph_node_count}"
                    if publish_beacon:
                        status += " | Beacon"
                    print(status)

                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n[meta-fo] Shutting down...")
            self.publish_to_mesh({
                "type": "meta_fo_offline",
                "from": "meta-fo",
                "timestamp": datetime.now().isoformat()
            })


def show_status():
    """Show current cross-instance status on stdout."""
    mfo = MetaFirstOfficer()
    mfo.run_once()

    print()
    print("╔══════════════════════════════════════════════════════════╗")
    print("║              META FIRST OFFICER STATUS                   ║")
    print("╠══════════════════════════════════════════════════════════╣")
    print(f"║  Instances:    {len(mfo.state.instances):<5}                                     ║")
    print(f"║  Global F:     {mfo.state.global_f_score:<5.2f}                                     ║")
    print(f"║  Graph Nodes:  {mfo.state.graph_node_count:<5}                                     ║")
    print(f"║  Graph Edges:  {mfo.state.graph_edge_count:<5}                                     ║")
    print(f"║  Insights:     {mfo.state.total_insights:<5}                                     ║")
    print("╠══════════════════════════════════════════════════════════╣")

    active_conflicts = [c for c in mfo.state.conflicts if not c.resolved]
    if active_conflicts:
        print("║  ACTIVE CONFLICTS:                                       ║")
        for c in active_conflicts[:3]:
            severity = "🔴" if c.severity == 'critical' else "🟡"
            print(f"║  {severity} {c.conflict_type[:15]}: {c.details[:30]:<30} ║")
    else:
        print("║  No active conflicts                                     ║")

    print("╚══════════════════════════════════════════════════════════╝")
    print()
    print("Dashboard: sessions/CROSS-INSTANCE.md")
    print("Run watch: python3 scripts/meta_fo.py --watch")


if __name__ == "__main__":
    if len(sys.argv) > 1:
        if sys.argv[1] == "--watch":
            mfo = MetaFirstOfficer()
            mfo.run_watch()
        elif sys.argv[1] == "--status":
            show_status()
        elif sys.argv[1] in ["--help", "-h"]:
            print(__doc__)
        else:
            print(f"Unknown option: {sys.argv[1]}")
            print(__doc__)
    else:
        show_status()