/ hooks / session_close_hook.py
session_close_hook.py
  1  #!/usr/bin/env python3
  2  """
  3  Session Close Hook for Claude Code
  4  
  5  Exports session state and Phoenix configuration at session close.
  6  This ensures cognitive continuity across sessions and machines.
  7  
  8  Runs on the `Stop` hook event when a Claude Code session ends.
  9  
 10  What it does:
 11  1. Reads the session transcript
 12  2. Extracts topics, decisions, open threads, gravity wells
 13  3. Updates LIVE-COMPRESSION.md (local Phoenix state - ALWAYS runs)
 14  4. Generates a Phoenix State YAML
 15  5. Stores to Hypercore for P2P replication (if daemon running)
 16  6. Outputs a summary for the user
 17  
 18  The key insight: LIVE-COMPRESSION.md must be updated on EVERY session close,
 19  regardless of whether Hypercore daemon is running. This prevents the overnight
 20  staleness pattern where sessions end without explicit phoenix flush.
 21  
 22  Install:
 23    Add to ~/.claude/settings.json under "hooks.Stop"
 24  
 25    Example settings.json:
 26    {
 27      "hooks": {
 28        "Stop": [
 29          {
 30            "matcher": "workingDirectory:Sovereign_OS",
 31            "hooks": [
 32              {
 33                "type": "command",
 34                "command": "python3 /Users/rcerf/repos/Sovereign_OS/hooks/session_close_hook.py"
 35              }
 36            ]
 37          }
 38        ]
 39      }
 40    }
 41  """
 42  
 43  import json
 44  import sys
 45  import re
 46  import urllib.request
 47  import urllib.error
 48  from datetime import datetime
 49  from pathlib import Path
 50  from typing import List, Dict, Any, Optional
 51  
# Base URL of the local daemon that fetch_json() sends its requests to.
DAEMON_URL = "http://localhost:7777"
# Base URL of the local mesh service used by publish_to_mesh() and get_mesh_status().
MESH_URL = "http://localhost:7778"
# Two directory levels above this file (presumably the repository root, since
# the install example places this script in <repo>/hooks/).
SOVEREIGN_OS_ROOT = Path(__file__).parent.parent
# Markdown file rewritten by update_live_compression() on every run.
LIVE_COMPRESSION_PATH = SOVEREIGN_OS_ROOT / "sessions" / "LIVE-COMPRESSION.md"
# JSON file whose "costs" object is read by load_session_costs().
FO_STATE_PATH = SOVEREIGN_OS_ROOT / "sessions" / "FO-STATE.json"
# Per-user economics settings scanned line by line by load_session_costs().
ECONOMICS_PATH = Path.home() / ".sovereign" / "economics.yaml"
# Helper scripts launched as subprocesses by run_session_report() and
# run_graph_update() respectively.
SESSION_REPORT_SCRIPT = SOVEREIGN_OS_ROOT / "scripts" / "session_report.py"
GRAPH_FEEDER_SCRIPT = SOVEREIGN_OS_ROOT / "scripts" / "graph_feeder.py"
 60  
 61  
 62  def load_session_costs() -> Dict[str, Any]:
 63      """Load cost tracking data for session close report."""
 64      costs = {
 65          "sats_per_hour": 172110,
 66          "hourly_rate_usd": 172.11,
 67          "btc_price": 100000,
 68          "claude": {}
 69      }
 70  
 71      # Load economics config
 72      try:
 73          if ECONOMICS_PATH.exists():
 74              with open(ECONOMICS_PATH) as f:
 75                  for line in f:
 76                      if "sats_per_hour:" in line:
 77                          costs["sats_per_hour"] = int(line.split(":")[1].strip())
 78                      elif "hourly_rate_usd:" in line:
 79                          costs["hourly_rate_usd"] = float(line.split(":")[1].strip())
 80                      elif "btc_price_usd:" in line:
 81                          costs["btc_price"] = float(line.split(":")[1].strip())
 82      except:
 83          pass
 84  
 85      # Load FO-STATE costs
 86      try:
 87          if FO_STATE_PATH.exists():
 88              with open(FO_STATE_PATH) as f:
 89                  fo_state = json.load(f)
 90                  costs["claude"] = fo_state.get("costs", {})
 91      except:
 92          pass
 93  
 94      return costs
 95  
 96  
 97  def format_session_economics(session_hours: float, costs: Dict[str, Any]) -> str:
 98      """Format session economics for display."""
 99      sats_per_hour = costs.get("sats_per_hour", 172110)
