artifact_analyzer.py
#!/usr/bin/env python3
"""
Artifact Analyzer - Extract and analyze Claude.ai artifacts as signal markers.

Artifacts represent crystallization points - moments where thinking became
concrete enough to externalize. They are:
1. High-weight signal markers (end of theoretical OODA loops)
2. Correlation anchors (what was happening when this crystallized?)
3. Content to deconstruct (what insights are in the artifact itself?)

Data sources:
- artifacts.json from Claude export (conversation + code blocks)
- Raw artifact text files (extracted content)
- Conversation context (what led to artifact creation)

Usage:
    analyzer = ArtifactAnalyzer()
    report = analyzer.analyze(hours_back=168)
"""

import json
import re
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

from .signal_words import SignalWordDetector, detect_signals, SignalDetection


@dataclass
class Artifact:
    """A single artifact from Claude.ai."""
    id: str
    conversation_name: str
    created_at: datetime
    updated_at: datetime
    message_count: int

    # Content
    code_blocks: List[Dict[str, Any]] = field(default_factory=list)
    total_content_length: int = 0

    # Analysis
    signals: Optional[SignalDetection] = None
    weight: float = 1.0
    topics: Set[str] = field(default_factory=set)
    languages: Set[str] = field(default_factory=set)

    # Context
    precursor_messages: List[str] = field(default_factory=list)  # What led to this
    artifact_type: str = "unknown"  # code, document, diagram, data

    @property
    def duration_hours(self) -> float:
        """How long was this artifact being worked on (updated - created)?"""
        return (self.updated_at - self.created_at).total_seconds() / 3600


@dataclass
class CrystallizationMoment:
    """A moment when thinking crystallized into an artifact."""
    artifact: Artifact
    timestamp: datetime

    # Concurrent context (filled in by correlate_with_sources)
    browser_context: List[str] = field(default_factory=list)  # What was being researched
    voice_context: List[str] = field(default_factory=list)  # What was being said
    session_context: List[str] = field(default_factory=list)  # Claude Code work

    # Analysis
    confidence: float = 0.0
    topics: Set[str] = field(default_factory=set)


@dataclass
class ArtifactReport:
    """Report from artifact analysis."""
    artifacts_analyzed: int = 0
    time_window_hours: float = 0.0

    # High-value findings
    crystallization_moments: List[CrystallizationMoment] = field(default_factory=list)
    high_weight_artifacts: List[Artifact] = field(default_factory=list)
    unclosed_loops: List[Artifact] = field(default_factory=list)  # Created but not iterated

    # Patterns
    artifact_types: Dict[str, int] = field(default_factory=dict)
    topic_to_artifacts: Dict[str, List[str]] = field(default_factory=dict)
    creation_times: List[Tuple[int, int]] = field(default_factory=list)  # (hour, count)

    # Content analysis
    principles_in_artifacts: List[Dict[str, Any]] = field(default_factory=list)
    decisions_in_artifacts: List[Dict[str, Any]] = field(default_factory=list)

    def to_markdown(self) -> str:
        """Render the report as a human-readable markdown document."""
        lines = [
            "# Artifact Analysis Report",
            f"*{self.artifacts_analyzed} artifacts, {self.time_window_hours:.0f} hours*",
            "",
        ]

        # Crystallization moments
        if self.crystallization_moments:
            lines.append("## Crystallization Moments")
            lines.append("*Times when thinking became concrete*")
            for cm in self.crystallization_moments[:10]:
                lines.append(f"\n### {cm.artifact.conversation_name}")
                lines.append(f"- Created: {cm.timestamp.strftime('%Y-%m-%d %H:%M')}")
                lines.append(f"- Duration: {cm.artifact.duration_hours:.1f} hours")
                lines.append(f"- Weight: {cm.artifact.weight:.2f}")
                if cm.browser_context:
                    lines.append(f"- Researching: {', '.join(cm.browser_context[:3])}")
                if cm.voice_context:
                    lines.append(f"- Voice: {cm.voice_context[0][:100]}...")
            lines.append("")

        # Unclosed loops
        if self.unclosed_loops:
            lines.append("## Unclosed Loops")
            lines.append("*Artifacts created but never revisited*")
            for art in self.unclosed_loops[:10]:
                lines.append(f"- **{art.conversation_name}** ({art.created_at.strftime('%Y-%m-%d')})")
                lines.append(f"  Weight: {art.weight:.2f}, Topics: {', '.join(list(art.topics)[:5])}")
            lines.append("")

        # Principles found
        if self.principles_in_artifacts:
            lines.append("## Principles in Artifacts")
            for p in self.principles_in_artifacts[:10]:
                lines.append(f"- [{p['artifact']}] {p['content'][:150]}...")
            lines.append("")

        # Topic distribution
        if self.topic_to_artifacts:
            lines.append("## Topics → Artifacts")
            for topic, arts in sorted(self.topic_to_artifacts.items(),
                                      key=lambda x: -len(x[1]))[:15]:
                lines.append(f"- **{topic}**: {len(arts)} artifacts")

        return '\n'.join(lines)


