phoenix_hygiene.py
1 #!/usr/bin/env python3 2 """ 3 Phoenix Hygiene Checker - Enforceable Protocol Compliance 4 5 This script checks for protocol violations and outputs warnings that get 6 injected into agent context. It's designed to be run: 7 1. At session startup (via CLAUDE.md protocol) 8 2. Periodically by First Officer 9 3. As a Claude Code hook (if configured) 10 11 EXIT CODES: 12 0 = All checks pass 13 1 = Warnings present (non-blocking) 14 2 = Critical violations (should block) 15 16 OUTPUT: 17 Warnings are printed to stdout in a format that agents will see 18 and be compelled to address. 19 """ 20 21 import os 22 import sys 23 import json 24 from pathlib import Path 25 from datetime import datetime, timedelta 26 from typing import List, Tuple, Optional 27 28 # Import common utilities for portability 29 try: 30 from sos_common import ( 31 find_repo_root, 32 get_live_compression_path, 33 get_insight_backlog_path, 34 get_staleness_warning_minutes, 35 get_staleness_critical_minutes, 36 ) 37 REPO_ROOT = find_repo_root() 38 LIVE_COMPRESSION_FILE = get_live_compression_path(REPO_ROOT) 39 INSIGHT_BACKLOG_FILE = get_insight_backlog_path(REPO_ROOT) 40 STALENESS_WARNING_MINUTES = get_staleness_warning_minutes(REPO_ROOT) 41 STALENESS_CRITICAL_MINUTES = get_staleness_critical_minutes(REPO_ROOT) 42 except ImportError: 43 # Fallback for standalone use 44 REPO_ROOT = Path(__file__).parent.parent 45 LIVE_COMPRESSION_FILE = REPO_ROOT / "sessions" / "LIVE-COMPRESSION.md" 46 INSIGHT_BACKLOG_FILE = REPO_ROOT / "sessions" / "INSIGHT-BACKLOG.md" 47 # Tighter thresholds for active sessions 48 STALENESS_WARNING_MINUTES = 30 # Warn at 30 min (per checklist) 49 STALENESS_CRITICAL_MINUTES = 60 # Critical at 60 min 50 51 52 def check_live_compression_staleness() -> Tuple[str, Optional[str]]: 53 """ 54 Check if LIVE-COMPRESSION.md is stale. 55 56 Returns: 57 (status, message) where status is 'ok', 'warning', or 'critical' 58 """ 59 if not LIVE_COMPRESSION_FILE.exists(): 60 return ('critical', 'LIVE-COMPRESSION.md does not exist') 61 62 # Check file modification time 63 mtime = datetime.fromtimestamp(LIVE_COMPRESSION_FILE.stat().st_mtime) 64 age = datetime.now() - mtime 65 age_minutes = age.total_seconds() / 60 66 67 # Also check the metadata timestamp inside the file 68 content = LIVE_COMPRESSION_FILE.read_text() 69 internal_age_minutes = None 70 71 for line in content.split('\n'): 72 if 'updated::' in line: 73 try: 74 # Parse timestamp like "2026-01-15T20:15:00" 75 ts_str = line.split('updated::')[1].strip() 76 if 'T' in ts_str: 77 internal_ts = datetime.fromisoformat(ts_str.split()[0]) 78 internal_age = datetime.now() - internal_ts 79 internal_age_minutes = internal_age.total_seconds() / 60 80 except: 81 pass 82 break 83 84 # Use internal timestamp as authoritative if available (file mtime can be touched by daemons) 85 # If internal timestamp exists, use it; otherwise fall back to file mtime 86 effective_age = internal_age_minutes if internal_age_minutes is not None else age_minutes 87 88 if effective_age > STALENESS_CRITICAL_MINUTES: 89 return ('critical', 90 f'LIVE-COMPRESSION.md is {effective_age:.0f} minutes stale. ' 91 f'Phoenix state NOT being maintained. UPDATE IMMEDIATELY.') 92 elif effective_age > STALENESS_WARNING_MINUTES: 93 return ('warning', 94 f'LIVE-COMPRESSION.md is {effective_age:.0f} minutes old. ' 95 f'Consider updating phoenix state.') 96 97 return ('ok', None) 98 99 100 def check_live_compression_content() -> Tuple[str, Optional[str]]: 101 """ 102 Check if LIVE-COMPRESSION.md has required sections. 103 """ 104 if not LIVE_COMPRESSION_FILE.exists(): 105 return ('critical', 'LIVE-COMPRESSION.md missing') 106 107 content = LIVE_COMPRESSION_FILE.read_text() 108 109 required_sections = [ 110 ('## Resurrection Seed', 'Missing resurrection seed - context cannot be recovered'), 111 ('## Gravity Wells', 'Missing gravity wells - no tracking of active focus'), 112 ('## Key Insights', 'Missing key insights - learnings not captured'), 113 ] 114 115 warnings = [] 116 for section, message in required_sections: 117 if section not in content: 118 warnings.append(message) 119 120 if warnings: 121 return ('warning', '; '.join(warnings)) 122 123 return ('ok', None) 124 125 126 def check_insight_backlog() -> Tuple[str, Optional[str]]: 127 """ 128 Check if INSIGHT-BACKLOG.md has unaddressed high-priority items. 129 """ 130 if not INSIGHT_BACKLOG_FILE.exists(): 131 return ('ok', None) # File is optional 132 133 content = INSIGHT_BACKLOG_FILE.read_text() 134 lines = content.split('\n') 135 136 # Look for ## High Priority section and count unchecked items 137 in_high_priority = False 138 unchecked_items = [] 139 140 for line in lines: 141 if '## High Priority' in line: 142 in_high_priority = True 143 continue 144 if in_high_priority and line.startswith('## '): 145 break # End of High Priority section 146 if in_high_priority and line.strip().startswith('- [ ]'): 147 # Extract item title 148 item = line.strip()[5:].split(' - ')[0].strip('*').strip() 149 unchecked_items.append(item) 150 151 if unchecked_items: 152 items_str = ', '.join(unchecked_items[:3]) # Show first 3 153 if len(unchecked_items) > 3: 154 items_str += f' (+{len(unchecked_items) - 3} more)' 155 return ('warning', 156 f'INSIGHT-BACKLOG has {len(unchecked_items)} HIGH PRIORITY items: {items_str}') 157 158 # Also check for explicit markers 159 high_priority_markers = ['[HIGH]', '🔴', 'URGENT', 'CRITICAL'] 160 for marker in high_priority_markers: 161 if marker in content: 162 return ('warning', 163 f'INSIGHT-BACKLOG.md has high-priority items ({marker}). Review immediately.') 164 165 return ('ok', None) 166 167 168 def check_session_debrief() -> Tuple[str, Optional[str]]: 169 """ 170 Check if there are git commits without a corresponding session debrief. 171 172 CLAUDE.md MANDATES: "After EVERY work run, you MUST include session debrief" 173 This check enforces that rule. 174 """ 175 import subprocess 176 177 debrief_ledger = Path.home() / ".sovereign" / "debrief-ledger.json" 178 179 # Get last debrief time 180 last_debrief_time = None 181 if debrief_ledger.exists(): 182 try: 183 with open(debrief_ledger) as f: 184 ledger = json.load(f) 185 if ledger: 186 last_debrief_time = datetime.fromisoformat(ledger[-1]['timestamp']) 187 except: 188 pass 189 190 # Get commits since last debrief 191 try: 192 if last_debrief_time: 193 since = last_debrief_time.strftime('%Y-%m-%d %H:%M:%S') 194 result = subprocess.run( 195 ['git', 'log', '--oneline', f'--since={since}', '--', '.'], 196 capture_output=True, text=True, cwd=REPO_ROOT 197 ) 198 else: 199 # No debrief ever recorded - check last 24 hours 200 result = subprocess.run( 201 ['git', 'log', '--oneline', '--since=24 hours ago', '--', '.'], 202 capture_output=True, text=True, cwd=REPO_ROOT 203 ) 204 205 if result.returncode == 0: 206 commits = [l for l in result.stdout.strip().split('\n') if l] 207 if commits: 208 return ('warning', 209 f'{len(commits)} commit(s) since last debrief. ' 210 f'Run: python3 scripts/session_report.py') 211 except Exception: 212 pass # Git not available or not a repo 213 214 return ('ok', None) 215 216 217 def check_all() -> List[Tuple[str, str, str]]: 218 """ 219 Run all hygiene checks. 220 221 Returns: 222 List of (check_name, status, message) tuples 223 """ 224 checks = [ 225 ('phoenix_staleness', check_live_compression_staleness), 226 ('phoenix_content', check_live_compression_content), 227 ('insight_backlog', check_insight_backlog), 228 ('session_debrief', check_session_debrief), 229 ] 230 231 results = [] 232 for name, check_fn in checks: 233 status, message = check_fn() 234 if message: 235 results.append((name, status, message)) 236 237 return results 238 239 240 def format_output(results: List[Tuple[str, str, str]], format: str = 'text') -> str: 241 """Format results for output.""" 242 if format == 'json': 243 return json.dumps([ 244 {'check': name, 'status': status, 'message': message} 245 for name, status, message in results 246 ], indent=2) 247 248 if not results: 249 return "" 250 251 lines = [] 252 253 # Separate critical from warnings 254 critical = [(n, m) for n, s, m in results if s == 'critical'] 255 warnings = [(n, m) for n, s, m in results if s == 'warning'] 256 257 if critical: 258 lines.append("=" * 70) 259 lines.append("🚨 CRITICAL PROTOCOL VIOLATIONS - MUST ADDRESS IMMEDIATELY") 260 lines.append("=" * 70) 261 for name, message in critical: 262 lines.append(f" [{name}] {message}") 263 lines.append("") 264 265 if warnings: 266 lines.append("-" * 70) 267 lines.append("⚠️ HYGIENE WARNINGS") 268 lines.append("-" * 70) 269 for name, message in warnings: 270 lines.append(f" [{name}] {message}") 271 lines.append("") 272 273 return "\n".join(lines) 274 275 276 def main(): 277 import argparse 278 279 parser = argparse.ArgumentParser( 280 description="Phoenix Hygiene Checker - Protocol Compliance Enforcement" 281 ) 282 parser.add_argument('--json', action='store_true', help='Output in JSON format') 283 parser.add_argument('--quiet', action='store_true', help='Only output if issues found') 284 parser.add_argument('--check', choices=['staleness', 'content', 'backlog', 'all'], 285 default='all', help='Which check to run') 286 287 args = parser.parse_args() 288 289 # Run checks 290 if args.check == 'all': 291 results = check_all() 292 elif args.check == 'staleness': 293 status, msg = check_live_compression_staleness() 294 results = [('phoenix_staleness', status, msg)] if msg else [] 295 elif args.check == 'content': 296 status, msg = check_live_compression_content() 297 results = [('phoenix_content', status, msg)] if msg else [] 298 elif args.check == 'backlog': 299 status, msg = check_insight_backlog() 300 results = [('insight_backlog', status, msg)] if msg else [] 301 302 # Output 303 fmt = 'json' if args.json else 'text' 304 output = format_output(results, fmt) 305 306 if output or not args.quiet: 307 if output: 308 print(output) 309 elif not args.quiet: 310 print("✓ All phoenix hygiene checks pass") 311 312 # Exit code 313 has_critical = any(s == 'critical' for _, s, _ in results) 314 has_warnings = any(s == 'warning' for _, s, _ in results) 315 316 if has_critical: 317 sys.exit(2) 318 elif has_warnings: 319 sys.exit(1) 320 else: 321 sys.exit(0) 322 323 324 if __name__ == "__main__": 325 main()