custom_sink.py
1 """ 2 Custom Trace Sink Example 3 4 Demonstrates how to create custom trace sinks for PraisonAI agents. 5 The trace system uses a protocol-driven design - implement 3 methods 6 and your sink works with any agent. 7 8 Protocol: ContextTraceSinkProtocol (AGENTS.md naming: XProtocol for interfaces) 9 - emit(event) - Receive a trace event 10 - flush() - Flush any buffered events 11 - close() - Release resources 12 13 Examples: 14 1. HTTP Sink - Send events to a remote server 15 2. SQLite Sink - Store events in a database 16 3. Console Sink - Pretty-print events to terminal 17 """ 18 19 from praisonaiagents import ( 20 Agent, 21 ContextTraceEmitter, 22 ContextTraceSinkProtocol, # Protocol for type hints (optional) 23 trace_context, 24 ) 25 26 27 # ============================================================================= 28 # Example 1: HTTP Sink - Send events to a remote server 29 # ============================================================================= 30 31 class HTTPSink: 32 """Send trace events to a remote HTTP endpoint.""" 33 34 def __init__(self, url: str, batch_size: int = 10): 35 self.url = url 36 self.batch_size = batch_size 37 self.buffer = [] 38 39 def emit(self, event): 40 """Buffer events and send in batches.""" 41 self.buffer.append(event.to_dict()) 42 if len(self.buffer) >= self.batch_size: 43 self.flush() 44 45 def flush(self): 46 """Send buffered events to server.""" 47 if self.buffer: 48 # In production, use: requests.post(self.url, json=self.buffer) 49 print(f"[HTTP] Would send {len(self.buffer)} events to {self.url}") 50 self.buffer.clear() 51 52 def close(self): 53 """Flush remaining events.""" 54 self.flush() 55 56 57 # ============================================================================= 58 # Example 2: SQLite Sink - Store events in a database 59 # ============================================================================= 60 61 class SQLiteSink: 62 """Store trace events in SQLite database.""" 63 64 def __init__(self, db_path: str = "traces.db"): 65 import sqlite3 66 self.conn = sqlite3.connect(db_path) 67 self.conn.execute(""" 68 CREATE TABLE IF NOT EXISTS events ( 69 id INTEGER PRIMARY KEY, 70 session_id TEXT, 71 event_type TEXT, 72 agent_name TEXT, 73 timestamp REAL, 74 data TEXT 75 ) 76 """) 77 self.conn.commit() 78 79 def emit(self, event): 80 """Insert event into database.""" 81 import json 82 self.conn.execute( 83 "INSERT INTO events (session_id, event_type, agent_name, timestamp, data) VALUES (?, ?, ?, ?, ?)", 84 (event.session_id, event.event_type.value, event.agent_name, event.timestamp, json.dumps(event.data)) 85 ) 86 87 def flush(self): 88 """Commit pending transactions.""" 89 self.conn.commit() 90 91 def close(self): 92 """Commit and close connection.""" 93 self.flush() 94 self.conn.close() 95 96 97 # ============================================================================= 98 # Example 3: Console Sink - Pretty-print events 99 # ============================================================================= 100 101 class ConsoleSink: 102 """Print trace events to console with colors.""" 103 104 COLORS = { 105 "session_start": "\033[92m", # Green 106 "session_end": "\033[92m", 107 "agent_start": "\033[94m", # Blue 108 "agent_end": "\033[94m", 109 "tool_call_start": "\033[93m", # Yellow 110 "tool_call_end": "\033[93m", 111 "llm_request": "\033[95m", # Magenta 112 "llm_response": "\033[95m", 113 } 114 RESET = "\033[0m" 115 116 def emit(self, event): 117 """Print event with color coding.""" 118 color = self.COLORS.get(event.event_type.value, "") 119 print(f"{color}[{event.event_type.value}]{self.RESET} {event.agent_name or 'session'}") 120 121 def flush(self): 122 pass 123 124 def close(self): 125 pass 126 127 128 # ============================================================================= 129 # Usage Examples 130 # ============================================================================= 131 132 def example_http_sink(): 133 """Example: Send events to HTTP endpoint.""" 134 print("\n=== HTTP Sink Example ===") 135 136 sink = HTTPSink(url="https://my-telemetry.example.com/events") 137 emitter = ContextTraceEmitter(sink=sink, session_id="http-demo", enabled=True) 138 139 with trace_context(emitter): 140 # Events automatically go to HTTPSink 141 emitter.session_start({"demo": True}) 142 emitter.agent_start("demo_agent", {"role": "demo"}) 143 emitter.agent_end("demo_agent") 144 emitter.session_end() 145 146 print("HTTP sink example complete") 147 148 149 def example_console_sink(): 150 """Example: Pretty-print events to console.""" 151 print("\n=== Console Sink Example ===") 152 153 sink = ConsoleSink() 154 emitter = ContextTraceEmitter(sink=sink, session_id="console-demo", enabled=True) 155 156 with trace_context(emitter): 157 emitter.session_start({}) 158 emitter.agent_start("researcher", {"role": "Research Assistant"}) 159 emitter.tool_call_start("researcher", "web_search", {"query": "AI trends"}) 160 emitter.tool_call_end("researcher", "web_search", "Found 10 results", 150.0) 161 emitter.llm_request("researcher", "gpt-4o-mini", [{"role": "user", "content": "test"}]) 162 emitter.llm_response("researcher", 100, 500, "stop", "Response text") 163 emitter.agent_end("researcher") 164 emitter.session_end() 165 166 print("Console sink example complete") 167 168 169 def example_with_real_agent(): 170 """Example: Use custom sink with a real agent. 171 172 Note: Requires OPENAI_API_KEY environment variable. 173 """ 174 import os 175 print("\n=== Real Agent with Custom Sink ===") 176 177 # Check for API key 178 if not os.environ.get("OPENAI_API_KEY"): 179 print("Skipping real agent test (no OPENAI_API_KEY)") 180 return 181 182 # Create custom sink 183 sink = ConsoleSink() 184 emitter = ContextTraceEmitter(sink=sink, session_id="real-agent-demo", enabled=True) 185 186 # Use trace_context to capture all agent events 187 with trace_context(emitter): 188 agent = Agent( 189 name="demo_agent", 190 instructions="You are a helpful assistant. Be brief.", 191 llm="gpt-4o-mini", 192 ) 193 194 # This chat will emit events to ConsoleSink 195 result = agent.chat("Say hello in 5 words or less") 196 print(f"\nAgent response: {result}") 197 198 print("Real agent example complete") 199 200 201 if __name__ == "__main__": 202 # Run examples 203 example_http_sink() 204 example_console_sink() 205 example_with_real_agent() 206 207 print("\n✅ All examples complete!") 208 print("\nKey points:") 209 print("1. Implement emit(), flush(), close() - that's it!") 210 print("2. Use trace_context() for automatic cleanup") 211 print("3. Zero overhead when tracing is disabled")