/ examples / trace / custom_sink.py
custom_sink.py
  1  """
  2  Custom Trace Sink Example
  3  
  4  Demonstrates how to create custom trace sinks for PraisonAI agents.
  5  The trace system uses a protocol-driven design - implement 3 methods
  6  and your sink works with any agent.
  7  
  8  Protocol: ContextTraceSinkProtocol (AGENTS.md naming: XProtocol for interfaces)
  9  - emit(event) - Receive a trace event
 10  - flush() - Flush any buffered events  
 11  - close() - Release resources
 12  
 13  Examples:
 14  1. HTTP Sink - Send events to a remote server
 15  2. SQLite Sink - Store events in a database
 16  3. Console Sink - Pretty-print events to terminal
 17  """
 18  
 19  from praisonaiagents import (
 20      Agent,
 21      ContextTraceEmitter,
 22      ContextTraceSinkProtocol,  # Protocol for type hints (optional)
 23      trace_context,
 24  )
 25  
 26  
 27  # =============================================================================
 28  # Example 1: HTTP Sink - Send events to a remote server
 29  # =============================================================================
 30  
 31  class HTTPSink:
 32      """Send trace events to a remote HTTP endpoint."""
 33      
 34      def __init__(self, url: str, batch_size: int = 10):
 35          self.url = url
 36          self.batch_size = batch_size
 37          self.buffer = []
 38      
 39      def emit(self, event):
 40          """Buffer events and send in batches."""
 41          self.buffer.append(event.to_dict())
 42          if len(self.buffer) >= self.batch_size:
 43              self.flush()
 44      
 45      def flush(self):
 46          """Send buffered events to server."""
 47          if self.buffer:
 48              # In production, use: requests.post(self.url, json=self.buffer)
 49              print(f"[HTTP] Would send {len(self.buffer)} events to {self.url}")
 50              self.buffer.clear()
 51      
 52      def close(self):
 53          """Flush remaining events."""
 54          self.flush()
 55  
 56  
 57  # =============================================================================
 58  # Example 2: SQLite Sink - Store events in a database
 59  # =============================================================================
 60  
 61  class SQLiteSink:
 62      """Store trace events in SQLite database."""
 63      
 64      def __init__(self, db_path: str = "traces.db"):
 65          import sqlite3
 66          self.conn = sqlite3.connect(db_path)
 67          self.conn.execute("""
 68              CREATE TABLE IF NOT EXISTS events (
 69                  id INTEGER PRIMARY KEY,
 70                  session_id TEXT,
 71                  event_type TEXT,
 72                  agent_name TEXT,
 73                  timestamp REAL,
 74                  data TEXT
 75              )
 76          """)
 77          self.conn.commit()
 78      
 79      def emit(self, event):
 80          """Insert event into database."""
 81          import json
 82          self.conn.execute(
 83              "INSERT INTO events (session_id, event_type, agent_name, timestamp, data) VALUES (?, ?, ?, ?, ?)",
 84              (event.session_id, event.event_type.value, event.agent_name, event.timestamp, json.dumps(event.data))
 85          )
 86      
 87      def flush(self):
 88          """Commit pending transactions."""
 89          self.conn.commit()
 90      
 91      def close(self):
 92          """Commit and close connection."""
 93          self.flush()
 94          self.conn.close()
 95  
 96  
 97  # =============================================================================
 98  # Example 3: Console Sink - Pretty-print events
 99  # =============================================================================