100      btc_price = costs.get("btc_price", 100000)
101  
102      attention_sats = int(session_hours * sats_per_hour)
103      attention_usd = round(attention_sats / 100_000_000 * btc_price, 2)
104      claude_sats = 3333
105      claude_usd = 3.33
106      total_sats = attention_sats + claude_sats
107      total_usd = attention_usd + claude_usd
108  
109      claude_data = costs.get("claude", {})
110      efficiency = claude_data.get("efficiency", {})
111  
112      output = f"""
113  ## Session Economics (Bitcoin-Anchored)
114  
115  **Session Duration:** {session_hours:.1f} hours
116  
117  ```
118  COSTS INVESTED:
119    Attention:     {attention_sats:,} sats (${attention_usd:,.2f})
120    Claude:        {claude_sats:,} sats (${claude_usd:,.2f})
121    TOTAL:         {total_sats:,} sats (${total_usd:,.2f})
122  
123  CLAUDE EFFICIENCY:
124    API:           {efficiency.get('api_efficiency', 0)}x
125    Infrastructure:{efficiency.get('infra_efficiency', 0)}x
126  ```
127  
128  *Attention priced at {sats_per_hour:,} sats/hr (Meta W-2 2024 market rate)*
129  """
130      return output
131  
132  
def fetch_json(method: str, path: str, data: Optional[dict] = None) -> dict:
    """Send one HTTP request to the daemon and return its JSON object.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        path: Path appended verbatim to ``DAEMON_URL``.
        data: Optional JSON-serialisable body. ``None`` sends no body; any
            other value (including an empty dict) is sent as JSON.

    Returns:
        The decoded JSON object. On any failure (connection, timeout, HTTP
        error, invalid JSON) or when the response is not a JSON object,
        ``{"error": "<description>"}`` is returned instead; this function
        never raises.
    """
    url = f"{DAEMON_URL}{path}"
    try:
        if data is not None:
            req = urllib.request.Request(
                url,
                data=json.dumps(data).encode("utf-8"),
                headers={"Content-Type": "application/json"},
                method=method,
            )
        else:
            req = urllib.request.Request(url, method=method)

        with urllib.request.urlopen(req, timeout=5) as response:
            parsed = json.loads(response.read().decode("utf-8"))
    except Exception as e:
        # Deliberate best-effort boundary: the hook must keep running when
        # the daemon is down or misbehaving.
        return {"error": str(e)}

    # Callers use .get() on the result, so only hand back JSON objects.
    if not isinstance(parsed, dict):
        return {"error": f"unexpected response type: {type(parsed).__name__}"}
    return parsed
151  
152  
def publish_to_mesh(message_type: str, payload: dict) -> bool:
    """Publish one event to the mesh service.

    Posts a JSON envelope (``type``, ``from``, ``payload``, ``timestamp``) to
    ``{MESH_URL}/publish`` with a 2 second timeout.

    Args:
        message_type: Value of the envelope's ``type`` field.
        payload: JSON-serialisable event body.

    Returns:
        True only when the service answers with HTTP 200; False on any other
        status or on any error (unreachable service, timeout, payload that
        cannot be serialised).
    """
    try:
        msg = json.dumps({
            "type": message_type,
            "from": "session-close-hook",
            "payload": payload,
            "timestamp": datetime.now().isoformat(),
        }).encode("utf-8")
        req = urllib.request.Request(
            f"{MESH_URL}/publish",
            data=msg,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=2) as resp:
            return resp.status == 200
    except Exception:
        # Best effort: publishing must never break the hook. Unlike a bare
        # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        return False
172  
173  
def get_mesh_status() -> Dict[str, Any]:
    """Query the mesh service root endpoint for its status.

    Returns:
        The JSON object served at ``{MESH_URL}/`` (2 second timeout). When
        the service is unreachable, answers with something that is not a
        JSON object, or any other error occurs, the offline placeholder
        ``{"online": False, "peers": 0}`` is returned.
    """
    offline: Dict[str, Any] = {"online": False, "peers": 0}
    try:
        with urllib.request.urlopen(f"{MESH_URL}/", timeout=2) as resp:
            status = json.loads(resp.read().decode())
    except Exception:
        # Best effort; a bare ``except:`` would also trap KeyboardInterrupt.
        return offline

    # Callers read fields with .get(), so reject non-object responses.
    return status if isinstance(status, dict) else offline
