/ scripts / phoenix_hygiene.py
phoenix_hygiene.py
  1  #!/usr/bin/env python3
  2  """
  3  Phoenix Hygiene Checker - Enforceable Protocol Compliance
  4  
  5  This script checks for protocol violations and outputs warnings that get
  6  injected into agent context. It's designed to be run:
  7  1. At session startup (via CLAUDE.md protocol)
  8  2. Periodically by First Officer
  9  3. As a Claude Code hook (if configured)
 10  
 11  EXIT CODES:
 12    0 = All checks pass
 13    1 = Warnings present (non-blocking)
 14    2 = Critical violations (should block)
 15  
 16  OUTPUT:
 17    Warnings are printed to stdout in a format that agents will see
 18    and be compelled to address.
 19  """
 20  
 21  import os
 22  import sys
 23  import json
 24  from pathlib import Path
 25  from datetime import datetime, timedelta
 26  from typing import List, Tuple, Optional
 27  
 28  # Import common utilities for portability
 29  try:
 30      from sos_common import (
 31          find_repo_root,
 32          get_live_compression_path,
 33          get_insight_backlog_path,
 34          get_staleness_warning_minutes,
 35          get_staleness_critical_minutes,
 36      )
 37      REPO_ROOT = find_repo_root()
 38      LIVE_COMPRESSION_FILE = get_live_compression_path(REPO_ROOT)
 39      INSIGHT_BACKLOG_FILE = get_insight_backlog_path(REPO_ROOT)
 40      STALENESS_WARNING_MINUTES = get_staleness_warning_minutes(REPO_ROOT)
 41      STALENESS_CRITICAL_MINUTES = get_staleness_critical_minutes(REPO_ROOT)
 42  except ImportError:
 43      # Fallback for standalone use
 44      REPO_ROOT = Path(__file__).parent.parent
 45      LIVE_COMPRESSION_FILE = REPO_ROOT / "sessions" / "LIVE-COMPRESSION.md"
 46      INSIGHT_BACKLOG_FILE = REPO_ROOT / "sessions" / "INSIGHT-BACKLOG.md"
 47      # Tighter thresholds for active sessions
 48      STALENESS_WARNING_MINUTES = 30   # Warn at 30 min (per checklist)
 49      STALENESS_CRITICAL_MINUTES = 60  # Critical at 60 min
 50  
 51  
 52  def check_live_compression_staleness() -> Tuple[str, Optional[str]]:
 53      """
 54      Check if LIVE-COMPRESSION.md is stale.
 55  
 56      Returns:
 57          (status, message) where status is 'ok', 'warning', or 'critical'
 58      """
 59      if not LIVE_COMPRESSION_FILE.exists():
 60          return ('critical', 'LIVE-COMPRESSION.md does not exist')
 61  
 62      # Check file modification time
 63      mtime = datetime.fromtimestamp(LIVE_COMPRESSION_FILE.stat().st_mtime)
 64      age = datetime.now() - mtime
 65      age_minutes = age.total_seconds() / 60
 66  
 67      # Also check the metadata timestamp inside the file
 68      content = LIVE_COMPRESSION_FILE.read_text()
 69      internal_age_minutes = None
 70  
 71      for line in content.split('\n'):
 72          if 'updated::' in line:
 73              try:
 74                  # Parse timestamp like "2026-01-15T20:15:00"
 75                  ts_str = line.split('updated::')[1].strip()
 76                  if 'T' in ts_str:
 77                      internal_ts = datetime.fromisoformat(ts_str.split()[0])
 78                      internal_age = datetime.now() - internal_ts
 79                      internal_age_minutes = internal_age.total_seconds() / 60
 80              except:
 81                  pass
 82              break
 83  
 84      # Use internal timestamp as authoritative if available (file mtime can be touched by daemons)
 85      # If internal timestamp exists, use it; otherwise fall back to file mtime
 86      effective_age = internal_age_minutes if internal_age_minutes is not None else age_minutes
 87  
 88      if effective_age > STALENESS_CRITICAL_MINUTES:
 89          return ('critical',
 90                  f'LIVE-COMPRESSION.md is {effective_age:.0f} minutes stale. '
 91                  f'Phoenix state NOT being maintained. UPDATE IMMEDIATELY.')
 92      elif effective_age > STALENESS_WARNING_MINUTES:
 93          return ('warning',
 94                  f'LIVE-COMPRESSION.md is {effective_age:.0f} minutes old. '
 95                  f'Consider updating phoenix state.')
 96  
 97      return ('ok', None)
 98  
 99  