100  
class ConsoleSink:
    """Trace sink that prints one ANSI-colored line per event.

    The color is chosen by the event type's string value; event types not
    listed in ``COLORS`` are printed without a color prefix.
    """

    # ANSI escape sequences keyed by event type value.
    COLORS = {
        # Green: session lifecycle
        "session_start": "\033[92m",
        "session_end": "\033[92m",
        # Blue: agent lifecycle
        "agent_start": "\033[94m",
        "agent_end": "\033[94m",
        # Yellow: tool calls
        "tool_call_start": "\033[93m",
        "tool_call_end": "\033[93m",
        # Magenta: LLM traffic
        "llm_request": "\033[95m",
        "llm_response": "\033[95m",
    }
    # Resets terminal attributes after the colored tag.
    RESET = "\033[0m"

    def emit(self, event):
        """Print ``[event_type]`` in color, followed by the agent name.

        Falls back to the label ``session`` when the event has no agent name.
        """
        kind = event.event_type.value
        label = event.agent_name or 'session'
        prefix = self.COLORS.get(kind, "")
        print(f"{prefix}[{kind}]{self.RESET} {label}")

    def flush(self):
        """Nothing is buffered, so there is nothing to flush."""

    def close(self):
        """No resources are held, so there is nothing to release."""
126  
127  
128  # =============================================================================
129  # Usage Examples
130  # =============================================================================
131  
def example_http_sink():
    """Demo: drive a short, hand-written session through an HTTPSink."""
    print("\n=== HTTP Sink Example ===")

    tracer = ContextTraceEmitter(
        sink=HTTPSink(url="https://my-telemetry.example.com/events"),
        session_id="http-demo",
        enabled=True,
    )

    # The emitter is constructed with the HTTPSink, so the events below are
    # expected to reach it. NOTE(review): only four events are emitted, fewer
    # than the default batch size of 10, so output appears only if the sink is
    # flushed or closed on exit - confirm trace_context does that.
    with trace_context(tracer):
        tracer.session_start({"demo": True})
        tracer.agent_start("demo_agent", {"role": "demo"})
        tracer.agent_end("demo_agent")
        tracer.session_end()

    print("HTTP sink example complete")
147  
148  
def example_console_sink():
    """Demo: emit one event of each colored kind through a ConsoleSink."""
    print("\n=== Console Sink Example ===")

    tracer = ContextTraceEmitter(
        sink=ConsoleSink(),
        session_id="console-demo",
        enabled=True,
    )
    agent_name = "researcher"
    tool_name = "web_search"

    with trace_context(tracer):
        # Session and agent lifecycle wrap a tool call and an LLM round trip.
        tracer.session_start({})
        tracer.agent_start(agent_name, {"role": "Research Assistant"})
        tracer.tool_call_start(agent_name, tool_name, {"query": "AI trends"})
        tracer.tool_call_end(agent_name, tool_name, "Found 10 results", 150.0)
        tracer.llm_request(agent_name, "gpt-4o-mini", [{"role": "user", "content": "test"}])
        tracer.llm_response(agent_name, 100, 500, "stop", "Response text")
        tracer.agent_end(agent_name)
        tracer.session_end()

    print("Console sink example complete")
167  
168  
def example_with_real_agent():
    """Demo: trace a real agent chat with a ConsoleSink.

    Note: Requires the OPENAI_API_KEY environment variable; when it is
    unset or empty the demo prints a notice and returns without running.
    """
    import os

    print("\n=== Real Agent with Custom Sink ===")

    # Bail out early when no API key is configured.
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        print("Skipping real agent test (no OPENAI_API_KEY)")
        return

    tracer = ContextTraceEmitter(
        sink=ConsoleSink(),
        session_id="real-agent-demo",
        enabled=True,
    )

    # The agent is created and used inside the trace context so that its
    # events are captured by the emitter above.
    with trace_context(tracer):
        assistant = Agent(
            name="demo_agent",
            instructions="You are a helpful assistant. Be brief.",
            llm="gpt-4o-mini",
        )

        reply = assistant.chat("Say hello in 5 words or less")
        print(f"\nAgent response: {reply}")

    print("Real agent example complete")
199  
200  
if __name__ == "__main__":
    # Run every demo in order; the real-agent demo skips itself when no
    # API key is configured.
    for run_example in (
        example_http_sink,
        example_console_sink,
        example_with_real_agent,
    ):
        run_example()

    print("\n✅ All examples complete!")
    print("\nKey points:")
    for key_point in (
        "1. Implement emit(), flush(), close() - that's it!",
        "2. Use trace_context() for automatic cleanup",
        "3. Zero overhead when tracing is disabled",
    ):
        print(key_point)