# 10_Agent_System.py
"""
Agent System Monitoring Dashboard

Real-time monitoring of the multi-agent system:
- Task queue depths per agent
- Agent status and health
- Workflow execution history
- Cost tracking and nightmare scenario detection
- Human review queue
"""

import sqlite3
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st

# Repository root (parent of the directory holding this page). It is used both
# to make the project-level ``config`` module importable and as the working
# directory for every git command issued further down.
REPO_ROOT = Path(__file__).parent.parent

# Add parent directory to path
sys.path.insert(0, str(REPO_ROOT))
from config import get_db_connection

# NOTE(review): the emoji inside the UI strings below appear mis-encoded
# (mojibake) in this copy of the file. They are reproduced unchanged; restore
# them from the original UTF-8 source if it is available.


def _run_git(*args, check=False):
    """Run ``git <args>`` in the repository root and return the CompletedProcess.

    Output is captured as text. With ``check=True`` a non-zero exit status
    raises ``subprocess.CalledProcessError`` (used for the mutating commands).
    """
    return subprocess.run(
        ["git", *args],
        capture_output=True,
        text=True,
        cwd=str(REPO_ROOT),
        check=check,
    )


def _workflow_status(row):
    """Return the display label for one workflow row from its step counters."""
    if row['completed_steps'] == row['total_steps']:
        return 'â Complete'
    if row['failed_steps'] > 0:
        return 'â Failed'
    if row['blocked_steps'] > 0:
        return 'đ´ Blocked'
    return 'đ In Progress'


st.set_page_config(page_title="Agent System", page_icon="đ¤", layout="wide")

# Get database connection
conn = get_db_connection()

st.title("đ¤ Agent System Monitoring")

# Check if agent system is enabled: the agent_state table must exist and
# contain at least one row.
try:
    agent_enabled = conn.execute(
        "SELECT COUNT(*) FROM agent_state"
    ).fetchone()[0] > 0
except sqlite3.Error:
    # Typically "no such table" before the migration has been applied.
    agent_enabled = False

if not agent_enabled:
    st.warning("â ī¸ Agent system not initialized. Run migration 047 to create agent tables.")
    st.stop()

# --- ALERT SECTION: Nightmare Scenario Detection ---
st.header("đ¨ System Health Alerts")

alert_col1, alert_col2, alert_col3 = st.columns(3)

# Check for high cost (nightmare scenario).
# The cost is an estimate: invocations in the last hour times a flat 0.015 USD.
hourly_cost = conn.execute("""
    SELECT COUNT(*) as invocations,
           COUNT(*) * 0.015 as estimated_cost_usd
    FROM agent_logs
    WHERE created_at > datetime('now', '-1 hour')
      AND log_level = 'INFO'
      AND message LIKE '%agent initialized%'
""").fetchone()

with alert_col1:
    if hourly_cost[1] > 1.0:  # More than $1/hour
        st.error(f"đ¸ High Cost Alert: ${hourly_cost[1]:.2f}/hour ({hourly_cost[0]} invocations)")
    else:
        st.success(f"đ° Cost OK: ${hourly_cost[1]:.2f}/hour")

# Check for circuit breakers
circuit_breakers = conn.execute("""
    SELECT agent_name, status, metrics_json
    FROM agent_state
    WHERE status = 'blocked'
""").fetchall()

with alert_col2:
    if circuit_breakers:
        st.error(f"đ´ {len(circuit_breakers)} agent(s) blocked by circuit breaker")
    else:
        st.success("đĸ No circuit breakers triggered")

# Check for stale tasks (pending >1 hour)
stale_tasks = conn.execute("""
    SELECT COUNT(*) as count
    FROM agent_tasks
    WHERE status = 'pending'
      AND created_at < datetime('now', '-1 hour')
""").fetchone()[0]

with alert_col3:
    if stale_tasks > 0:
        st.warning(f"â° {stale_tasks} stale task(s) (pending >1 hour)")
    else:
        st.success("â No stale tasks")

# --- AGENT STATUS OVERVIEW ---
st.header("đ Agent Status Overview")

agent_status = pd.read_sql_query("""
    SELECT
        agent_name,
        status,
        current_task_id,
        last_active,
        metrics_json
    FROM agent_state
    ORDER BY agent_name
""", conn)

status_col1, status_col2 = st.columns(2)

with status_col1:
    st.subheader("Agent Status")
    for _, agent in agent_status.iterrows():
        status_icon = "đĸ" if agent['status'] == 'idle' else "đ´" if agent['status'] == 'blocked' else "đĄ"
        task_info = f" (Task #{agent['current_task_id']})" if agent['current_task_id'] else ""
        st.write(f"{status_icon} **{agent['agent_name'].title()}**: {agent['status']}{task_info}")