181  
182  
def run_session_report() -> bool:
    """Run ``SESSION_REPORT_SCRIPT`` to generate the session debrief.

    ENFORCEMENT: This ensures every session gets a report.
    Belt = CLAUDE.md says to do it
    Suspenders = This hook forces it

    The script is launched with ``python3`` from ``SOVEREIGN_OS_ROOT`` and
    given 30 seconds to finish.

    Returns:
        True when the script exists and exits with status 0. Otherwise False,
        with the reason (missing script, launch error/timeout, or non-zero
        exit plus the tail of its output) written to stderr.
    """
    import subprocess

    try:
        if not SESSION_REPORT_SCRIPT.exists():
            print(
                f"Session report script not found: {SESSION_REPORT_SCRIPT}",
                file=sys.stderr,
            )
            return False

        result = subprocess.run(
            ["python3", str(SESSION_REPORT_SCRIPT)],
            capture_output=True,
            text=True,
            timeout=30,
            cwd=str(SOVEREIGN_OS_ROOT),
        )
    except Exception as e:
        print(f"Session report failed: {e}", file=sys.stderr)
        return False

    if result.returncode != 0:
        # Surface why the script failed instead of discarding its output.
        detail = (result.stderr or result.stdout or "").strip()[-500:]
        print(
            f"Session report exited with code {result.returncode}: {detail}",
            file=sys.stderr,
        )
        return False
    return True
205  
206  
def git_auto_push() -> bool:
    """Push local commits to ``origin main`` when the branch is ahead.

    ENFORCEMENT: Ensures code is synced after every session.
    Only pushes if there are commits ahead of remote.

    The ahead check reads only the branch header line of ``git status -sb``
    (``## branch...upstream [ahead N]``), so changed files or branches whose
    names merely contain "ahead" do not trigger a push.

    Returns:
        True when there is nothing to push or the push succeeded; False when
        ``git status`` or ``git push`` fails, times out or cannot be started.
        Failures are described on stderr.
    """
    import subprocess

    repo_dir = str(SOVEREIGN_OS_ROOT)
    try:
        status = subprocess.run(
            ["git", "status", "-sb"],
            capture_output=True, text=True,
            timeout=10, cwd=repo_dir,
        )
        if status.returncode != 0:
            # e.g. not a git repository: that is a failure, not "up to date".
            print(f"Git status failed: {status.stderr.strip()}", file=sys.stderr)
            return False

        lines = status.stdout.splitlines()
        header = lines[0] if lines else ""
        if not re.search(r"\[ahead \d+", header):
            return True  # Nothing to push, counts as success

        # NOTE(review): the target is hard-coded to origin/main even though
        # the ahead check above concerns the currently checked-out branch.
        push = subprocess.run(
            ["git", "push", "origin", "main"],
            capture_output=True, text=True,
            timeout=60, cwd=repo_dir,
        )
        if push.returncode == 0:
            print("Git auto-push succeeded", file=sys.stderr)
            return True

        print(f"Git push failed: {push.stderr}", file=sys.stderr)
        return False
    except Exception as e:
        print(f"Git auto-push error: {e}", file=sys.stderr)
        return False
240  
241  
def run_graph_update() -> bool:
    """Run ``GRAPH_FEEDER_SCRIPT --status`` and report whether it succeeded.

    ENFORCEMENT: This ensures every session contributes to the graph.

    NOTE(review): the script is invoked with ``--status``, which presumably
    only checks the graph feeder rather than feeding new data - confirm
    against graph_feeder.py.

    The script is launched with ``python3`` from ``SOVEREIGN_OS_ROOT`` and
    given 10 seconds to finish.

    Returns:
        True when the script exists and exits with status 0. Otherwise False,
        with the reason written to stderr.
    """
    import subprocess

    try:
        if not GRAPH_FEEDER_SCRIPT.exists():
            print(
                f"Graph feeder script not found: {GRAPH_FEEDER_SCRIPT}",
                file=sys.stderr,
            )
            return False

        result = subprocess.run(
            ["python3", str(GRAPH_FEEDER_SCRIPT), "--status"],
            capture_output=True,
            text=True,
            timeout=10,
            cwd=str(SOVEREIGN_OS_ROOT),
        )
    except Exception as e:
        print(f"Graph update failed: {e}", file=sys.stderr)
        return False

    if result.returncode != 0:
        # Surface why the script failed instead of discarding its output.
        detail = (result.stderr or result.stdout or "").strip()[-500:]
        print(
            f"Graph update exited with code {result.returncode}: {detail}",
            file=sys.stderr,
        )
        return False
    return True
