# main.py
"""
333 Method Analytics Dashboard — FastAPI Backend

7 endpoints (one per page) + cache refresh.
Cache-first: reads dashboard_cache table, falls back to live PostgreSQL queries.
"""

import asyncio
import functools
import json
import os
import subprocess
from pathlib import Path

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from cache import get_cached, get_many
from db import fetchall, fetchone, fetchval

app = FastAPI(title="333 Method Dashboard API", version="2.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "https://dashboard.molecool.org"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
CACHE_HEADERS = {"Cache-Control": "max-age=300"}

# Upper bound (seconds) for one run of the precompute script.
_REFRESH_TIMEOUT_SECONDS = 120


# ─────────────────────────────────────────────────────────────────────────────
# Helpers: return cached value or run live query
# ─────────────────────────────────────────────────────────────────────────────

async def cached_or(key: str, live_fn):
    """Return the cached value for *key*, or ``await live_fn()`` on a miss.

    Only ``None`` counts as a miss; an empty-but-present cached value is
    returned as-is.
    """
    cached = await get_cached(key)
    if cached is not None:
        return cached
    return await live_fn()


async def _hit_or_live(hits, key: str, live_fn):
    """Resolve *key* from an already-fetched ``get_many`` result.

    Same miss rule as ``cached_or`` (only ``None`` is a miss), so a cached
    empty list is served from cache instead of re-running the live query on
    every request.
    """
    cached = hits.get(key)
    if cached is not None:
        return cached
    return await live_fn()


async def _fetch_failing_sites():
    """Return the 50 most recently updated sites whose status is 'failing'."""
    return await fetchall(
        "SELECT id, domain, error_message, updated_at "
        "FROM sites WHERE status='failing' ORDER BY updated_at DESC LIMIT 50"
    )


def _load_coverage_summary():
    """Read coverage/coverage-summary.json under the project root.

    Returns an empty dict when the file is missing, unreadable, or not valid
    JSON, so an absent or half-written coverage report cannot fail the whole
    quality page.
    """
    coverage_path = _PROJECT_ROOT / "coverage" / "coverage-summary.json"
    try:
        with open(coverage_path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}


# ─────────────────────────────────────────────────────────────────────────────
# GET /api/v1/overview (merged Overview + Pipeline)
# ─────────────────────────────────────────────────────────────────────────────

@app.get("/api/v1/overview")
async def overview():
    """Serve the Overview page payload.

    Funnel, error breakdown, hourly/daily charts and the cost forecast are
    taken from the cache when present; totals, stuck sites, 24h activity,
    failing sites and the key numbers are always queried live.
    """
    cache_hits = await get_many([
        "pipeline_funnel",
        "error_breakdown",
        "chart_hourly_status_breakdown_48h",
        "chart_daily_throughput_30d",
        "cost_forecast",
    ])

    async def live_funnel():
        """Site counts per pipeline status, in pipeline order."""
        return await fetchall(
            "SELECT status, COUNT(*) AS count FROM sites "
            "WHERE status NOT IN ('ignore','failing') "
            "GROUP BY status ORDER BY CASE status "
            "WHEN 'found' THEN 1 WHEN 'assets_captured' THEN 2 "
            "WHEN 'scored' THEN 3 WHEN 'rescored' THEN 4 "
            "WHEN 'enriched' THEN 5 WHEN 'proposals_drafted' THEN 6 "
            "WHEN 'outreach_sent' THEN 7 ELSE 8 END"
        )

    async def live_totals():
        """Total, ignored and failing site counts."""
        return await fetchone(
            "SELECT "
            "COUNT(*) AS total, "
            "SUM(CASE WHEN status='ignore' THEN 1 ELSE 0 END) AS ignored, "
            "SUM(CASE WHEN status='failing' THEN 1 ELSE 0 END) AS failing "
            "FROM sites"
        )

    async def live_errors():
        """Top 20 (error_message, stage) pairs by occurrence."""
        return await fetchall(
            "SELECT error_message, status AS stage, COUNT(*) AS count "
            "FROM sites WHERE error_message IS NOT NULL AND status != 'ignore' "
            "GROUP BY error_message, status ORDER BY count DESC LIMIT 20"
        )

    async def live_stuck():
        """Up to 100 errored sites not updated for more than a day, oldest first."""
        return await fetchall(
            "SELECT id, domain, status AS stage, error_message, "
            "CAST((NOW() - updated_at) AS TEXT) AS age "
            "FROM sites WHERE error_message IS NOT NULL "
            "AND updated_at < NOW() - INTERVAL '1 day' "
            "ORDER BY updated_at ASC LIMIT 100"
        )

    async def live_activity_24h():
        """Sites and outreaches created in the last day, grouped by status/method."""
        return await fetchall(
            "SELECT 'sites' AS entity, status, COUNT(*) AS count "
            "FROM sites WHERE created_at > NOW() - INTERVAL '1 day' "
            "GROUP BY status "
            "UNION ALL "
            "SELECT 'outreaches', contact_method, COUNT(*) "
            "FROM outreaches WHERE created_at > NOW() - INTERVAL '1 day' "
            "GROUP BY contact_method"
        )

    async def live_hourly():
        """Per-hour, per-status site counts for the last 48 hours."""
        return await fetchall(
            "SELECT date_trunc('hour', created_at) AS hour, status, COUNT(*) AS count "
            "FROM sites WHERE created_at > NOW() - INTERVAL '48 hours' "
            "AND status NOT IN ('failing','ignore') "
            "GROUP BY 1, 2 ORDER BY 1, 2"
        )

    async def live_throughput():
        """Sites added per day for the last 30 days."""
        return await fetchall(
            "SELECT DATE(created_at) AS date, COUNT(*) AS sites_added "
            "FROM sites WHERE created_at > NOW() - INTERVAL '30 days' "
            "GROUP BY DATE(created_at) ORDER BY date"
        )

    async def live_cost_forecast():
        """
        Profitability forecast: API costs + pipeline cost to process queue.
        Refreshed every 4 days via dashboard_cache (key: cost_forecast, ttl: 345600s).
        Falls back to live query on cache miss.
        """
        # Average daily API cost (last 30 days rolling)
        daily_avg = await fetchval(
            "SELECT ROUND(AVG(daily_cost), 4) FROM ("
            " SELECT DATE(created_at) AS d, SUM(estimated_cost) AS daily_cost"
            " FROM llm_usage WHERE created_at > NOW() - INTERVAL '30 days'"
            " GROUP BY DATE(created_at)"
            ") sub"
        )

        # Average cost per stage (to forecast pipeline cost for pending sites)
        stage_costs = await fetchall(
            "SELECT stage, AVG(estimated_cost) AS avg_cost, COUNT(*) AS sample_size "
            "FROM llm_usage GROUP BY stage ORDER BY avg_cost DESC"
        )

        # Sites currently in pipeline (pending processing)
        pipeline_counts = await fetchall(
            "SELECT status, COUNT(*) AS count FROM sites "
            "WHERE status IN ('found','assets_captured','scored','rescored','enriched','proposals_drafted') "
            "GROUP BY status"
        )

        # Actuals: monthly sales + revenue (297 is the fallback deal value
        # used when no sale has been recorded yet)
        sales_data = await fetchone(
            "SELECT "
            "COUNT(CASE WHEN resulted_in_sale=1 AND created_at > NOW() - INTERVAL '30 days' THEN 1 END) AS monthly_sales,"
            "COALESCE(SUM(CASE WHEN resulted_in_sale=1 AND created_at > NOW() - INTERVAL '30 days' THEN sale_amount END), 0) AS monthly_revenue,"
            "CASE WHEN COUNT(CASE WHEN resulted_in_sale=1 THEN 1 END) > 0 "
            " THEN SUM(sale_amount) / COUNT(CASE WHEN resulted_in_sale=1 THEN 1 END) "
            " ELSE 297 END AS avg_deal_value "
            "FROM outreaches WHERE status IN ('sent','delivered')"
        )

        # Build pipeline cost forecast: {stage: {count, avg_cost}}
        # NOTE(review): the lookup assumes llm_usage.stage values match
        # sites.status values; unmatched stages get avg_cost 0 — confirm.
        stage_cost_map = {r["stage"]: r["avg_cost"] for r in (stage_costs or [])}
        pipeline_cost_forecast = {
            row["status"]: {
                "count": row["count"],
                "avg_cost": stage_cost_map.get(row["status"], 0),
            }
            for row in (pipeline_counts or [])
        }

        sales_row = sales_data or {}
        return {
            "daily_api_cost_avg": daily_avg or 0,
            "monthly_sales": sales_row.get("monthly_sales") or 0,
            "monthly_revenue": sales_row.get("monthly_revenue") or 0,
            "avg_deal_value": sales_row.get("avg_deal_value") or 297,
            "pipeline_cost_forecast": pipeline_cost_forecast,
        }

    funnel = await _hit_or_live(cache_hits, "pipeline_funnel", live_funnel)
    errors = await _hit_or_live(cache_hits, "error_breakdown", live_errors)
    hourly = await _hit_or_live(
        cache_hits, "chart_hourly_status_breakdown_48h", live_hourly
    )
    throughput = await _hit_or_live(
        cache_hits, "chart_daily_throughput_30d", live_throughput
    )
    cost_forecast = await _hit_or_live(
        cache_hits, "cost_forecast", live_cost_forecast
    )
    totals = await live_totals()
    stuck = await live_stuck()
    activity = await live_activity_24h()
    failing = await _fetch_failing_sites()

    # Key numbers summary for top metric cards
    totals_row = totals or {}
    response_rate = await fetchval(
        "SELECT ROUND(100.0 * COUNT(DISTINCT CASE WHEN c.direction='inbound' THEN c.outreach_id END) / NULLIF(COUNT(*),0), 2) "
        "FROM outreaches o LEFT JOIN conversations c ON c.outreach_id=o.id "
        "WHERE o.status IN ('sent','delivered')"
    )
    outreach_sent = await fetchval(
        "SELECT COUNT(*) FROM outreaches WHERE status IN ('sent','delivered')"
    )
    forecast_row = cost_forecast or {}
    key_numbers = {
        "total_sites": totals_row.get("total"),
        "active_pipeline": sum(r["count"] for r in (funnel or [])),
        "outreach_sent": outreach_sent,
        "response_rate": response_rate or 0,
        "sales": forecast_row.get("monthly_sales", 0),
        "revenue": forecast_row.get("monthly_revenue", 0),
    }

    return JSONResponse({
        "key_numbers": key_numbers,
        "pipeline_funnel": funnel,
        "error_breakdown": errors,
        "stuck_sites": stuck,
        "failing_sites": failing,
        "hourly_status_48h": hourly,
        "daily_throughput_30d": throughput,
        "activity_24h": activity,
        "cost_forecast": cost_forecast,
    }, headers=CACHE_HEADERS)


# ─────────────────────────────────────────────────────────────────────────────
# GET /api/v1/outreach
# ─────────────────────────────────────────────────────────────────────────────

@app.get("/api/v1/outreach")
async def outreach():
    """Serve the Outreach page payload.

    Response rates, the outreach funnel and daily LLM costs are cache-first;
    sales, per-stage and per-provider LLM usage are always queried live.
    """
    # NOTE(review): "chart_llm_cost_by_stage_30d" is fetched but never read
    # below (llm_by_stage is always live) — confirm whether that is intended.
    cache_hits = await get_many([
        "response_rates",
        "outreach_funnel",
        "chart_llm_daily_costs_30d",
        "chart_llm_cost_by_stage_30d",
    ])

    async def live_response_rates():
        """Sent count, responses and response rate per contact method."""
        return await fetchall(
            "SELECT o.contact_method AS channel, COUNT(*) AS total_sent, "
            "COUNT(DISTINCT CASE WHEN c.direction='inbound' THEN c.outreach_id END) AS responses, "
            "ROUND(100.0 * COUNT(DISTINCT CASE WHEN c.direction='inbound' THEN c.outreach_id END) / COUNT(*), 2) AS response_rate "
            "FROM outreaches o LEFT JOIN conversations c ON c.outreach_id=o.id "
            "WHERE o.status IN ('sent','delivered') GROUP BY o.contact_method ORDER BY response_rate DESC"
        )

    async def live_outreach_funnel():
        """Outreach counts per status and delivery rate, per contact method."""
        return await fetchall(
            "SELECT contact_method AS channel, COUNT(*) AS total, "
            "SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END) AS pending, "
            "SUM(CASE WHEN status='sent' THEN 1 ELSE 0 END) AS sent, "
            "SUM(CASE WHEN status='delivered' THEN 1 ELSE 0 END) AS delivered, "
            "SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) AS failed, "
            "SUM(CASE WHEN status='bounced' THEN 1 ELSE 0 END) AS bounced, "
            "ROUND(100.0 * SUM(CASE WHEN status='delivered' THEN 1 ELSE 0 END) / COUNT(*), 2) AS delivery_rate "
            "FROM outreaches GROUP BY contact_method ORDER BY total DESC"
        )

    async def live_sales():
        """Sales count, revenue and conversion rate per contact method."""
        return await fetchall(
            "SELECT contact_method AS channel, "
            "COUNT(CASE WHEN resulted_in_sale=1 THEN 1 END) AS sales_count, "
            "COALESCE(SUM(sale_amount), 0) AS total_revenue, "
            "ROUND(100.0 * COUNT(CASE WHEN resulted_in_sale=1 THEN 1 END) / COUNT(*), 2) AS conversion_rate "
            "FROM outreaches WHERE status IN ('sent','delivered') GROUP BY contact_method"
        )

    async def live_llm_by_stage():
        """Token and cost totals per pipeline stage (no time filter)."""
        return await fetchall(
            "SELECT stage, SUM(prompt_tokens) AS prompt_tokens, "
            "SUM(completion_tokens) AS completion_tokens, SUM(total_tokens) AS total_tokens, "
            "SUM(estimated_cost) AS total_cost, COUNT(*) AS request_count, "
            "AVG(estimated_cost) AS avg_cost_per_request "
            "FROM llm_usage GROUP BY stage ORDER BY total_cost DESC"
        )

    async def live_llm_daily():
        """Daily LLM cost, tokens and request count for the last 30 days."""
        return await fetchall(
            "SELECT DATE(created_at) AS date, SUM(estimated_cost) AS daily_cost, "
            "SUM(total_tokens) AS daily_tokens, COUNT(*) AS request_count "
            "FROM llm_usage WHERE created_at > NOW() - INTERVAL '30 days' "
            "GROUP BY DATE(created_at) ORDER BY date"
        )

    async def live_llm_by_provider():
        """Token and cost totals per (provider, model)."""
        return await fetchall(
            "SELECT provider, model, SUM(total_tokens) AS total_tokens, "
            "SUM(estimated_cost) AS total_cost, COUNT(*) AS request_count "
            "FROM llm_usage GROUP BY provider, model ORDER BY total_cost DESC"
        )

    return JSONResponse({
        "response_rates": await _hit_or_live(
            cache_hits, "response_rates", live_response_rates
        ),
        "outreach_funnel": await _hit_or_live(
            cache_hits, "outreach_funnel", live_outreach_funnel
        ),
        "sales": await live_sales(),
        "llm_by_stage": await live_llm_by_stage(),
        "llm_daily_costs": await _hit_or_live(
            cache_hits, "chart_llm_daily_costs_30d", live_llm_daily
        ),
        "llm_by_provider": await live_llm_by_provider(),
    }, headers=CACHE_HEADERS)


# ─────────────────────────────────────────────────────────────────────────────
# GET /api/v1/conversations (always live — changes too fast)
# ─────────────────────────────────────────────────────────────────────────────

@app.get("/api/v1/conversations")
async def conversations():
    """Serve conversation stats, sentiment counts and the 50 latest threads.

    Always queried live and returned without the Cache-Control header.
    """
    stats = await fetchone(
        "SELECT COUNT(*) AS total_conversations, "
        "SUM(CASE WHEN direction='inbound' THEN 1 ELSE 0 END) AS inbound_count, "
        "SUM(CASE WHEN read_at IS NULL AND direction='inbound' THEN 1 ELSE 0 END) AS unread_count "
        "FROM conversations"
    )
    sentiment = await fetchall(
        "SELECT sentiment, COUNT(*) AS count FROM conversations "
        "WHERE direction='inbound' AND sentiment IS NOT NULL "
        "GROUP BY sentiment ORDER BY count DESC"
    )
    # The thread picker groups by outreach_id and orders by the latest inbound
    # message. PostgreSQL rejects SELECT DISTINCT combined with an ORDER BY
    # column that is not in the select list, so DISTINCT cannot be used here.
    threads = await fetchall(
        """
        WITH recent AS (
            SELECT outreach_id FROM conversations
            WHERE direction='inbound'
            GROUP BY outreach_id
            ORDER BY MAX(received_at) DESC NULLS LAST LIMIT 50
        )
        SELECT c.id, c.outreach_id, c.direction, c.channel, c.sender_identifier,
               c.message_body, c.subject_line, c.sentiment, c.received_at, c.read_at,
               c.replied_at, COALESCE(c.autoresponder_enabled, 1) AS autoresponder_enabled,
               o.contact_uri, o.proposal_text, s.domain, s.id AS site_id
        FROM conversations c
        LEFT JOIN outreaches o ON o.id = c.outreach_id
        LEFT JOIN sites s ON s.id = o.site_id
        WHERE c.outreach_id IN (SELECT outreach_id FROM recent)
        ORDER BY c.outreach_id DESC, c.received_at ASC
        """
    )
    return JSONResponse({"stats": stats, "sentiment": sentiment, "threads": threads})


# ─────────────────────────────────────────────────────────────────────────────
# GET /api/v1/operations (Cron Jobs + System Health tabs)
# ─────────────────────────────────────────────────────────────────────────────

@app.get("/api/v1/operations")
async def operations():
    """Serve the Operations page payload (cron jobs and system health).

    The two cron chart series are cache-only (empty list when absent); cron
    summary, HTTP errors and DB health are cache-first; the rest is live.
    """
    cache_hits = await get_many([
        "cron_summary",
        "chart_cron_timeline_24h",
        "chart_cron_daily_history_7d",
        "chart_http_errors_30d",
        "database_health",
    ])

    async def live_cron_jobs():
        """All configured cron jobs, ordered by name."""
        return await fetchall(
            "SELECT name, description, interval_value, interval_unit, enabled, "
            "last_run_at, next_run_at FROM cron_jobs ORDER BY name"
        )

    async def live_cron_summary():
        """Per-job run counts, success rate and average duration over 7 days."""
        return await fetchall(
            "SELECT l.job_name, COUNT(*) AS total_runs, "
            "SUM(CASE WHEN l.status='success' THEN 1 ELSE 0 END) AS successful_runs, "
            "SUM(CASE WHEN l.status='failed' THEN 1 ELSE 0 END) AS failed_runs, "
            "ROUND(100.0 * SUM(CASE WHEN l.status='success' THEN 1 ELSE 0 END) / COUNT(*), 1) AS success_rate, "
            "MAX(l.started_at) AS last_run, "
            "AVG(EXTRACT(EPOCH FROM (l.finished_at - l.started_at)) / 60) AS avg_duration_minutes "
            "FROM cron_job_logs l WHERE l.started_at > NOW() - INTERVAL '7 days' "
            "GROUP BY l.job_name ORDER BY last_run DESC"
        )

    async def live_cron_recent_logs():
        """The 200 most recent cron log rows."""
        return await fetchall(
            "SELECT id, job_name, started_at, finished_at, status, summary, "
            "items_processed, items_failed, error_message "
            "FROM cron_job_logs ORDER BY started_at DESC LIMIT 200"
        )

    async def live_http_errors():
        """Daily non-200 HTTP status counts (last 30 days) for two stages."""
        return await fetchall(
            "SELECT DATE(updated_at) AS date, http_status_code, status AS stage, COUNT(*) AS count "
            "FROM sites WHERE updated_at > NOW() - INTERVAL '30 days' "
            "AND http_status_code IS NOT NULL AND http_status_code != 200 "
            "AND status IN ('assets_captured','enriched') "
            "GROUP BY DATE(updated_at), http_status_code, status ORDER BY date, http_status_code"
        )

    async def live_rate_limits():
        """Top 20 rate-limit / 429 / quota error messages by recency."""
        return await fetchall(
            "SELECT status AS stage, error_message, COUNT(*) AS count, MAX(updated_at) AS last_occurrence "
            "FROM sites WHERE error_message LIKE '%rate%limit%' OR error_message LIKE '%429%' "
            "OR error_message LIKE '%quota%' GROUP BY status, error_message "
            "ORDER BY last_occurrence DESC LIMIT 20"
        )

    async def live_db_health():
        """Row counts of the four main tables."""
        counts = await fetchall(
            "SELECT 'sites' AS tbl, COUNT(*) AS cnt FROM sites "
            "UNION ALL SELECT 'outreaches', COUNT(*) FROM outreaches "
            "UNION ALL SELECT 'conversations', COUNT(*) FROM conversations "
            "UNION ALL SELECT 'keywords', COUNT(*) FROM keywords"
        )
        return {"table_counts": counts}

    return JSONResponse({
        "cron_jobs": await live_cron_jobs(),
        "cron_summary": await _hit_or_live(
            cache_hits, "cron_summary", live_cron_summary
        ),
        "cron_recent_logs": await live_cron_recent_logs(),
        # No live fallback for the two chart series: empty when not cached.
        "cron_timeline": cache_hits.get("chart_cron_timeline_24h") or [],
        "cron_daily_history": cache_hits.get("chart_cron_daily_history_7d") or [],
        "http_errors": await _hit_or_live(
            cache_hits, "chart_http_errors_30d", live_http_errors
        ),
        "rate_limits": await live_rate_limits(),
        "db_health": await _hit_or_live(
            cache_hits, "database_health", live_db_health
        ),
    }, headers=CACHE_HEADERS)


# ─────────────────────────────────────────────────────────────────────────────
# GET /api/v1/quality (Agent System + Code Coverage tabs)
# ─────────────────────────────────────────────────────────────────────────────

@app.get("/api/v1/quality")
async def quality():
    """Serve agent state/tasks/logs/performance/cost plus the coverage summary."""

    async def live_agent_state():
        """Current state row of every agent, ordered by name."""
        return await fetchall(
            "SELECT agent_name, status, current_task_id, last_active, metrics_json "
            "FROM agent_state ORDER BY agent_name"
        )

    async def live_agent_tasks():
        """Task counts per (assignee, status) over the last 7 days."""
        return await fetchall(
            "SELECT assigned_to, status, COUNT(*) AS count "
            "FROM agent_tasks WHERE created_at > NOW() - INTERVAL '7 days' "
            "GROUP BY assigned_to, status ORDER BY assigned_to, status"
        )

    async def live_agent_logs():
        """The 100 most recent agent log rows."""
        return await fetchall(
            "SELECT created_at, agent_name, log_level, message, task_id "
            "FROM agent_logs ORDER BY created_at DESC LIMIT 100"
        )

    async def live_agent_performance():
        """Per-agent completed/failed/blocked counts and success rate (7 days)."""
        return await fetchall(
            "SELECT assigned_to AS agent, COUNT(*) AS total, "
            "SUM(CASE WHEN status='completed' THEN 1 ELSE 0 END) AS completed, "
            "SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) AS failed, "
            "SUM(CASE WHEN status='blocked' THEN 1 ELSE 0 END) AS blocked, "
            "ROUND(100.0 * SUM(CASE WHEN status='completed' THEN 1 ELSE 0 END) / COUNT(*), 1) AS success_rate "
            "FROM agent_tasks WHERE created_at > NOW() - INTERVAL '7 days' "
            "GROUP BY assigned_to ORDER BY total DESC"
        )

    async def live_agent_cost():
        """Hourly agent LLM invocations and cost for the last day."""
        return await fetchall(
            "SELECT DATE(created_at) AS date, "
            "EXTRACT(HOUR FROM created_at) AS hour, "
            "COUNT(*) AS invocations, SUM(estimated_cost) AS cost "
            "FROM agent_llm_usage WHERE created_at > NOW() - INTERVAL '1 day' "
            "GROUP BY 1, 2 ORDER BY 1, 2"
        )

    # Coverage data comes from the coverage JSON file on disk
    coverage_data = _load_coverage_summary()

    return JSONResponse({
        "agent_state": await live_agent_state(),
        "agent_tasks": await live_agent_tasks(),
        "agent_logs": await live_agent_logs(),
        "agent_performance": await live_agent_performance(),
        "agent_cost_24h": await live_agent_cost(),
        "coverage": coverage_data,
    }, headers=CACHE_HEADERS)


# ─────────────────────────────────────────────────────────────────────────────
# GET /api/v1/compliance (Legal tab + Prompt Learning tab)
# ─────────────────────────────────────────────────────────────────────────────

@app.get("/api/v1/compliance")
async def compliance():
    """Serve opt-out totals, per-channel delivery health and feedback stats.

    All feedback queries use a fixed 30-day window.
    """

    async def live_optouts():
        """Distinct e-mail and phone opt-out counts across both opt-out tables."""
        return await fetchone(
            "SELECT "
            "COUNT(DISTINCT email) AS total_email_optouts, "
            "COUNT(DISTINCT phone) AS total_sms_optouts "
            "FROM ("
            "SELECT email, NULL AS phone FROM unsubscribed_emails "
            "UNION ALL SELECT email, phone FROM opt_outs"
            ") AS combined"
        )

    async def live_platform_health():
        """Bounce and delivery rates per contact method."""
        return await fetchall(
            "SELECT contact_method, COUNT(*) AS total_sent, "
            "SUM(CASE WHEN status='bounced' THEN 1 ELSE 0 END) AS bounced, "
            "SUM(CASE WHEN status='delivered' THEN 1 ELSE 0 END) AS delivered, "
            "ROUND(100.0 * SUM(CASE WHEN status='bounced' THEN 1 ELSE 0 END) / COUNT(*), 2) AS bounce_rate, "
            "ROUND(100.0 * SUM(CASE WHEN status='delivered' THEN 1 ELSE 0 END) / COUNT(*), 2) AS delivery_rate "
            "FROM outreaches WHERE status IN ('sent','delivered','bounced','failed') "
            "GROUP BY contact_method"
        )

    async def live_approval_stats():
        """Approved / rework / rejected counts and approval rate (30 days)."""
        return await fetchone(
            "SELECT COUNT(*) AS total, "
            "SUM(CASE WHEN feedback_type='approved' THEN 1 ELSE 0 END) AS approved, "
            "SUM(CASE WHEN feedback_type='rework' THEN 1 ELSE 0 END) AS rework, "
            "SUM(CASE WHEN feedback_type='rejected' THEN 1 ELSE 0 END) AS rejected, "
            "ROUND(100.0 * SUM(CASE WHEN feedback_type='approved' THEN 1 ELSE 0 END) / NULLIF(COUNT(*),0), 1) AS approval_rate "
            "FROM outreach_feedback WHERE created_at > NOW() - INTERVAL '30 days'"
        )

    async def live_feedback_categories():
        """Top 15 feedback categories by count (30 days)."""
        return await fetchall(
            "SELECT feedback_category, COUNT(*) AS count "
            "FROM outreach_feedback WHERE created_at > NOW() - INTERVAL '30 days' "
            "GROUP BY feedback_category ORDER BY count DESC LIMIT 15"
        )

    async def live_approval_trend():
        """Daily approval rate (30 days)."""
        return await fetchall(
            "SELECT DATE(created_at) AS date, "
            "ROUND(100.0 * SUM(CASE WHEN feedback_type='approved' THEN 1 ELSE 0 END) / COUNT(*), 1) AS approval_rate "
            "FROM outreach_feedback WHERE created_at > NOW() - INTERVAL '30 days' "
            "GROUP BY DATE(created_at) ORDER BY date"
        )

    async def live_recent_feedback():
        """The 50 most recent feedback rows with the related site domain."""
        return await fetchall(
            "SELECT f.id, f.feedback_type, f.feedback_category, f.feedback_text, "
            "f.prompt_file, f.created_at, s.domain "
            "FROM outreach_feedback f LEFT JOIN outreaches o ON o.id=f.outreach_id "
            "LEFT JOIN sites s ON s.id=o.site_id "
            "ORDER BY f.created_at DESC LIMIT 50"
        )

    return JSONResponse({
        "optouts": await live_optouts(),
        "platform_health": await live_platform_health(),
        "approval_stats": await live_approval_stats(),
        "feedback_categories": await live_feedback_categories(),
        "approval_trend": await live_approval_trend(),
        "recent_feedback": await live_recent_feedback(),
    }, headers=CACHE_HEADERS)


# ─────────────────────────────────────────────────────────────────────────────
# GET /api/v1/review (Human Review — always live)
# ─────────────────────────────────────────────────────────────────────────────

@app.get("/api/v1/review")
async def review():
    """Serve the human-review queue, approved outreaches, failing sites and unread count.

    Always queried live and returned without the Cache-Control header.
    """
    queue_stats = await fetchone(
        "SELECT COUNT(*) AS total, "
        "SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END) AS pending, "
        "SUM(CASE WHEN priority='critical' THEN 1 ELSE 0 END) AS critical, "
        "SUM(CASE WHEN priority='high' THEN 1 ELSE 0 END) AS high, "
        "SUM(CASE WHEN status='reviewed' THEN 1 ELSE 0 END) AS reviewed "
        "FROM human_review_queue"
    )
    pending_reviews = await fetchall(
        "SELECT id, review_type, title, priority, status, context_json, notes, "
        "created_at, reviewed_at, reviewed_by "
        "FROM human_review_queue WHERE status='pending' "
        "ORDER BY CASE priority WHEN 'critical' THEN 1 WHEN 'high' THEN 2 ELSE 3 END, created_at ASC "
        "LIMIT 100"
    )
    # Keyed "pending_outreaches" in the response, but the filter is
    # status='approved' (per-channel counts of approved outreaches).
    pending_outreaches = await fetchone(
        "SELECT "
        "SUM(CASE WHEN contact_method='form' THEN 1 ELSE 0 END) AS form, "
        "SUM(CASE WHEN contact_method='x' THEN 1 ELSE 0 END) AS x, "
        "SUM(CASE WHEN contact_method='linkedin' THEN 1 ELSE 0 END) AS linkedin, "
        "COUNT(*) AS total "
        "FROM outreaches WHERE status='approved'"
    )
    failing_sites = await _fetch_failing_sites()
    unread_count = await fetchval(
        "SELECT COUNT(*) FROM conversations WHERE read_at IS NULL AND direction='inbound'"
    )
    return JSONResponse({
        "queue_stats": queue_stats,
        "pending_reviews": pending_reviews,
        "pending_outreaches": pending_outreaches,
        "failing_sites": failing_sites,
        "unread_conversations": unread_count,
    })


# ─────────────────────────────────────────────────────────────────────────────
# POST /api/v1/cache/refresh — trigger precompute-dashboard.js
# ─────────────────────────────────────────────────────────────────────────────

@app.post("/api/v1/cache/refresh")
async def cache_refresh():
    """Run src/cron/precompute-dashboard.js with node and report the outcome.

    Raises:
        HTTPException: 504 when the script exceeds the timeout; 500 with the
            first 500 characters of stderr when it exits non-zero.
    """
    script = str(_PROJECT_ROOT / "src" / "cron" / "precompute-dashboard.js")
    run_script = functools.partial(
        subprocess.run,
        ["node", script],
        capture_output=True,
        text=True,
        cwd=str(_PROJECT_ROOT),
        timeout=_REFRESH_TIMEOUT_SECONDS,
    )
    # subprocess.run blocks until the child exits; run it in the default
    # executor so other requests keep being served in the meantime.
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, run_script)
    except subprocess.TimeoutExpired as exc:
        raise HTTPException(
            status_code=504,
            detail=f"Cache refresh timed out after {_REFRESH_TIMEOUT_SECONDS}s",
        ) from exc
    if result.returncode != 0:
        raise HTTPException(status_code=500, detail=result.stderr[:500])
    return {"ok": True, "message": "Cache refreshed"}


# ─────────────────────────────────────────────────────────────────────────────
# Health check
# ─────────────────────────────────────────────────────────────────────────────

@app.get("/health")
async def health():
    """Liveness probe: always returns a static OK payload."""
    return {"status": "ok"}