with status_col2:
    st.subheader("Last Active")
    for _, agent in agent_status.iterrows():
        st.write(f"**{agent['agent_name'].title()}**: {agent['last_active'] or 'Never'}")

# --- TASK QUEUE VISUALIZATION ---
st.header("đ Task Queue Depths")

queue_data = pd.read_sql_query("""
    SELECT
        assigned_to as agent,
        status,
        COUNT(*) as count
    FROM agent_tasks
    GROUP BY assigned_to, status
    ORDER BY assigned_to, status
""", conn)

if not queue_data.empty:
    # Pivot for stacked bar chart
    queue_pivot = queue_data.pivot(index='agent', columns='status', values='count').fillna(0)

    fig = go.Figure()

    colors = {
        'pending': '#FFA500',
        'running': '#4169E1',
        'completed': '#32CD32',
        'failed': '#DC143C',
        'blocked': '#8B0000'
    }

    # Fixed stacking order; statuses with no rows are simply skipped.
    for status in ['pending', 'running', 'blocked', 'failed', 'completed']:
        if status in queue_pivot.columns:
            fig.add_trace(go.Bar(
                name=status.title(),
                x=queue_pivot.index,
                y=queue_pivot[status],
                marker_color=colors.get(status, '#808080')
            ))

    fig.update_layout(
        barmode='stack',
        title='Task Queue by Agent',
        xaxis_title='Agent',
        yaxis_title='Number of Tasks',
        height=400
    )

    st.plotly_chart(fig, use_container_width=True)
else:
    st.info("No tasks in queue")

# Task breakdown table
st.subheader("Detailed Queue Breakdown")
st.dataframe(queue_data, use_container_width=True)

# --- WORKFLOW EXECUTION HISTORY ---
st.header("đ Recent Workflows")

# A workflow is a root task (no parent) plus its children and grandchildren.
# The root itself is counted as a step, so total_steps is always >= 1.
workflows = pd.read_sql_query("""
    WITH workflow_roots AS (
        SELECT id, task_type, assigned_to, status, created_at, completed_at
        FROM agent_tasks
        WHERE parent_task_id IS NULL
        ORDER BY created_at DESC
        LIMIT 20
    ),
    workflow_children AS (
        SELECT
            wr.id as root_id,
            wr.task_type as workflow_type,
            wr.created_at as started_at,
            wr.completed_at as finished_at,
            COUNT(at.id) as total_steps,
            SUM(CASE WHEN at.status = 'completed' THEN 1 ELSE 0 END) as completed_steps,
            SUM(CASE WHEN at.status = 'failed' THEN 1 ELSE 0 END) as failed_steps,
            SUM(CASE WHEN at.status = 'blocked' THEN 1 ELSE 0 END) as blocked_steps
        FROM workflow_roots wr
        LEFT JOIN agent_tasks at ON (
            at.id = wr.id OR
            at.parent_task_id = wr.id OR
            at.parent_task_id IN (
                SELECT id FROM agent_tasks WHERE parent_task_id = wr.id
            )
        )
        GROUP BY wr.id
    )
    SELECT * FROM workflow_children
    ORDER BY started_at DESC
""", conn)

if not workflows.empty:
    workflows['progress'] = (workflows['completed_steps'] / workflows['total_steps'] * 100).round(1)
    workflows['status'] = workflows.apply(_workflow_status, axis=1)

    st.dataframe(
        workflows[['workflow_type', 'started_at', 'status', 'progress', 'total_steps', 'completed_steps', 'failed_steps']],
        use_container_width=True
    )
else:
    st.info("No workflow history yet")

# --- HUMAN REVIEW QUEUE ---
st.header("đ¤ Human Review Queue")

review_queue = pd.read_sql_query("""
    SELECT
        id,
        review_type,
        title,
        priority,
        status,
        created_at,
        reviewed_at
    FROM human_review_queue
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at ASC
    LIMIT 50
""", conn)

