# dashboard-v2/backend/main.py
  1  """
  2  333 Method Analytics Dashboard — FastAPI Backend
  3  
  4  7 endpoints (one per page) + cache refresh.
  5  Cache-first: reads dashboard_cache table, falls back to live PostgreSQL queries.
  6  """
  7  
import asyncio
import json
import os
import subprocess
from pathlib import Path

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from cache import get_cached, get_many
from db import fetchall, fetchone, fetchval
 20  app = FastAPI(title="333 Method Dashboard API", version="2.0.0")
 21  
 22  app.add_middleware(
 23      CORSMiddleware,
 24      allow_origins=["http://localhost:5173", "https://dashboard.molecool.org"],
 25      allow_methods=["GET", "POST"],
 26      allow_headers=["*"],
 27  )
 28  
 29  _PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
 30  CACHE_HEADERS = {"Cache-Control": "max-age=300"}
 31  
 32  
 33  # ─────────────────────────────────────────────────────────────────────────────
 34  # Helper: return cached value or run live query
 35  # ─────────────────────────────────────────────────────────────────────────────
 36  
 37  async def cached_or(key: str, live_fn):
 38      hit = await get_cached(key)
 39      return hit if hit is not None else await live_fn()
 40  
 41  
 42  # ─────────────────────────────────────────────────────────────────────────────
 43  # GET /api/v1/overview  (merged Overview + Pipeline)
 44  # ─────────────────────────────────────────────────────────────────────────────
 45  
 46  @app.get("/api/v1/overview")
 47  async def overview():
 48      cache_hits = await get_many([
 49          "pipeline_funnel",
 50          "error_breakdown",
 51          "chart_hourly_status_breakdown_48h",
 52          "chart_daily_throughput_30d",
 53          "cost_forecast",
 54      ])
 55  
 56      async def live_funnel():
 57          return await fetchall(
 58              "SELECT status, COUNT(*) AS count FROM sites "
 59              "WHERE status NOT IN ('ignore','failing') "
 60              "GROUP BY status ORDER BY CASE status "
 61              "WHEN 'found' THEN 1 WHEN 'assets_captured' THEN 2 "
 62              "WHEN 'scored' THEN 3 WHEN 'rescored' THEN 4 "
 63              "WHEN 'enriched' THEN 5 WHEN 'proposals_drafted' THEN 6 "
 64              "WHEN 'outreach_sent' THEN 7 ELSE 8 END"
 65          )
 66  
 67      async def live_totals():
 68          return await fetchone(
 69              "SELECT "
 70              "COUNT(*) AS total, "
 71              "SUM(CASE WHEN status='ignore' THEN 1 ELSE 0 END) AS ignored, "
 72              "SUM(CASE WHEN status='failing' THEN 1 ELSE 0 END) AS failing "
 73              "FROM sites"
 74          )
 75  
 76      async def live_errors():
 77          return await fetchall(
 78              "SELECT error_message, status AS stage, COUNT(*) AS count "
 79              "FROM sites WHERE error_message IS NOT NULL AND status != 'ignore' "
 80              "GROUP BY error_message, status ORDER BY count DESC LIMIT 20"
 81          )
 82  
 83      async def live_stuck():
 84          return await fetchall(
 85              "SELECT id, domain, status AS stage, error_message, "
 86              "CAST((NOW() - updated_at) AS TEXT) AS age "
 87              "FROM sites WHERE error_message IS NOT NULL "
 88              "AND updated_at < NOW() - INTERVAL '1 day' "
 89              "ORDER BY updated_at ASC LIMIT 100"
 90          )
 91  
 92      async def live_activity_24h():
 93          return await fetchall(
 94              "SELECT 'sites' AS entity, status, COUNT(*) AS count "
 95              "FROM sites WHERE created_at > NOW() - INTERVAL '1 day' "
 96              "GROUP BY status "
 97              "UNION ALL "
 98              "SELECT 'outreaches', contact_method, COUNT(*) "
 99              "FROM outreaches WHERE created_at > NOW() - INTERVAL '1 day' "
100              "GROUP BY contact_method"
101          )
102  
103      async def live_hourly():
104          return await fetchall(
105              "SELECT date_trunc('hour', created_at) AS hour, status, COUNT(*) AS count "
106              "FROM sites WHERE created_at > NOW() - INTERVAL '48 hours' "
107              "AND status NOT IN ('failing','ignore') "
108              "GROUP BY 1, 2 ORDER BY 1, 2"
109          )
110  
111      async def live_throughput():
112          return await fetchall(
113              "SELECT DATE(created_at) AS date, COUNT(*) AS sites_added "
114              "FROM sites WHERE created_at > NOW() - INTERVAL '30 days' "
115              "GROUP BY DATE(created_at) ORDER BY date"
116          )
117  
118      async def live_failing():
119          return await fetchall(
120              "SELECT id, domain, error_message, updated_at "
121              "FROM sites WHERE status='failing' ORDER BY updated_at DESC LIMIT 50"
122          )
123  
124      async def live_cost_forecast():
125          """
126          Profitability forecast: API costs + pipeline cost to process queue.
127          Refreshed every 4 days via dashboard_cache (key: cost_forecast, ttl: 345600s).
128          Falls back to live query on cache miss.
129          """
130          # Average daily API cost (last 30 days rolling)
131          daily_avg = await fetchval(
132              "SELECT ROUND(AVG(daily_cost), 4) FROM ("
133              "  SELECT DATE(created_at) AS d, SUM(estimated_cost) AS daily_cost"
134              "  FROM llm_usage WHERE created_at > NOW() - INTERVAL '30 days'"
135              "  GROUP BY DATE(created_at)"
136              ") sub"
137          )
138  
139          # Average cost per stage (to forecast pipeline cost for pending sites)
140          stage_costs = await fetchall(
141              "SELECT stage, AVG(estimated_cost) AS avg_cost, COUNT(*) AS sample_size "
142              "FROM llm_usage GROUP BY stage ORDER BY avg_cost DESC"
143          )
144  
145          # Sites currently in pipeline (pending processing)
146          pipeline_counts = await fetchall(
147              "SELECT status, COUNT(*) AS count FROM sites "
148              "WHERE status IN ('found','assets_captured','scored','rescored','enriched','proposals_drafted') "
149              "GROUP BY status"
150          )
151  
152          # Actuals: monthly sales + revenue
153          sales_data = await fetchone(
154              "SELECT "
155              "COUNT(CASE WHEN resulted_in_sale=1 AND created_at > NOW() - INTERVAL '30 days' THEN 1 END) AS monthly_sales,"
156              "COALESCE(SUM(CASE WHEN resulted_in_sale=1 AND created_at > NOW() - INTERVAL '30 days' THEN sale_amount END), 0) AS monthly_revenue,"
157              "CASE WHEN COUNT(CASE WHEN resulted_in_sale=1 THEN 1 END) > 0 "
158              "  THEN SUM(sale_amount) / COUNT(CASE WHEN resulted_in_sale=1 THEN 1 END) "
159              "  ELSE 297 END AS avg_deal_value "
160              "FROM outreaches WHERE status IN ('sent','delivered')"
161          )
162  
163          # Build pipeline cost forecast: {stage: {count, avg_cost}}
164          stage_cost_map = {r["stage"]: r["avg_cost"] for r in (stage_costs or [])}
165          pipeline_cost_forecast = {}
166          for row in (pipeline_counts or []):
167              stage = row["status"]
168              pipeline_cost_forecast[stage] = {
169                  "count": row["count"],
170                  "avg_cost": stage_cost_map.get(stage, 0),
171              }
172  
173          return {
174              "daily_api_cost_avg": daily_avg or 0,
175              "monthly_sales": (sales_data or {}).get("monthly_sales") or 0,
176              "monthly_revenue": (sales_data or {}).get("monthly_revenue") or 0,
177              "avg_deal_value": (sales_data or {}).get("avg_deal_value") or 297,
178              "pipeline_cost_forecast": pipeline_cost_forecast,
179          }
180  
181      funnel = cache_hits.get("pipeline_funnel") or await live_funnel()
182      errors = cache_hits.get("error_breakdown") or await live_errors()
183      hourly = cache_hits.get("chart_hourly_status_breakdown_48h") or await live_hourly()
184      throughput = cache_hits.get("chart_daily_throughput_30d") or await live_throughput()
185      cost_forecast = cache_hits.get("cost_forecast") or await live_cost_forecast()
186      totals = await live_totals()
187      stuck = await live_stuck()
188      activity = await live_activity_24h()
189      failing = await live_failing()
190  
191      # Key numbers summary for top metric cards
192      totals_row = totals or {}
193      response_rate = await fetchval(
194          "SELECT ROUND(100.0 * COUNT(DISTINCT CASE WHEN c.direction='inbound' THEN c.outreach_id END) / NULLIF(COUNT(*),0), 2) "
195          "FROM outreaches o LEFT JOIN conversations c ON c.outreach_id=o.id "
196          "WHERE o.status IN ('sent','delivered')"
197      )
198      key_numbers = {
199          "total_sites": totals_row.get("total"),
200          "active_pipeline": sum(r["count"] for r in (funnel or [])),
201          "outreach_sent": await fetchval("SELECT COUNT(*) FROM outreaches WHERE status IN ('sent','delivered')"),
202          "response_rate": response_rate or 0,
203          "sales": (cost_forecast or {}).get("monthly_sales", 0),
204          "revenue": (cost_forecast or {}).get("monthly_revenue", 0),
205      }
206  
207      return JSONResponse({
208          "key_numbers": key_numbers,
209          "pipeline_funnel": funnel,
210          "error_breakdown": errors,
211          "stuck_sites": stuck,
212          "failing_sites": failing,
213          "hourly_status_48h": hourly,
214          "daily_throughput_30d": throughput,
215          "activity_24h": activity,
216          "cost_forecast": cost_forecast,
217      }, headers=CACHE_HEADERS)
218  
219  
220  # ─────────────────────────────────────────────────────────────────────────────
221  # GET /api/v1/outreach
222  # ─────────────────────────────────────────────────────────────────────────────
223  
224  @app.get("/api/v1/outreach")
225  async def outreach():
226      cache_hits = await get_many([
227          "response_rates",
228          "outreach_funnel",
229          "chart_llm_daily_costs_30d",
230          "chart_llm_cost_by_stage_30d",
231      ])
232  
233      async def live_response_rates():
234          return await fetchall(
235              "SELECT o.contact_method AS channel, COUNT(*) AS total_sent, "
236              "COUNT(DISTINCT CASE WHEN c.direction='inbound' THEN c.outreach_id END) AS responses, "
237              "ROUND(100.0 * COUNT(DISTINCT CASE WHEN c.direction='inbound' THEN c.outreach_id END) / COUNT(*), 2) AS response_rate "
238              "FROM outreaches o LEFT JOIN conversations c ON c.outreach_id=o.id "
239              "WHERE o.status IN ('sent','delivered') GROUP BY o.contact_method ORDER BY response_rate DESC"
240          )
241  
242      async def live_outreach_funnel():
243          return await fetchall(
244              "SELECT contact_method AS channel, COUNT(*) AS total, "
245              "SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END) AS pending, "
246              "SUM(CASE WHEN status='sent' THEN 1 ELSE 0 END) AS sent, "
247              "SUM(CASE WHEN status='delivered' THEN 1 ELSE 0 END) AS delivered, "
248              "SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) AS failed, "
249              "SUM(CASE WHEN status='bounced' THEN 1 ELSE 0 END) AS bounced, "
250              "ROUND(100.0 * SUM(CASE WHEN status='delivered' THEN 1 ELSE 0 END) / COUNT(*), 2) AS delivery_rate "
251              "FROM outreaches GROUP BY contact_method ORDER BY total DESC"
252          )
253  
254      async def live_sales():
255          return await fetchall(
256              "SELECT contact_method AS channel, "
257              "COUNT(CASE WHEN resulted_in_sale=1 THEN 1 END) AS sales_count, "
258              "COALESCE(SUM(sale_amount), 0) AS total_revenue, "
259              "ROUND(100.0 * COUNT(CASE WHEN resulted_in_sale=1 THEN 1 END) / COUNT(*), 2) AS conversion_rate "
260              "FROM outreaches WHERE status IN ('sent','delivered') GROUP BY contact_method"
261          )
262  
263      async def live_llm_by_stage():
264          return await fetchall(
265              "SELECT stage, SUM(prompt_tokens) AS prompt_tokens, "
266              "SUM(completion_tokens) AS completion_tokens, SUM(total_tokens) AS total_tokens, "
267              "SUM(estimated_cost) AS total_cost, COUNT(*) AS request_count, "
268              "AVG(estimated_cost) AS avg_cost_per_request "
269              "FROM llm_usage GROUP BY stage ORDER BY total_cost DESC"
270          )
271  
272      async def live_llm_daily():
273          return await fetchall(
274              "SELECT DATE(created_at) AS date, SUM(estimated_cost) AS daily_cost, "
275              "SUM(total_tokens) AS daily_tokens, COUNT(*) AS request_count "
276              "FROM llm_usage WHERE created_at > NOW() - INTERVAL '30 days' "
277              "GROUP BY DATE(created_at) ORDER BY date"
278          )
279  
280      async def live_llm_by_provider():
281          return await fetchall(
282              "SELECT provider, model, SUM(total_tokens) AS total_tokens, "
283              "SUM(estimated_cost) AS total_cost, COUNT(*) AS request_count "
284              "FROM llm_usage GROUP BY provider, model ORDER BY total_cost DESC"
285          )
286  
287      return JSONResponse({
288          "response_rates": cache_hits.get("response_rates") or await live_response_rates(),
289          "outreach_funnel": cache_hits.get("outreach_funnel") or await live_outreach_funnel(),
290          "sales": await live_sales(),
291          "llm_by_stage": await live_llm_by_stage(),
292          "llm_daily_costs": cache_hits.get("chart_llm_daily_costs_30d") or await live_llm_daily(),
293          "llm_by_provider": await live_llm_by_provider(),
294      }, headers=CACHE_HEADERS)
295  
296  
297  # ─────────────────────────────────────────────────────────────────────────────
298  # GET /api/v1/conversations  (always live — changes too fast)
299  # ─────────────────────────────────────────────────────────────────────────────
300  
301  @app.get("/api/v1/conversations")
302  async def conversations():
303      stats = await fetchone(
304          "SELECT COUNT(*) AS total_conversations, "
305          "SUM(CASE WHEN direction='inbound' THEN 1 ELSE 0 END) AS inbound_count, "
306          "SUM(CASE WHEN read_at IS NULL AND direction='inbound' THEN 1 ELSE 0 END) AS unread_count "
307          "FROM conversations"
308      )
309      sentiment = await fetchall(
310          "SELECT sentiment, COUNT(*) AS count FROM conversations "
311          "WHERE direction='inbound' AND sentiment IS NOT NULL "
312          "GROUP BY sentiment ORDER BY count DESC"
313      )
314      threads = await fetchall(
315          """
316          WITH recent AS (
317              SELECT DISTINCT outreach_id FROM conversations
318              WHERE direction='inbound' ORDER BY received_at DESC LIMIT 50
319          )
320          SELECT c.id, c.outreach_id, c.direction, c.channel, c.sender_identifier,
321                 c.message_body, c.subject_line, c.sentiment, c.received_at, c.read_at,
322                 c.replied_at, COALESCE(c.autoresponder_enabled, 1) AS autoresponder_enabled,
323                 o.contact_uri, o.proposal_text, s.domain, s.id AS site_id
324          FROM conversations c
325          LEFT JOIN outreaches o ON o.id = c.outreach_id
326          LEFT JOIN sites s ON s.id = o.site_id
327          WHERE c.outreach_id IN (SELECT outreach_id FROM recent)
328          ORDER BY c.outreach_id DESC, c.received_at ASC
329          """
330      )
331      return JSONResponse({"stats": stats, "sentiment": sentiment, "threads": threads})
332  
333  
334  # ─────────────────────────────────────────────────────────────────────────────
335  # GET /api/v1/operations  (Cron Jobs + System Health tabs)
336  # ─────────────────────────────────────────────────────────────────────────────
337  
338  @app.get("/api/v1/operations")
339  async def operations():
340      cache_hits = await get_many([
341          "cron_summary",
342          "chart_cron_timeline_24h",
343          "chart_cron_daily_history_7d",
344          "chart_http_errors_30d",
345          "database_health",
346      ])
347  
348      async def live_cron_jobs():
349          return await fetchall(
350              "SELECT name, description, interval_value, interval_unit, enabled, "
351              "last_run_at, next_run_at FROM cron_jobs ORDER BY name"
352          )
353  
354      async def live_cron_summary():
355          return await fetchall(
356              "SELECT l.job_name, COUNT(*) AS total_runs, "
357              "SUM(CASE WHEN l.status='success' THEN 1 ELSE 0 END) AS successful_runs, "
358              "SUM(CASE WHEN l.status='failed' THEN 1 ELSE 0 END) AS failed_runs, "
359              "ROUND(100.0 * SUM(CASE WHEN l.status='success' THEN 1 ELSE 0 END) / COUNT(*), 1) AS success_rate, "
360              "MAX(l.started_at) AS last_run, "
361              "AVG(EXTRACT(EPOCH FROM (l.finished_at - l.started_at)) / 60) AS avg_duration_minutes "
362              "FROM cron_job_logs l WHERE l.started_at > NOW() - INTERVAL '7 days' "
363              "GROUP BY l.job_name ORDER BY last_run DESC"
364          )
365  
366      async def live_cron_recent_logs():
367          return await fetchall(
368              "SELECT id, job_name, started_at, finished_at, status, summary, "
369              "items_processed, items_failed, error_message "
370              "FROM cron_job_logs ORDER BY started_at DESC LIMIT 200"
371          )
372  
373      async def live_http_errors():
374          return await fetchall(
375              "SELECT DATE(updated_at) AS date, http_status_code, status AS stage, COUNT(*) AS count "
376              "FROM sites WHERE updated_at > NOW() - INTERVAL '30 days' "
377              "AND http_status_code IS NOT NULL AND http_status_code != 200 "
378              "AND status IN ('assets_captured','enriched') "
379              "GROUP BY DATE(updated_at), http_status_code, status ORDER BY date, http_status_code"
380          )
381  
382      async def live_rate_limits():
383          return await fetchall(
384              "SELECT status AS stage, error_message, COUNT(*) AS count, MAX(updated_at) AS last_occurrence "
385              "FROM sites WHERE error_message LIKE '%rate%limit%' OR error_message LIKE '%429%' "
386              "OR error_message LIKE '%quota%' GROUP BY status, error_message "
387              "ORDER BY last_occurrence DESC LIMIT 20"
388          )
389  
390      async def live_db_health():
391          counts = await fetchall(
392              "SELECT 'sites' AS tbl, COUNT(*) AS cnt FROM sites "
393              "UNION ALL SELECT 'outreaches', COUNT(*) FROM outreaches "
394              "UNION ALL SELECT 'conversations', COUNT(*) FROM conversations "
395              "UNION ALL SELECT 'keywords', COUNT(*) FROM keywords"
396          )
397          return {"table_counts": counts}
398  
399      return JSONResponse({
400          "cron_jobs": await live_cron_jobs(),
401          "cron_summary": cache_hits.get("cron_summary") or await live_cron_summary(),
402          "cron_recent_logs": await live_cron_recent_logs(),
403          "cron_timeline": cache_hits.get("chart_cron_timeline_24h") or [],
404          "cron_daily_history": cache_hits.get("chart_cron_daily_history_7d") or [],
405          "http_errors": cache_hits.get("chart_http_errors_30d") or await live_http_errors(),
406          "rate_limits": await live_rate_limits(),
407          "db_health": cache_hits.get("database_health") or await live_db_health(),
408      }, headers=CACHE_HEADERS)
409  
410  
411  # ─────────────────────────────────────────────────────────────────────────────
412  # GET /api/v1/quality  (Agent System + Code Coverage tabs)
413  # ─────────────────────────────────────────────────────────────────────────────
414  
415  @app.get("/api/v1/quality")
416  async def quality():
417      async def live_agent_state():
418          return await fetchall(
419              "SELECT agent_name, status, current_task_id, last_active, metrics_json "
420              "FROM agent_state ORDER BY agent_name"
421          )
422  
423      async def live_agent_tasks():
424          return await fetchall(
425              "SELECT assigned_to, status, COUNT(*) AS count "
426              "FROM agent_tasks WHERE created_at > NOW() - INTERVAL '7 days' "
427              "GROUP BY assigned_to, status ORDER BY assigned_to, status"
428          )
429  
430      async def live_agent_logs():
431          return await fetchall(
432              "SELECT created_at, agent_name, log_level, message, task_id "
433              "FROM agent_logs ORDER BY created_at DESC LIMIT 100"
434          )
435  
436      async def live_agent_performance():
437          return await fetchall(
438              "SELECT assigned_to AS agent, COUNT(*) AS total, "
439              "SUM(CASE WHEN status='completed' THEN 1 ELSE 0 END) AS completed, "
440              "SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) AS failed, "
441              "SUM(CASE WHEN status='blocked' THEN 1 ELSE 0 END) AS blocked, "
442              "ROUND(100.0 * SUM(CASE WHEN status='completed' THEN 1 ELSE 0 END) / COUNT(*), 1) AS success_rate "
443              "FROM agent_tasks WHERE created_at > NOW() - INTERVAL '7 days' "
444              "GROUP BY assigned_to ORDER BY total DESC"
445          )
446  
447      async def live_agent_cost():
448          return await fetchall(
449              "SELECT DATE(created_at) AS date, "
450              "EXTRACT(HOUR FROM created_at) AS hour, "
451              "COUNT(*) AS invocations, SUM(estimated_cost) AS cost "
452              "FROM agent_llm_usage WHERE created_at > NOW() - INTERVAL '1 day' "
453              "GROUP BY 1, 2 ORDER BY 1, 2"
454          )
455  
456      # Coverage data comes from the coverage JSON file on disk
457      coverage_path = _PROJECT_ROOT / "coverage" / "coverage-summary.json"
458      coverage_data = {}
459      if coverage_path.exists():
460          import json as _json
461          with open(coverage_path) as f:
462              coverage_data = _json.load(f)
463  
464      return JSONResponse({
465          "agent_state": await live_agent_state(),
466          "agent_tasks": await live_agent_tasks(),
467          "agent_logs": await live_agent_logs(),
468          "agent_performance": await live_agent_performance(),
469          "agent_cost_24h": await live_agent_cost(),
470          "coverage": coverage_data,
471      }, headers=CACHE_HEADERS)
472  
473  
474  # ─────────────────────────────────────────────────────────────────────────────
475  # GET /api/v1/compliance  (Legal tab + Prompt Learning tab)
476  # ─────────────────────────────────────────────────────────────────────────────
477  
478  @app.get("/api/v1/compliance")
479  async def compliance():
480      async def live_optouts():
481          return await fetchone(
482              "SELECT "
483              "COUNT(DISTINCT email) AS total_email_optouts, "
484              "COUNT(DISTINCT phone) AS total_sms_optouts "
485              "FROM ("
486              "SELECT email, NULL AS phone FROM unsubscribed_emails "
487              "UNION ALL SELECT email, phone FROM opt_outs"
488              ") AS combined"
489          )
490  
491      async def live_platform_health():
492          return await fetchall(
493              "SELECT contact_method, COUNT(*) AS total_sent, "
494              "SUM(CASE WHEN status='bounced' THEN 1 ELSE 0 END) AS bounced, "
495              "SUM(CASE WHEN status='delivered' THEN 1 ELSE 0 END) AS delivered, "
496              "ROUND(100.0 * SUM(CASE WHEN status='bounced' THEN 1 ELSE 0 END) / COUNT(*), 2) AS bounce_rate, "
497              "ROUND(100.0 * SUM(CASE WHEN status='delivered' THEN 1 ELSE 0 END) / COUNT(*), 2) AS delivery_rate "
498              "FROM outreaches WHERE status IN ('sent','delivered','bounced','failed') "
499              "GROUP BY contact_method"
500          )
501  
502      async def live_approval_stats(days: int = 30):
503          return await fetchone(
504              "SELECT COUNT(*) AS total, "
505              "SUM(CASE WHEN feedback_type='approved' THEN 1 ELSE 0 END) AS approved, "
506              "SUM(CASE WHEN feedback_type='rework' THEN 1 ELSE 0 END) AS rework, "
507              "SUM(CASE WHEN feedback_type='rejected' THEN 1 ELSE 0 END) AS rejected, "
508              "ROUND(100.0 * SUM(CASE WHEN feedback_type='approved' THEN 1 ELSE 0 END) / NULLIF(COUNT(*),0), 1) AS approval_rate "
509              "FROM outreach_feedback WHERE created_at > NOW() - INTERVAL '30 days'"
510          )
511  
512      async def live_feedback_categories(days: int = 30):
513          return await fetchall(
514              "SELECT feedback_category, COUNT(*) AS count "
515              "FROM outreach_feedback WHERE created_at > NOW() - INTERVAL '30 days' "
516              "GROUP BY feedback_category ORDER BY count DESC LIMIT 15"
517          )
518  
519      async def live_approval_trend(days: int = 30):
520          return await fetchall(
521              "SELECT DATE(created_at) AS date, "
522              "ROUND(100.0 * SUM(CASE WHEN feedback_type='approved' THEN 1 ELSE 0 END) / COUNT(*), 1) AS approval_rate "
523              "FROM outreach_feedback WHERE created_at > NOW() - INTERVAL '30 days' "
524              "GROUP BY DATE(created_at) ORDER BY date"
525          )
526  
527      async def live_recent_feedback():
528          return await fetchall(
529              "SELECT f.id, f.feedback_type, f.feedback_category, f.feedback_text, "
530              "f.prompt_file, f.created_at, s.domain "
531              "FROM outreach_feedback f LEFT JOIN outreaches o ON o.id=f.outreach_id "
532              "LEFT JOIN sites s ON s.id=o.site_id "
533              "ORDER BY f.created_at DESC LIMIT 50"
534          )
535  
536      return JSONResponse({
537          "optouts": await live_optouts(),
538          "platform_health": await live_platform_health(),
539          "approval_stats": await live_approval_stats(),
540          "feedback_categories": await live_feedback_categories(),
541          "approval_trend": await live_approval_trend(),
542          "recent_feedback": await live_recent_feedback(),
543      }, headers=CACHE_HEADERS)
544  
545  
546  # ─────────────────────────────────────────────────────────────────────────────
547  # GET /api/v1/review  (Human Review — always live)
548  # ─────────────────────────────────────────────────────────────────────────────
549  
550  @app.get("/api/v1/review")
551  async def review():
552      queue_stats = await fetchone(
553          "SELECT COUNT(*) AS total, "
554          "SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END) AS pending, "
555          "SUM(CASE WHEN priority='critical' THEN 1 ELSE 0 END) AS critical, "
556          "SUM(CASE WHEN priority='high' THEN 1 ELSE 0 END) AS high, "
557          "SUM(CASE WHEN status='reviewed' THEN 1 ELSE 0 END) AS reviewed "
558          "FROM human_review_queue"
559      )
560      pending_reviews = await fetchall(
561          "SELECT id, review_type, title, priority, status, context_json, notes, "
562          "created_at, reviewed_at, reviewed_by "
563          "FROM human_review_queue WHERE status='pending' "
564          "ORDER BY CASE priority WHEN 'critical' THEN 1 WHEN 'high' THEN 2 ELSE 3 END, created_at ASC "
565          "LIMIT 100"
566      )
567      pending_outreaches = await fetchone(
568          "SELECT "
569          "SUM(CASE WHEN contact_method='form' THEN 1 ELSE 0 END) AS form, "
570          "SUM(CASE WHEN contact_method='x' THEN 1 ELSE 0 END) AS x, "
571          "SUM(CASE WHEN contact_method='linkedin' THEN 1 ELSE 0 END) AS linkedin, "
572          "COUNT(*) AS total "
573          "FROM outreaches WHERE status='approved'"
574      )
575      failing_sites = await fetchall(
576          "SELECT id, domain, error_message, updated_at "
577          "FROM sites WHERE status='failing' ORDER BY updated_at DESC LIMIT 50"
578      )
579      unread_count = await fetchval(
580          "SELECT COUNT(*) FROM conversations WHERE read_at IS NULL AND direction='inbound'"
581      )
582      return JSONResponse({
583          "queue_stats": queue_stats,
584          "pending_reviews": pending_reviews,
585          "pending_outreaches": pending_outreaches,
586          "failing_sites": failing_sites,
587          "unread_conversations": unread_count,
588      })
589  
590  
591  # ─────────────────────────────────────────────────────────────────────────────
592  # POST /api/v1/cache/refresh  — trigger precompute-dashboard.js
593  # ─────────────────────────────────────────────────────────────────────────────
594  
595  @app.post("/api/v1/cache/refresh")
596  async def cache_refresh():
597      script = str(_PROJECT_ROOT / "src" / "cron" / "precompute-dashboard.js")
598      result = subprocess.run(
599          ["node", script],
600          capture_output=True,
601          text=True,
602          cwd=str(_PROJECT_ROOT),
603          timeout=120,
604      )
605      if result.returncode != 0:
606          raise HTTPException(status_code=500, detail=result.stderr[:500])
607      return {"ok": True, "message": "Cache refreshed"}
608  
609  
610  # ─────────────────────────────────────────────────────────────────────────────
611  # Health check
612  # ─────────────────────────────────────────────────────────────────────────────
613  
614  @app.get("/health")
615  async def health():
616      return {"status": "ok"}