class ArtifactAnalyzer:
    """
    Analyzes Claude.ai artifacts as crystallization markers.

    Artifacts are high-signal events - they represent moments where
    thinking became concrete enough to externalize.
    """

    # Default paths
    DEFAULT_ARTIFACTS_JSON = Path.home() / 'Library/Mobile Documents/iCloud~md~obsidian/Documents/Sovereign_Estate/archive/claude-artifacts/artifacts.json'
    DEFAULT_ARTIFACTS_DIR = Path.home() / 'repos/Cerf-Meta/archive/claude-artifacts'

    # Artifact type detection
    TYPE_PATTERNS = {
        'code': [r'\.py$', r'\.js$', r'\.ts$', r'function\s+\w+', r'class\s+\w+', r'def\s+\w+'],
        'document': [r'^#\s+', r'\*\*.*\*\*', r'##\s+', r'- \[', r'\n\n'],
        'diagram': [r'[┌┐└┘│─]', r'─>', r'→', r'\|', r'```mermaid'],
        'data': [r'\{.*\}', r'\[.*\]', r':\s*\d+', r'JSON', r'\.json'],
        'task_list': [r'- \[ \]', r'- \[x\]', r'##.*Tasks', r'Acceptance Criteria'],
    }

    # Precompiled content-mining patterns (hoisted out of the per-block
    # loops in report building so they compile once per process).
    PRINCIPLE_PATTERNS = [
        re.compile(r'(?:principle|rule|invariant|always|never)[:]\s*(.{50,200})', re.IGNORECASE),
        re.compile(r'##\s*(?:Principle|Core|Rule).*?\n(.{50,200})', re.IGNORECASE),
    ]
    DECISION_PATTERNS = [
        re.compile(r'(?:decision|decided|chose|selecting)[:]\s*(.{30,200})', re.IGNORECASE),
        re.compile(r'D\d+[:.-]\s*(.{30,200})', re.IGNORECASE),
    ]

    def __init__(
        self,
        artifacts_json_path: Optional[str] = None,
        artifacts_dir: Optional[str] = None
    ):
        """
        Args:
            artifacts_json_path: Override for the artifacts.json export path.
            artifacts_dir: Override for the raw artifact text directory.
        """
        self.artifacts_json = Path(artifacts_json_path) if artifacts_json_path else self.DEFAULT_ARTIFACTS_JSON
        self.artifacts_dir = Path(artifacts_dir) if artifacts_dir else self.DEFAULT_ARTIFACTS_DIR

        self.signal_detector = SignalWordDetector()

        # State
        self._artifacts: List[Artifact] = []
        self._crystallization_moments: List[CrystallizationMoment] = []

    def analyze(
        self,
        hours_back: float = 168,
        min_weight: float = 0.0
    ) -> ArtifactReport:
        """
        Analyze artifacts from the specified time window.

        Args:
            hours_back: How far back to look
            min_weight: Minimum signal weight to include

        Returns:
            ArtifactReport with findings
        """
        cutoff = datetime.now() - timedelta(hours=hours_back)

        # Reset per-run state so repeated analyze() calls don't accumulate.
        self._crystallization_moments = []

        # Load artifacts
        self._load_artifacts(cutoff)

        # Analyze each artifact
        for artifact in self._artifacts:
            self._analyze_artifact(artifact)

        # Filter by weight
        filtered = [a for a in self._artifacts if a.weight >= min_weight]

        # Build report
        return self._build_report(filtered, hours_back)

    def _load_artifacts(self, cutoff: datetime):
        """Load artifacts newer than `cutoff` from the JSON export file."""
        self._artifacts = []

        if not self.artifacts_json.exists():
            return

        try:
            data = json.loads(self.artifacts_json.read_text())

            for item in data:
                # Parse timestamps (export uses ISO-8601 with a 'Z' suffix,
                # which fromisoformat only accepts as '+00:00' on older Pythons)
                created_str = item.get('created_at', '')
                updated_str = item.get('updated_at', '')

                try:
                    created = datetime.fromisoformat(created_str.replace('Z', '+00:00'))
                    updated = datetime.fromisoformat(updated_str.replace('Z', '+00:00'))

                    # Remove timezone for comparison against the naive cutoff
                    created = created.replace(tzinfo=None)
                    updated = updated.replace(tzinfo=None)
                except (ValueError, TypeError, AttributeError):
                    # Missing or malformed timestamps: skip this record only.
                    continue

                if created < cutoff:
                    continue

                artifact = Artifact(
                    id=item.get('uuid', ''),
                    conversation_name=item.get('name', 'Untitled'),
                    created_at=created,
                    updated_at=updated,
                    message_count=item.get('message_count', 0),
                    code_blocks=item.get('code_blocks', [])
                )

                # Calculate total content length
                for block in artifact.code_blocks:
                    code = block.get('code', '')
                    artifact.total_content_length += len(code)

                    # Track languages
                    lang = block.get('language', 'text')
                    if lang and lang != 'text':
                        artifact.languages.add(lang)

                self._artifacts.append(artifact)

        except Exception as e:
            # Best-effort loader: a corrupt export should not crash analysis.
            print(f"[ArtifactAnalyzer] Error loading artifacts: {e}")

    def _analyze_artifact(self, artifact: Artifact):
        """Analyze a single artifact for signals and content."""
        # Combine all code block content, skipping export placeholder text
        all_content = []
        for block in artifact.code_blocks:
            code = block.get('code', '')
            if code and 'not supported on your current device' not in code:
                all_content.append(code)

        combined = '\n'.join(all_content)

        if not combined:
            artifact.weight = 0.5  # Empty artifact
            return

        # Detect signals
        artifact.signals = detect_signals(combined)
        artifact.weight = artifact.signals.weight_modifier

        # Boost for crystallization markers
        if artifact.total_content_length > 1000:
            artifact.weight *= 1.2  # Substantial artifact
        if artifact.duration_hours > 0.5:
            artifact.weight *= 1.1  # Extended work session
        if len(artifact.code_blocks) > 5:
            artifact.weight *= 1.1  # Multiple iterations

        # Extract topics
        artifact.topics = artifact.signals.suggested_tags.copy()

        # Detect artifact type
        artifact.artifact_type = self._detect_type(combined)

        # Look for principles and decisions
        self._extract_principles(artifact, combined)

    def _detect_type(self, content: str) -> str:
        """Detect the type of artifact based on content patterns.

        Returns the TYPE_PATTERNS key with the most matching patterns,
        or 'unknown' when nothing matches.
        """
        type_scores = defaultdict(int)

        for atype, patterns in self.TYPE_PATTERNS.items():
            for pattern in patterns:
                if re.search(pattern, content, re.MULTILINE):
                    type_scores[atype] += 1

        if type_scores:
            return max(type_scores.items(), key=lambda x: x[1])[0]
        return 'unknown'

    def _extract_principles(self, artifact: Artifact, content: str):
        """Extract principles and decisions from artifact content.

        Intentionally a no-op: extraction currently happens during report
        building (see _mine_content). Kept as a per-artifact hook.
        """
        pass

    def _build_report(
        self,
        artifacts: List[Artifact],
        hours_back: float
    ) -> ArtifactReport:
        """Build the analysis report from the filtered artifact list."""
        report = ArtifactReport(
            artifacts_analyzed=len(artifacts),
            time_window_hours=hours_back
        )

        # Sort by weight, heaviest first
        sorted_artifacts = sorted(artifacts, key=lambda a: -a.weight)

        # High weight artifacts
        report.high_weight_artifacts = sorted_artifacts[:20]

        self._collect_patterns(report, artifacts)
        self._mine_content(report, sorted_artifacts[:30])

        # Create crystallization moments for high-weight artifacts
        for art in sorted_artifacts[:20]:
            moment = CrystallizationMoment(
                artifact=art,
                timestamp=art.created_at,
                topics=art.topics
            )
            # Note: browser/voice context would be added by correlator
            report.crystallization_moments.append(moment)

        # FIX: keep the moments on the analyzer so correlate_with_sources()
        # actually has something to enrich (previously it always saw []).
        self._crystallization_moments = report.crystallization_moments

        return report

    def _collect_patterns(self, report: ArtifactReport, artifacts: List[Artifact]):
        """Populate unclosed loops, topic map, and type counts on `report`."""
        # Unclosed loops: created but not iterated much
        report.unclosed_loops.extend(
            art for art in artifacts
            if art.duration_hours < 0.1 and len(art.code_blocks) <= 2
        )

        # Topic -> conversation names
        for art in artifacts:
            for topic in art.topics:
                report.topic_to_artifacts.setdefault(topic, []).append(art.conversation_name)

        # Count artifact types
        for art in artifacts:
            report.artifact_types[art.artifact_type] = report.artifact_types.get(art.artifact_type, 0) + 1

    def _mine_content(self, report: ArtifactReport, artifacts: List[Artifact]):
        """Extract principle and decision statements from artifact code blocks."""
        for art in artifacts:
            for block in art.code_blocks:
                code = block.get('code', '')
                if not code:
                    continue

                for pattern in self.PRINCIPLE_PATTERNS:
                    for match in pattern.findall(code):
                        report.principles_in_artifacts.append({
                            'artifact': art.conversation_name,
                            'content': match.strip(),
                            'timestamp': art.created_at.isoformat()
                        })

                for pattern in self.DECISION_PATTERNS:
                    for match in pattern.findall(code):
                        report.decisions_in_artifacts.append({
                            'artifact': art.conversation_name,
                            'content': match.strip(),
                            'timestamp': art.created_at.isoformat()
                        })

    def correlate_with_sources(
        self,
        browser_events: List[Any],
        voice_events: List[Any],
        session_events: List[Any],
        window_seconds: float = 300
    ):
        """
        Correlate artifacts with concurrent events from other sources.

        This enriches crystallization moments with context about what
        was happening when the artifact was created. Must be called after
        analyze(), which populates the moments.
        """
        window = timedelta(seconds=window_seconds)

        for moment in self._crystallization_moments:
            start = moment.timestamp - window
            end = moment.timestamp + window

            # Find concurrent browser activity
            for event in browser_events:
                if hasattr(event, 'timestamp'):
                    if start <= event.timestamp <= end:
                        if hasattr(event, 'browser_title') and event.browser_title:
                            moment.browser_context.append(event.browser_title)

            # Find concurrent voice activity
            for event in voice_events:
                if hasattr(event, 'timestamp'):
                    if start <= event.timestamp <= end:
                        if hasattr(event, 'content') and event.content:
                            moment.voice_context.append(event.content)


def analyze_artifacts(hours_back: float = 168) -> ArtifactReport:
    """Quick artifact analysis with default paths."""
    analyzer = ArtifactAnalyzer()
    return analyzer.analyze(hours_back=hours_back)


if __name__ == '__main__':
    print("=== Artifact Analyzer ===\n")

    analyzer = ArtifactAnalyzer()
    report = analyzer.analyze(hours_back=336)  # 2 weeks

    print(report.to_markdown())