if not review_queue.empty:
    st.warning(f"â ī¸ {len(review_queue)} item(s) awaiting your review")

    for _, item in review_queue.iterrows():
        priority_color = "đ´" if item['priority'] >= 8 else "đĄ" if item['priority'] >= 5 else "đĸ"
        with st.expander(f"{priority_color} [{item['review_type']}] {item['title']} (Priority: {item['priority']})"):
            st.write(f"**ID**: {item['id']}")
            st.write(f"**Created**: {item['created_at']}")

            # Fetch full context
            full_item = conn.execute(
                "SELECT context_json, notes FROM human_review_queue WHERE id = ?",
                (item['id'],)
            ).fetchone()

            # The row can vanish between the list query and this lookup.
            if full_item is not None:
                if full_item[0]:
                    st.json(full_item[0])

                if full_item[1]:
                    st.write(f"**Notes**: {full_item[1]}")

            col1, col2, col3 = st.columns(3)
            with col1:
                if st.button(f"Approve #{item['id']}", key=f"approve_{item['id']}"):
                    conn.execute(
                        "UPDATE human_review_queue SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?",
                        (item['id'],)
                    )
                    conn.commit()
                    st.rerun()

            with col2:
                if st.button(f"Reject #{item['id']}", key=f"reject_{item['id']}"):
                    conn.execute(
                        "UPDATE human_review_queue SET status = 'rejected', reviewed_at = datetime('now') WHERE id = ?",
                        (item['id'],)
                    )
                    conn.commit()
                    st.rerun()

            with col3:
                if st.button(f"Defer #{item['id']}", key=f"defer_{item['id']}"):
                    # Lower priority and skip for now
                    conn.execute(
                        "UPDATE human_review_queue SET priority = priority - 1 WHERE id = ?",
                        (item['id'],)
                    )
                    conn.commit()
                    st.rerun()
else:
    st.success("â No pending reviews!")

# --- COST TRACKING ---
st.header("đ° Cost Tracking (Last 24 Hours)")

cost_data = pd.read_sql_query("""
    SELECT
        DATE(created_at) as date,
        strftime('%H', created_at) as hour,
        COUNT(*) as invocations,
        COUNT(*) * 0.015 as estimated_cost_usd
    FROM agent_logs
    WHERE created_at > datetime('now', '-24 hours')
      AND log_level = 'INFO'
      AND message LIKE '%agent initialized%'
    GROUP BY date, hour
    ORDER BY date DESC, hour DESC
""", conn)

if not cost_data.empty:
    # Rebuild one timestamp per (date, hour) bucket for the x axis.
    cost_data['hour'] = cost_data['hour'].astype(int)
    cost_data['datetime'] = pd.to_datetime(cost_data['date'] + ' ' + cost_data['hour'].astype(str) + ':00:00')

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=cost_data['datetime'],
        y=cost_data['estimated_cost_usd'],
        mode='lines+markers',
        name='Cost per Hour',
        line=dict(color='#DC143C', width=2)
    ))

    fig.update_layout(
        title='Agent System Cost Over Time',
        xaxis_title='Time',
        yaxis_title='Cost (USD)',
        height=300
    )

    st.plotly_chart(fig, use_container_width=True)

    total_cost = cost_data['estimated_cost_usd'].sum()
    total_invocations = cost_data['invocations'].sum()
    st.metric("Total Cost (24h)", f"${total_cost:.2f}", f"{total_invocations} invocations")
else:
    st.info("No cost data yet")

# --- AGENT PERFORMANCE METRICS ---
st.header("đ Agent Performance (Last 7 Days)")

performance = pd.read_sql_query("""
    SELECT
        assigned_to as agent,
        COUNT(*) as total_tasks,
        SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
        SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
        SUM(CASE WHEN status = 'blocked' THEN 1 ELSE 0 END) as blocked,
        ROUND(AVG(CASE
            WHEN status = 'completed' AND started_at IS NOT NULL AND completed_at IS NOT NULL
            THEN (julianday(completed_at) - julianday(started_at)) * 24 * 60
            ELSE NULL
        END), 2) as avg_completion_time_minutes
    FROM agent_tasks
    WHERE created_at > datetime('now', '-7 days')
    GROUP BY assigned_to
    ORDER BY total_tasks DESC
""", conn)

if not performance.empty:
    performance['success_rate'] = (performance['completed'] / performance['total_tasks'] * 100).round(1)

    st.dataframe(
        performance[['agent', 'total_tasks', 'completed', 'failed', 'blocked', 'success_rate', 'avg_completion_time_minutes']],
        use_container_width=True
    )

    # Success rate chart
    fig = px.bar(
        performance,
        x='agent',
        y='success_rate',
        title='Success Rate by Agent',
        color='success_rate',
        color_continuous_scale='RdYlGn',
        range_color=[0, 100]
    )
    fig.update_layout(height=300)
    st.plotly_chart(fig, use_container_width=True)
else:
    st.info("No performance data yet")

# --- RECENT LOGS ---
st.header("đ Recent Agent Logs")

log_level = st.selectbox("Filter by log level", ["All", "ERROR", "WARN", "INFO", "DEBUG"])
log_agent = st.selectbox("Filter by agent", ["All"] + sorted(agent_status['agent_name'].unique().tolist()))

# Filters are appended as bound parameters, never interpolated into the SQL.
log_query = """
    SELECT
        created_at,
        agent_name,
        log_level,
        message,
        task_id
    FROM agent_logs
    WHERE 1=1
"""