100  def check_live_compression_content() -> Tuple[str, Optional[str]]:
101      """
102      Check if LIVE-COMPRESSION.md has required sections.
103      """
104      if not LIVE_COMPRESSION_FILE.exists():
105          return ('critical', 'LIVE-COMPRESSION.md missing')
106  
107      content = LIVE_COMPRESSION_FILE.read_text()
108  
109      required_sections = [
110          ('## Resurrection Seed', 'Missing resurrection seed - context cannot be recovered'),
111          ('## Gravity Wells', 'Missing gravity wells - no tracking of active focus'),
112          ('## Key Insights', 'Missing key insights - learnings not captured'),
113      ]
114  
115      warnings = []
116      for section, message in required_sections:
117          if section not in content:
118              warnings.append(message)
119  
120      if warnings:
121          return ('warning', '; '.join(warnings))
122  
123      return ('ok', None)
124  
125  
126  def check_insight_backlog() -> Tuple[str, Optional[str]]:
127      """
128      Check if INSIGHT-BACKLOG.md has unaddressed high-priority items.
129      """
130      if not INSIGHT_BACKLOG_FILE.exists():
131          return ('ok', None)  # File is optional
132  
133      content = INSIGHT_BACKLOG_FILE.read_text()
134      lines = content.split('\n')
135  
136      # Look for ## High Priority section and count unchecked items
137      in_high_priority = False
138      unchecked_items = []
139  
140      for line in lines:
141          if '## High Priority' in line:
142              in_high_priority = True
143              continue
144          if in_high_priority and line.startswith('## '):
145              break  # End of High Priority section
146          if in_high_priority and line.strip().startswith('- [ ]'):
147              # Extract item title
148              item = line.strip()[5:].split(' - ')[0].strip('*').strip()
149              unchecked_items.append(item)
150  
151      if unchecked_items:
152          items_str = ', '.join(unchecked_items[:3])  # Show first 3
153          if len(unchecked_items) > 3:
154              items_str += f' (+{len(unchecked_items) - 3} more)'
155          return ('warning',
156                  f'INSIGHT-BACKLOG has {len(unchecked_items)} HIGH PRIORITY items: {items_str}')
157  
158      # Also check for explicit markers
159      high_priority_markers = ['[HIGH]', '🔴', 'URGENT', 'CRITICAL']
160      for marker in high_priority_markers:
161          if marker in content:
162              return ('warning',
163                      f'INSIGHT-BACKLOG.md has high-priority items ({marker}). Review immediately.')
164  
165      return ('ok', None)
166  
167  
168  def check_session_debrief() -> Tuple[str, Optional[str]]:
169      """
170      Check if there are git commits without a corresponding session debrief.
171  
172      CLAUDE.md MANDATES: "After EVERY work run, you MUST include session debrief"
173      This check enforces that rule.
174      """
175      import subprocess
176  
177      debrief_ledger = Path.home() / ".sovereign" / "debrief-ledger.json"
178  
179      # Get last debrief time
180      last_debrief_time = None
181      if debrief_ledger.exists():
182          try:
183              with open(debrief_ledger) as f:
184                  ledger = json.load(f)
185                  if ledger:
186                      last_debrief_time = datetime.fromisoformat(ledger[-1]['timestamp'])
187          except:
188              pass
189  
190      # Get commits since last debrief
191      try:
192          if last_debrief_time:
193              since = last_debrief_time.strftime('%Y-%m-%d %H:%M:%S')
194              result = subprocess.run(
195                  ['git', 'log', '--oneline', f'--since={since}', '--', '.'],
196                  capture_output=True, text=True, cwd=REPO_ROOT
197              )
198          else:
199              # No debrief ever recorded - check last 24 hours
200              result = subprocess.run(
201                  ['git', 'log', '--oneline', '--since=24 hours ago', '--', '.'],
202                  capture_output=True, text=True, cwd=REPO_ROOT
203              )
204  
205          if result.returncode == 0:
206              commits = [l for l in result.stdout.strip().split('\n') if l]
207              if commits:
208                  return ('warning',
209                          f'{len(commits)} commit(s) since last debrief. '
210                          f'Run: python3 scripts/session_report.py')
211      except Exception:
212          pass  # Git not available or not a repo
213  
214      return ('ok', None)
215  
216  
217  def check_all() -> List[Tuple[str, str, str]]:
218      """
219      Run all hygiene checks.
220  
221      Returns:
222          List of (check_name, status, message) tuples
223      """
224      checks = [
225          ('phoenix_staleness', check_live_compression_staleness),
226          ('phoenix_content', check_live_compression_content),
227          ('insight_backlog', check_insight_backlog),
228          ('session_debrief', check_session_debrief),
229      ]
230  
231      results = []
232      for name, check_fn in checks:
233          status, message = check_fn()
234          if message:
235              results.append((name, status, message))
236  
237      return results
238  
239  
240  def format_output(results: List[Tuple[str, str, str]], format: str = 'text') -> str:
241      """Format results for output."""
242      if format == 'json':
243          return json.dumps([
244              {'check': name, 'status': status, 'message': message}
245              for name, status, message in results
246          ], indent=2)
247  
248      if not results:
249          return ""
250  
251      lines = []
252  
253      # Separate critical from warnings
254      critical = [(n, m) for n, s, m in results if s == 'critical']
255      warnings = [(n, m) for n, s, m in results if s == 'warning']
256  
257      if critical:
258          lines.append("=" * 70)
259          lines.append("🚨 CRITICAL PROTOCOL VIOLATIONS - MUST ADDRESS IMMEDIATELY")
260          lines.append("=" * 70)
261          for name, message in critical:
262              lines.append(f"  [{name}] {message}")
263          lines.append("")
264  
265      if warnings:
266          lines.append("-" * 70)
267          lines.append("⚠️  HYGIENE WARNINGS")
268          lines.append("-" * 70)
269          for name, message in warnings:
270              lines.append(f"  [{name}] {message}")
271          lines.append("")
272  
273      return "\n".join(lines)
274  
275  
276  def main():
277      import argparse
278  
279      parser = argparse.ArgumentParser(
280          description="Phoenix Hygiene Checker - Protocol Compliance Enforcement"
281      )
282      parser.add_argument('--json', action='store_true', help='Output in JSON format')
283      parser.add_argument('--quiet', action='store_true', help='Only output if issues found')
284      parser.add_argument('--check', choices=['staleness', 'content', 'backlog', 'all'],
285                          default='all', help='Which check to run')
286  
287      args = parser.parse_args()
288  
289      # Run checks
290      if args.check == 'all':
291          results = check_all()
292      elif args.check == 'staleness':
293          status, msg = check_live_compression_staleness()
294          results = [('phoenix_staleness', status, msg)] if msg else []
295      elif args.check == 'content':
296          status, msg = check_live_compression_content()
297          results = [('phoenix_content', status, msg)] if msg else []
298      elif args.check == 'backlog':
299          status, msg = check_insight_backlog()
300          results = [('insight_backlog', status, msg)] if msg else []
301  
302      # Output
303      fmt = 'json' if args.json else 'text'
304      output = format_output(results, fmt)
305  
306      if output or not args.quiet:
307          if output:
308              print(output)
309          elif not args.quiet:
310              print("✓ All phoenix hygiene checks pass")
311  
312      # Exit code
313      has_critical = any(s == 'critical' for _, s, _ in results)
314      has_warnings = any(s == 'warning' for _, s, _ in results)
315  
316      if has_critical:
317          sys.exit(2)
318      elif has_warnings:
319          sys.exit(1)
320      else:
321          sys.exit(0)
322  
323  
324  if __name__ == "__main__":
325      main()