/ cli / commands / task_cmd / event_recorder.py
event_recorder.py
 1  """
 2  Records SSE events to YAML files for debugging and replay.
 3  """
 4  import yaml
 5  from pathlib import Path
 6  from typing import List, Dict, Any
 7  from dataclasses import dataclass, asdict
 8  from datetime import datetime, timezone
 9  
10  
11  @dataclass
12  class RecordedEvent:
13      """A recorded SSE event with metadata."""
14  
15      sequence: int
16      timestamp: str
17      event_type: str
18      data: Dict[str, Any]
19  
20  
21  class EventRecorder:
22      """
23      Records SSE events to YAML files.
24      """
25  
26      def __init__(self, output_dir: Path):
27          self.output_dir = output_dir
28          self.output_dir.mkdir(parents=True, exist_ok=True)
29          self._events: List[RecordedEvent] = []
30          self._sequence = 0
31  
32      def record_event(self, event_type: str, data: Dict[str, Any]):
33          """Record a single SSE event."""
34          self._sequence += 1
35          event = RecordedEvent(
36              sequence=self._sequence,
37              timestamp=datetime.now(timezone.utc).isoformat(),
38              event_type=event_type,
39              data=data,
40          )
41          self._events.append(event)
42  
43      def save(self, filename: str = "sse_events.yaml") -> Path:
44          """Save all recorded events to a YAML file."""
45          output_path = self.output_dir / filename
46  
47          events_data = {
48              "recorded_at": datetime.now(timezone.utc).isoformat(),
49              "total_events": len(self._events),
50              "events": [asdict(e) for e in self._events],
51          }
52  
53          with open(output_path, "w") as f:
54              yaml.dump(
55                  events_data,
56                  f,
57                  sort_keys=False,
58                  allow_unicode=True,
59                  default_flow_style=False,
60                  width=120,
61              )
62  
63          return output_path
64  
65      def get_events(self) -> List[RecordedEvent]:
66          """Get all recorded events."""
67          return list(self._events)
68  
69      def get_event_count(self) -> int:
70          """Get the count of recorded events."""
71          return len(self._events)