params = []
if log_level != "All":
    log_query += " AND log_level = ?"
    params.append(log_level)

if log_agent != "All":
    log_query += " AND agent_name = ?"
    params.append(log_agent)

log_query += " ORDER BY created_at DESC LIMIT 100"

logs = pd.read_sql_query(log_query, conn, params=params if params else None)

if not logs.empty:
    st.dataframe(logs, use_container_width=True, height=400)
else:
    st.info("No logs found")

# --- GIT REVIEW SECTION ---
st.header("đ Agent Changes Review")

git_col1, git_col2 = st.columns(2)

# Safe defaults so the "Review Actions" column still renders when the git
# inspection below fails part-way (git missing, no main branch, ...).
has_autofix = False
ahead_count = 0

with git_col1:
    st.subheader("Autofix Branch Status")

    try:
        # Check if autofix branch exists
        branches = _run_git("branch", "-a").stdout

        has_autofix = "autofix" in branches

        if has_autofix:
            # Get commit count ahead of main. If git printed nothing (command
            # failed), int() raises ValueError, which is reported by the
            # handler below and leaves ahead_count at 0.
            ahead_count = int(_run_git("rev-list", "--count", "main..autofix").stdout.strip())

            if ahead_count > 0:
                st.warning(f"â ī¸ Autofix branch is {ahead_count} commit(s) ahead of main")

                # Show recent commits
                log_output = _run_git("log", "main..autofix", "--oneline", "-10").stdout

                st.code(log_output or "No commits", language="bash")

                # Show diff stats
                diff_stats = _run_git("diff", "main...autofix", "--stat").stdout

                if diff_stats:
                    with st.expander("đ View File Changes"):
                        st.code(diff_stats, language="diff")

                # Show full diff
                diff_output = _run_git("diff", "main...autofix").stdout

                if diff_output:
                    with st.expander("đ View Full Diff"):
                        st.code(diff_output[:10000], language="diff")  # Limit to 10KB
                        if len(diff_output) > 10000:
                            st.caption(f"Diff truncated (showing first 10KB of {len(diff_output)} bytes)")
            else:
                st.success("â Autofix branch is up to date with main")
        else:
            st.info("âšī¸ No autofix branch yet - agents will create it on first commit")

    except Exception as e:
        st.error(f"Failed to check git status: {e}")

with git_col2:
    st.subheader("Review Actions")

    if has_autofix and ahead_count > 0:
        st.write("**Merge autofix â main**")

        if st.button("â Approve & Merge", type="primary"):
            try:
                # Merge autofix to main
                _run_git("checkout", "main", check=True)
                result = _run_git(
                    "merge", "autofix", "--no-ff",
                    "-m", "Merge autofix branch - agent changes approved",
                    check=True,
                )
                st.success(f"â Merged successfully! {result.stdout}")
                st.rerun()
            except subprocess.CalledProcessError as e:
                st.error(f"Merge failed: {e.stderr}")

        if st.button("â Reject & Reset"):
            try:
                # Reset autofix branch to main
                _run_git("checkout", "autofix", check=True)
                _run_git("reset", "--hard", "main", check=True)
                st.success("â Autofix branch reset to main")
                st.rerun()
            except subprocess.CalledProcessError as e:
                st.error(f"Reset failed: {e.stderr}")

        st.write("**Or use terminal:**")
        # NOTE(review): snippet lines are written flush-left; the original
        # indentation inside this literal could not be determined - confirm.
        st.code("""
git checkout main
git merge autofix --no-ff
git push origin main
""", language="bash")
    else:
        st.info("No changes to review yet")

# --- CONFIGURATION ---
st.header("âī¸ Configuration")

config_col1, config_col2 = st.columns(2)

with config_col1:
    st.subheader("Environment Settings")
    # NOTE(review): snippet lines are written flush-left; the original
    # indentation inside this literal could not be determined - confirm.
    st.code("""
AGENT_SYSTEM_ENABLED=false (change to true to enable)
AGENT_AUTO_COMMIT=false (enable summary commits)
AGENT_AUTO_COMMIT_BRANCH=autofix
AGENT_AUTO_PUSH=false (push to GitHub)
AGENT_PARALLEL_EXECUTION=false (run agents in parallel)
AGENT_MAX_TASKS_PER_CYCLE=5 (autoscales up to 20)
""")

with config_col2:
    st.subheader("Quick Actions")

    # These buttons only display the command to run; they do not execute it.
    if st.button("đ Run Agent Cycle Now"):
        st.info("Run: `npm run agent:run`")

    if st.button("đ View Agent Stats"):
        st.info("Run: `npm run agent:stats`")

    if st.button("đ View All Tasks"):
        st.info("Run: `npm run agent:tasks`")

# Footer
st.divider()
st.caption(f"Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

conn.close()