262  
263  
def extract_topics_from_transcript(transcript_path: str) -> List[str]:
    """Extract candidate topics from a JSONL session transcript.

    Each line is parsed as JSON. For entries whose ``type`` is ``"user"`` or
    ``"assistant"``, the message text is taken from ``message.content``,
    which may be a plain string or a list of blocks (only ``"text"`` blocks
    are used). Four kinds of terms are collected from that text: ``#hashtags``,
    ``"quoted terms"`` and ``**bold terms**`` of 3-30 characters, and
    CamelCase words. Terms are lower-cased and kept when 3-50 characters long
    and not purely numeric.

    Args:
        transcript_path: Path to the transcript file.

    Returns:
        At most 20 distinct topics, ordered by the number of messages that
        mention them (most first; ties keep first-seen order). An unreadable
        or missing file yields an empty list.
    """
    term_patterns = (
        r'#(\w+)',                              # hashtags
        r'"([^"]{3,30})"',                      # quoted terms
        r'\*\*([^*]{3,30})\*\*',                # **bold** terms
        r'\b([A-Z][a-z]+[A-Z][a-zA-Z]*)\b',     # CamelCase terms
    )

    def message_text(entry: Any) -> str:
        """Return the plain text of a user/assistant entry, else ''."""
        if not isinstance(entry, dict):
            return ""
        if entry.get("type") not in ("user", "assistant"):
            return ""
        message = entry.get("message")
        body = message.get("content") if isinstance(message, dict) else None
        if isinstance(body, str):
            return body
        if isinstance(body, list):
            # Block lists can hold non-text blocks (tool calls/results);
            # feeding those to the regexes would raise.
            return " ".join(
                block["text"]
                for block in body
                if isinstance(block, dict)
                and block.get("type") == "text"
                and isinstance(block.get("text"), str)
            )
        return ""

    mentions: Dict[str, int] = {}  # topic -> number of messages mentioning it

    try:
        with open(transcript_path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue

                text = message_text(entry)
                if not text:
                    continue

                # Count each topic once per message, whichever pattern hit.
                seen_here: Dict[str, None] = {}
                for pattern in term_patterns:
                    for raw in re.findall(pattern, text):
                        term = raw.strip().lower()
                        if 3 <= len(term) <= 50 and not term.isdigit():
                            seen_here[term] = None
                for term in seen_here:
                    mentions[term] = mentions.get(term, 0) + 1
    except (OSError, ValueError, TypeError):
        # Missing/unreadable file or unusable path: report what we have.
        pass

    # sorted() is stable, so equally frequent topics keep first-seen order.
    ranked = sorted(mentions, key=lambda term: -mentions[term])
    return ranked[:20]
318  
319  
def extract_open_threads(transcript_path: str) -> List[str]:
    """Extract question-like fragments from a transcript file.

    The file is read as one string and every run of text that ends in ``?``
    (and contains no other ``.``, ``!`` or ``?``) is a candidate. Candidates
    are stripped and kept when longer than 20 and shorter than 200 characters.

    NOTE(review): the pattern runs over the raw file text, so for JSONL
    transcripts a fragment can include JSON syntax preceding the question.

    Args:
        transcript_path: Path to the transcript file.

    Returns:
        Up to 10 fragments in file order; an empty list when the file cannot
        be read.
    """
    threads: List[str] = []

    try:
        # Decode leniently: one bad byte should not discard every thread.
        with open(transcript_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except (OSError, ValueError, TypeError):
        return threads

    for match in re.finditer(r'([^.!?]*\?)', content):
        question = match.group(1).strip()
        if 20 < len(question) < 200:
            threads.append(question)
            if len(threads) == 10:
                break  # only the first 10 are ever returned

    return threads
339  
340  
def extract_decisions(transcript_path: str) -> List[str]:
    """Extract decision/action phrases from a transcript file.

    Two case-insensitive patterns are applied to the whole file text: the
    10-100 characters following "let's", "we should", "I'll", "going to" or
    "decided to", and the 10-100 characters following a ``TODO:``, ``DONE:``
    or ``ACTION:`` marker. A phrase stops at ``.``, ``!``, ``?`` or a newline.

    NOTE(review): the patterns run over the raw file text, so for JSONL
    transcripts a phrase can include trailing JSON syntax.

    Args:
        transcript_path: Path to the transcript file.

    Returns:
        Up to 10 distinct phrases in a stable order (all matches of the first
        pattern in file order, then those of the second); an empty list when
        the file cannot be read.
    """
    patterns = [
        r"(?:let's|we should|I'll|going to|decided to)\s+([^.!?\n]{10,100})",
        r"(?:TODO|DONE|ACTION):\s*([^.!?\n]{10,100})",
    ]

    try:
        with open(transcript_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except (OSError, ValueError, TypeError):
        return []

    decisions: List[str] = []
    for pattern in patterns:
        decisions.extend(re.findall(pattern, content, re.IGNORECASE))

    # dict.fromkeys() de-duplicates while preserving order, so the same
    # transcript always yields the same 10 entries (a set would not).
    return list(dict.fromkeys(decisions))[:10]
363  
364  
def compute_altitude(transcript_path: str) -> str:
    """Estimate the dominant altitude of the session.

    The lower-cased file text is scored per altitude by summing the number of
    occurrences of that altitude's keywords. Occurrences are substring counts
    (``"how"`` also counts inside ``"show"``), taken over the raw file text.

    Args:
        transcript_path: Path to the transcript file.

    Returns:
        ``"philosophical"``, ``"strategic"``, ``"tactical"`` or
        ``"operational"`` - the highest-scoring altitude (ties go to the one
        listed first). ``"tactical"`` is returned when the file cannot be
        read or no keyword occurs at all.
    """
    keywords = {
        "philosophical": ["why", "philosophy", "principle", "theory", "meaning", "purpose"],
        "strategic": ["strategy", "plan", "architecture", "design", "approach", "system"],
        "tactical": ["how", "implement", "build", "create", "configure", "setup"],
        "operational": ["fix", "bug", "error", "run", "test", "deploy"],
    }

    try:
        with open(transcript_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read().lower()
    except (OSError, ValueError, TypeError):
        return "tactical"

    scores = {
        altitude: sum(content.count(word) for word in words)
        for altitude, words in keywords.items()
    }

    best = max(scores, key=lambda k: scores[k])
    # With no keyword hits max() would just pick the first key
    # ("philosophical"); use the same default as the unreadable-file case.
    return best if scores[best] > 0 else "tactical"
388  
389  
def generate_phoenix_state(
    session_id: str,
    transcript_path: str
) -> Dict[str, Any]:
    """Assemble the Phoenix State dictionary for a session.

    Runs the four transcript extractors (topics, open threads, decisions,
    altitude) on ``transcript_path`` and packs their results, together with
    fixed operator/domain metadata and the current local time, into one dict.

    Args:
        session_id: Identifier stored under ``session_id``.
        transcript_path: Transcript file handed to each extractor.

    Returns:
        The Phoenix State: up to 10 gravity wells and 5 rising topics (from
        the extracted topics), up to 5 open threads and up to 5
        ready-to-implement decisions.
    """
    concepts = extract_topics_from_transcript(transcript_path)
    questions = extract_open_threads(transcript_path)
    actions = extract_decisions(transcript_path)
    dominant_altitude = compute_altitude(transcript_path)

    # Every detected topic gets the same fixed resonance and mention count.
    wells = []
    for concept in concepts[:10]:
        wells.append({"concept": concept, "resonance": 0.7, "mention_count": 1})

    # Every open thread gets the same fixed importance.
    threads = []
    for question in questions[:5]:
        threads.append({"content": question, "importance": 0.6})

    state: Dict[str, Any] = {}
    state["session_id"] = session_id
    state["created"] = datetime.now().isoformat()
    state["operator"] = "rick"
    state["domain"] = "Estate"
    state["operator_altitude"] = dominant_altitude
    state["system_altitude"] = "tactical"
    state["pull_rate"] = 0.5
    state["gravity_wells"] = wells
    state["momentum"] = {"rising": concepts[:5], "almost_formed": []}
    state["open_threads"] = threads
    state["ready_to_implement"] = actions[:5]
    state["paths_not_taken"] = []
    state["key_files"] = []
    return state
425  
426  
def update_live_compression(
    session_id: str,
    phoenix_state: Dict[str, Any],
    transcript_path: str = ""
) -> bool:
    """
    Update LIVE-COMPRESSION.md with session close state.

    This is the PRIMARY mechanism for preventing overnight staleness.
    It runs locally and doesn't depend on Hypercore daemon.

    The Markdown document is rendered from ``phoenix_state`` (gravity wells,
    open threads, decisions, altitude) and written to
    ``LIVE_COMPRESSION_PATH``. The text goes to a sibling ``.tmp`` file first
    and is then renamed over the target, so an interrupted run cannot leave
    a truncated state file behind.

    Args:
        session_id: Identifier embedded in the document.
        phoenix_state: State dict; the keys ``gravity_wells``,
            ``open_threads``, ``ready_to_implement`` and
            ``operator_altitude`` are read, all optional.
        transcript_path: Accepted for call compatibility; not used.

    Returns True if update succeeded, False otherwise.
    """
    def one_line(value: Any) -> str:
        """Collapse all whitespace runs so a value fits one Markdown line."""
        return " ".join(str(value).split())

    try:
        # One clock reading so date and timestamp can never disagree.
        now = datetime.now()
        timestamp = now.strftime("%Y-%m-%dT%H:%M:%S")
        date_str = now.strftime("%Y-%m-%d")

        # Extract data from phoenix state
        gravity_wells = phoenix_state.get("gravity_wells", [])
        open_threads = phoenix_state.get("open_threads", [])
        decisions = phoenix_state.get("ready_to_implement", [])
        altitude = phoenix_state.get("operator_altitude", "tactical")
        topics = [gw.get("concept", "") for gw in gravity_wells[:10]]

        # Build the markdown content piece by piece; joined once at the end.
        parts = [f"""# Live Compression - {date_str}/session-close-auto

*Auto-generated by session_close_hook.py*

---

- **metadata**
  - updated:: {timestamp}
  - confidence:: 0.75
  - free_energy:: F = 0.15 (auto-generated at session close)
  - status:: SESSION CLOSED - Awaiting next session
  - checkpoint:: auto
  - mode:: Session close flush
  - session_id:: {session_id}

---

## Session Summary (Auto-Extracted)

**Session ID:** `{session_id}`
**Altitude:** {altitude}
**Close Time:** {timestamp}

---

## Gravity Wells (Auto-Detected)

"""]
        if gravity_wells:
            parts.append("- **wells**\n")
            for gw in gravity_wells[:10]:
                concept = one_line(gw.get("concept", "unknown"))
                resonance = gw.get("resonance", 0.5)
                parts.append(f"  - [[{concept}]]\n")
                parts.append(f"    - strength:: {resonance:.2f}\n")
                parts.append("    - touched:: session close\n")
        else:
            parts.append("*No gravity wells detected*\n")

        parts.append("""
---

## Open Threads (Auto-Detected)

""")
        if open_threads:
            for i, thread in enumerate(open_threads[:5], 1):
                raw = thread.get("content", str(thread)) if isinstance(thread, dict) else thread
                # Extracted threads may span several lines; keep each list
                # item on a single line so the numbered list stays intact.
                text = one_line(raw)
                if len(text) > 200:
                    text = text[:200] + "..."
                parts.append(f"{i}. {text}\n")
        else:
            parts.append("*No open threads detected*\n")

        parts.append("""
---

## Decisions Made (Auto-Detected)

""")
        if decisions:
            for decision in decisions[:5]:
                parts.append(f"- {one_line(decision)}\n")
        else:
            parts.append("*No explicit decisions detected*\n")

        parts.append(f"""
---

## Session Close State

This file was auto-updated at session close to prevent Phoenix staleness.

**What happened:**
- Session `{session_id}` closed
- Phoenix state extracted from transcript
- LIVE-COMPRESSION.md updated (this file)
- Hypercore export attempted (if daemon running)

**Next session should:**
1. Run `python3 scripts/phoenix_hygiene.py` to verify state
2. Review this auto-generated state
3. Update with actual session focus and thread map

---

## Resurrection Seed

```yaml
context:
  session: {session_id}
  date: {date_str}
  auto_generated: true
  close_time: {timestamp}

topics: {json.dumps(topics[:10])}

altitude: {altitude}

open_threads_count: {len(open_threads)}
decisions_count: {len(decisions)}

note: |
  This is an auto-generated session close state.
  The next session should update LIVE-COMPRESSION.md
  with actual focus and thread map.

bootstrap:
  - Run: python3 scripts/phoenix_hygiene.py
  - Review: This auto-generated state
  - Update: With actual session context

f: 0.15
trust_f: 0.20
```

---

*Session close hook | Auto-generated | {timestamp}*
""")
        content = "".join(parts)

        # Write to a temporary sibling, then rename over the target.
        LIVE_COMPRESSION_PATH.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = LIVE_COMPRESSION_PATH.with_name(LIVE_COMPRESSION_PATH.name + ".tmp")
        with open(tmp_path, 'w', encoding="utf-8") as f:
            f.write(content)
        tmp_path.replace(LIVE_COMPRESSION_PATH)

        return True

    except Exception as e:
        print(f"Error updating LIVE-COMPRESSION.md: {e}", file=sys.stderr)
        return False
581  
582  
def _estimate_session_hours(transcript_path: str, default: float = 1.0) -> float:
    """Estimate session length in hours from transcript timestamps.

    Uses the first and last JSONL entries that carry a ``timestamp`` field,
    so entries without one at either end of the file do not defeat the
    estimate. Returns ``default`` when fewer than two timestamped entries
    exist or anything cannot be read/parsed; the result is never below 0.1.
    """
    first_ts = None
    last_ts = None
    stamped = 0
    try:
        with open(transcript_path, 'r', encoding="utf-8", errors="replace") as f:
            for line in f:
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                ts = entry.get("timestamp") if isinstance(entry, dict) else None
                if isinstance(ts, str) and ts:
                    if first_ts is None:
                        first_ts = ts
                    last_ts = ts
                    stamped += 1

        if stamped < 2:
            return default

        # fromisoformat() on older Pythons rejects a trailing "Z".
        start = datetime.fromisoformat(first_ts.replace("Z", "+00:00"))
        end = datetime.fromisoformat(last_ts.replace("Z", "+00:00"))
        return max(0.1, (end - start).total_seconds() / 3600)
    except (OSError, ValueError, TypeError):
        # Unreadable file, malformed timestamp, or naive/aware mix.
        return default


def main():
    """Main hook entry point.

    Reads the hook payload (JSON with optional ``session_id`` and
    ``transcript_path``) from stdin, then in order: builds the Phoenix state,
    rewrites LIVE-COMPRESSION.md, sends state/topics/event to the daemon
    (only when a transcript exists), computes session economics, checks the
    mesh, runs the session report and graph check, publishes the close event
    to the mesh, queries daemon peers, pushes git, and prints the summary to
    stdout. Progress and warnings go to stderr. Every external step is best
    effort; none of them aborts the hook.
    """
    # Read hook input from stdin; anything but a JSON object means "no input".
    try:
        input_data = json.load(sys.stdin)
    except (ValueError, OSError, AttributeError):
        input_data = {}
    if not isinstance(input_data, dict):
        input_data = {}

    session_id = input_data.get("session_id") or f"session-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    transcript_path = input_data.get("transcript_path") or ""
    if not isinstance(transcript_path, str):
        transcript_path = ""
    has_transcript = bool(transcript_path) and Path(transcript_path).exists()

    # Generate Phoenix State
    if has_transcript:
        phoenix_state = generate_phoenix_state(session_id, transcript_path)
    else:
        # Create minimal phoenix state even without transcript
        # This ensures LIVE-COMPRESSION.md is always updated
        phoenix_state = {
            "session_id": session_id,
            "created": datetime.now().isoformat(),
            "operator": "rick",
            "domain": "Sovereign_OS",
            "operator_altitude": "tactical",
            "system_altitude": "tactical",
            "gravity_wells": [],
            "momentum": {"rising": [], "almost_formed": []},
            "open_threads": [],
            "ready_to_implement": [],
            "paths_not_taken": [],
            "key_files": [],
            "note": "Minimal state - no transcript available"
        }

    gravity_wells = phoenix_state.get("gravity_wells", [])
    open_count = len(phoenix_state.get("open_threads", []))
    decision_count = len(phoenix_state.get("ready_to_implement", []))

    # CRITICAL: Update LIVE-COMPRESSION.md FIRST (local, always works)
    # This prevents overnight staleness even if Hypercore daemon is down
    live_compression_updated = update_live_compression(
        session_id, phoenix_state, transcript_path
    )
    if live_compression_updated:
        print(f"LIVE-COMPRESSION.md updated: {session_id}", file=sys.stderr)
    else:
        print("WARNING: Failed to update LIVE-COMPRESSION.md", file=sys.stderr)

    # Store to Hypercore (only if we have a full phoenix state with transcript data)
    if has_transcript:
        result = fetch_json("POST", "/phoenix", {
            "sessionId": session_id,
            "state": phoenix_state
        })
        if isinstance(result, dict) and result.get("success"):
            print(f"Phoenix state stored: {session_id}", file=sys.stderr)

        # Sync topics as attractors
        for gw in gravity_wells:
            topic = gw.get("concept", "")
            if topic:
                fetch_json("POST", "/topic", {
                    "topic": topic,
                    "sessionId": session_id,
                    "strength": gw.get("resonance", 0.5)
                })

        # Write session close event
        fetch_json("POST", "/event", {
            "type": "session_close",
            "sessionId": session_id,
            "topics": [gw.get("concept") for gw in gravity_wells],
            "altitude": phoenix_state.get("operator_altitude"),
            "decisions_count": decision_count,
            "open_threads_count": open_count
        })

    # Load session costs
    costs = load_session_costs()

    # Estimate session duration from transcript timestamps (1.0 h default)
    session_hours = _estimate_session_hours(transcript_path) if has_transcript else 1.0

    # Calculate session economics
    sats_per_hour = costs.get("sats_per_hour", 172110)
    btc_price = costs.get("btc_price", 100000)
    attention_sats = int(session_hours * sats_per_hour)
    attention_usd = round(attention_sats / 100_000_000 * btc_price, 2)
    total_sats = attention_sats + 3333
    total_usd = attention_usd + 3.33

    # BELT AND SUSPENDERS ENFORCEMENT
    # Belt = CLAUDE.md documentation says to do these things
    # Suspenders = This hook FORCES them to happen

    # 1. MESH HEALTH CHECK (streaming bus enforcement)
    mesh_status = get_mesh_status()
    if not isinstance(mesh_status, dict):
        mesh_status = {}
    mesh_online = mesh_status.get("online", False)
    mesh_peers = mesh_status.get("peers", 0)

    # 2. SESSION REPORT (Obsidian documentation enforcement)
    report_generated = run_session_report()
    if report_generated:
        print("Session report generated (belt+suspenders)", file=sys.stderr)
    else:
        print("WARNING: Session report failed - manual report needed", file=sys.stderr)

    # 3. GRAPH UPDATE CHECK (Obsidian graph enforcement)
    graph_ok = run_graph_update()

    # 4. PUBLISH TO MESH (all instances see session close)
    if mesh_online:
        publish_to_mesh("session_close", {
            "session_id": session_id,
            "duration_hours": session_hours,
            "attention_sats": attention_sats,
            "report_generated": report_generated,
            "live_compression_updated": live_compression_updated,
            "topics": [gw.get("concept") for gw in gravity_wells[:10]]
        })

    # Output summary (visible to user)
    rule = "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
    output_lines = [
        "",
        rule,
        "SESSION CLOSED",
        rule,
        "",
        f"Session: {session_id}",
        f"Duration: {session_hours:.1f} hours",
        "",
        "₿ SESSION ECONOMICS:",
        f"  Attention:  {attention_sats:,} sats (${attention_usd:,.2f})",
        "  Claude:     3,333 sats ($3.33)",
        f"  TOTAL:      {total_sats:,} sats (${total_usd:,.2f})",
        "",
    ]

    # LIVE-COMPRESSION status (primary)
    if live_compression_updated:
        output_lines.append("Phoenix: LIVE-COMPRESSION.md updated")
    else:
        output_lines.append("Phoenix: FAILED (check manually)")

    # Skip empty concepts so the join below cannot fail on None.
    top_topics = [str(gw.get("concept")) for gw in gravity_wells[:5] if gw.get("concept")]
    if top_topics:
        output_lines.append(f"Topics: {', '.join(top_topics)}")
    output_lines.append(f"Open threads: {open_count} | Decisions: {decision_count}")

    # Check P2P status; tolerate an error dict or a non-numeric "peers" value
    status = fetch_json("GET", "/status", None)
    peers = status.get("peers", 0) if isinstance(status, dict) else 0
    if isinstance(peers, (int, float)) and peers > 0:
        output_lines.append(f"P2P: Replicated to {peers} peer(s)")

    # 5. GIT AUTO-PUSH (sync code on session close)
    git_pushed = git_auto_push()

    # Belt+Suspenders status
    output_lines.append("")
    output_lines.append("ENFORCEMENT (Belt+Suspenders):")
    output_lines.append(f"  Session Report: {'✓' if report_generated else '✗ MANUAL NEEDED'}")
    output_lines.append(f"  Mesh Bus:       {'✓ ' + str(mesh_peers) + ' peers' if mesh_online else '✗ OFFLINE'}")
    output_lines.append(f"  Graph Status:   {'✓' if graph_ok else '✗ CHECK NEEDED'}")
    output_lines.append(f"  Git Push:       {'✓' if git_pushed else '✗ CHECK NEEDED'}")

    output_lines.append("")
    output_lines.append(rule)
    output_lines.append("")

    print("\n".join(output_lines))
777  
778  
# Run the hook only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()