/ dashboard / utils / database.py
database.py
   1  """
   2  Database Query Layer
   3  
   4  Provides read-only access to the SQLite database with caching.
   5  All functions return pandas DataFrames for easy visualization.
   6  """
   7  
   8  import sqlite3
   9  import logging
  10  from pathlib import Path
  11  from typing import Optional
  12  import pandas as pd
  13  import streamlit as st
  14  from dashboard import config
  15  
  16  logger = logging.getLogger(__name__)
  17  
  18  
  19  @st.cache_resource
  20  def get_db_connection() -> sqlite3.Connection:
  21      """
  22      Create read-only SQLite connection with connection pooling.
  23  
  24      Returns:
  25          sqlite3.Connection: Read-only database connection
  26      """
  27      conn = sqlite3.connect(
  28          config.DATABASE_PATH,
  29          check_same_thread=False,
  30          isolation_level=None,  # Autocommit mode
  31          timeout=10.0,
  32      )
  33      conn.execute("PRAGMA query_only = ON;")  # Enforce read-only
  34      conn.row_factory = sqlite3.Row  # Dict-like access
  35      return conn
  36  
  37  
  38  # ===== Pipeline Health Queries =====
  39  
  40  
  41  @st.cache_data(ttl=config.CACHE_TTL)
  42  def get_pipeline_funnel() -> pd.DataFrame:
  43      """
  44      Get count of sites at each pipeline stage.
  45  
  46      Excludes 'ignore' and 'failing' statuses as they represent sites
  47      outside the active pipeline (permanently filtered or needing manual intervention).
  48      """
  49      query = """
  50          SELECT status, COUNT(*) as count
  51          FROM sites
  52          WHERE status NOT IN ('ignore', 'failing')
  53          GROUP BY status
  54          ORDER BY
  55              CASE status
  56                  WHEN 'found' THEN 1
  57                  WHEN 'assets_captured' THEN 2
  58                  WHEN 'scored' THEN 3
  59                  WHEN 'rescored' THEN 4
  60                  WHEN 'enriched' THEN 5
  61                  WHEN 'proposals_drafted' THEN 6
  62                  WHEN 'outreach_sent' THEN 7
  63                  WHEN 'high_score' THEN 8
  64              END
  65      """
  66      conn = get_db_connection()
  67      return pd.read_sql_query(query, conn)
  68  
  69  
  70  @st.cache_data(ttl=config.CACHE_TTL)
  71  def get_excluded_sites_count() -> dict:
  72      """
  73      Get count of sites excluded from the active pipeline.
  74  
  75      Returns:
  76          dict with keys 'ignore' and 'failing' containing counts
  77      """
  78      query = """
  79          SELECT
  80              SUM(CASE WHEN status = 'ignore' THEN 1 ELSE 0 END) as ignored,
  81              SUM(CASE WHEN status = 'failing' THEN 1 ELSE 0 END) as failing
  82          FROM sites
  83      """
  84      conn = get_db_connection()
  85      result = pd.read_sql_query(query, conn)
  86      return {
  87          'ignored': int(result['ignored'].iloc[0]) if not result.empty else 0,
  88          'failing': int(result['failing'].iloc[0]) if not result.empty else 0
  89      }
  90  
  91  
  92  @st.cache_data(ttl=config.CACHE_TTL)
  93  def get_funnel_breakdown() -> dict:
  94      """
  95      Get hierarchical funnel breakdown for treemap/sunburst visualization.
  96  
  97      Returns a dict with total, ignored, failing, active counts and
  98      sub-breakdowns for each category, plus outreach delivery by channel.
  99      """
 100      conn = get_db_connection()
 101  
 102      total = pd.read_sql_query("SELECT COUNT(*) as c FROM sites", conn)['c'].iloc[0]
 103  
 104      # Ignore breakdown (categorized by error_message patterns)
 105      ignore_df = pd.read_sql_query("""
 106          SELECT
 107              CASE
 108                  WHEN error_message LIKE 'Cross-border duplicate%' THEN 'Cross-border duplicates'
 109                  WHEN error_message LIKE 'Duplicate domain%' THEN 'Duplicate keyword domain'
 110                  WHEN error_message = 'Ignored: Social media platform' THEN 'Social media'
 111                  WHEN error_message = 'Ignored: Business directory' THEN 'Business directories'
 112                  ELSE 'Other'
 113              END as category,
 114              COUNT(*) as count
 115          FROM sites
 116          WHERE status = 'ignore'
 117          GROUP BY 1
 118          ORDER BY 2 DESC
 119      """, conn)
 120  
 121      # Failing breakdown (categorized by error_message patterns)
 122      fail_df = pd.read_sql_query("""
 123          SELECT
 124              CASE
 125                  WHEN error_message LIKE '%userDataDir%' OR error_message LIKE '%browserType.launch%'
 126                      THEN 'Browser launch error'
 127                  WHEN error_message LIKE '%screenshot_path is NULL%'
 128                      THEN 'NULL screenshot constraint'
 129                  WHEN error_message LIKE '%status code 401%' OR error_message LIKE '%status code 400%'
 130                      THEN 'API auth errors'
 131                  WHEN error_message LIKE '%HTTP 403%'
 132                      THEN 'HTTP 403 blocked'
 133                  WHEN error_message LIKE '%EACCES%'
 134                      THEN 'Permission denied'
 135                  WHEN error_message LIKE '%page.content%' OR error_message LIKE '%navigating%'
 136                      THEN 'Page navigation error'
 137                  ELSE 'Other'
 138              END as category,
 139              COUNT(*) as count
 140          FROM sites
 141          WHERE status = 'failing'
 142          GROUP BY 1
 143          ORDER BY 2 DESC
 144      """, conn)
 145  
 146      # Active pipeline stage breakdown
 147      active_df = pd.read_sql_query("""
 148          SELECT status, COUNT(*) as count
 149          FROM sites
 150          WHERE status NOT IN ('ignore', 'failing')
 151          GROUP BY status
 152          ORDER BY
 153              CASE status
 154                  WHEN 'found' THEN 1
 155                  WHEN 'assets_captured' THEN 2
 156                  WHEN 'scored' THEN 3
 157                  WHEN 'rescored' THEN 4
 158                  WHEN 'enriched' THEN 5
 159                  WHEN 'proposals_drafted' THEN 6
 160                  WHEN 'outreach_sent' THEN 7
 161                  ELSE 8
 162              END
 163      """, conn)
 164  
 165      # Pending outreaches by channel
 166      outreach_df = pd.read_sql_query("""
 167          SELECT contact_method as channel, COUNT(*) as count
 168          FROM messages
 169          WHERE direction = 'outbound'
 170            AND approval_status = 'pending'
 171          GROUP BY contact_method
 172          ORDER BY count DESC
 173      """, conn)
 174  
 175      # GDPR blocked count
 176      gdpr_row = pd.read_sql_query("""
 177          SELECT COUNT(*) as count FROM messages WHERE direction = 'outbound' AND approval_status = 'gdpr_blocked'
 178      """, conn)
 179      gdpr_blocked = int(gdpr_row['count'].iloc[0]) if not gdpr_row.empty else 0
 180  
 181      return {
 182          'total': int(total),
 183          'ignored': int(ignore_df['count'].sum()) if not ignore_df.empty else 0,
 184          'ignored_breakdown': ignore_df.to_dict('records'),
 185          'failing': int(fail_df['count'].sum()) if not fail_df.empty else 0,
 186          'failing_breakdown': fail_df.to_dict('records'),
 187          'active': int(active_df['count'].sum()) if not active_df.empty else 0,
 188          'active_breakdown': active_df.to_dict('records'),
 189          'outreach_breakdown': outreach_df.to_dict('records'),
 190          'outreach_gdpr_blocked': gdpr_blocked,
 191      }
 192  
 193  
 194  @st.cache_data(ttl=config.CACHE_TTL)
 195  def get_total_active_errors() -> int:
 196      """Get total count of sites with active errors (excluding ignored sites)."""
 197      query = """
 198          SELECT COUNT(*) as total
 199          FROM sites
 200          WHERE error_message IS NOT NULL
 201            AND status NOT IN ('ignore', 'failing')
 202      """
 203      conn = get_db_connection()
 204      result = pd.read_sql_query(query, conn)
 205      return int(result["total"].iloc[0]) if not result.empty else 0
 206  
 207  
 208  @st.cache_data(ttl=config.CACHE_TTL)
 209  def get_error_breakdown() -> pd.DataFrame:
 210      """Get most common errors by count (excluding ignored sites)."""
 211      query = """
 212          SELECT
 213              error_message,
 214              status as stage,
 215              COUNT(*) as count
 216          FROM sites
 217          WHERE error_message IS NOT NULL
 218            AND status != 'ignore'
 219          GROUP BY error_message, status
 220          ORDER BY count DESC
 221          LIMIT 20
 222      """
 223      conn = get_db_connection()
 224      return pd.read_sql_query(query, conn)
 225  
 226  
 227  @st.cache_data(ttl=config.CACHE_TTL)
 228  def get_stuck_sites_by_error() -> pd.DataFrame:
 229      """Get sites stuck at same stage with errors."""
 230      query = """
 231          SELECT
 232              id,
 233              domain,
 234              status as stage,
 235              error_message,
 236              updated_at,
 237              julianday('now') - julianday(updated_at) as days_stuck
 238          FROM sites
 239          WHERE error_message IS NOT NULL
 240            AND julianday('now') - julianday(updated_at) > 1
 241          ORDER BY days_stuck DESC
 242          LIMIT 100
 243      """
 244      conn = get_db_connection()
 245      return pd.read_sql_query(query, conn)
 246  
 247  
 248  @st.cache_data(ttl=config.CACHE_TTL)
 249  def get_failing_sites() -> pd.DataFrame:
 250      """Get sites marked as 'failing' after exceeding retry limits."""
 251      query = """
 252          SELECT
 253              id,
 254              domain,
 255              status,
 256              error_message,
 257              retry_count,
 258              last_retry_at,
 259              updated_at
 260          FROM sites
 261          WHERE status = 'failing'
 262          ORDER BY updated_at DESC
 263          LIMIT 100
 264      """
 265      conn = get_db_connection()
 266      return pd.read_sql_query(query, conn)
 267  
 268  
 269  @st.cache_data(ttl=config.CACHE_TTL)
 270  def get_daily_throughput(days: int = 30) -> pd.DataFrame:
 271      """Get sites processed per day."""
 272      query = """
 273          SELECT
 274              DATE(created_at) as date,
 275              COUNT(*) as sites_added
 276          FROM sites
 277          WHERE created_at >= datetime('now', '-{} days')
 278          GROUP BY DATE(created_at)
 279          ORDER BY date
 280      """.format(
 281          days
 282      )
 283      conn = get_db_connection()
 284      return pd.read_sql_query(query, conn)
 285  
 286  
 287  @st.cache_data(ttl=config.CACHE_TTL)
 288  def get_hourly_throughput(hours: int = 48) -> pd.DataFrame:
 289      """Get sites processed per hour for the last N hours."""
 290      # Try cache first for 48-hour request (most common)
 291      if hours == 48:
 292          cached = get_from_cache('chart_hourly_throughput_48h')
 293          if cached:
 294              return pd.DataFrame(cached)
 295  
 296      query = """
 297          SELECT
 298              strftime('%Y-%m-%d %H:00:00', created_at) as hour,
 299              COUNT(*) as sites_added
 300          FROM sites
 301          WHERE created_at >= datetime('now', '-{} hours')
 302          GROUP BY strftime('%Y-%m-%d %H:00:00', created_at)
 303          ORDER BY hour
 304      """.format(
 305          hours
 306      )
 307      conn = get_db_connection()
 308      return pd.read_sql_query(query, conn)
 309  
 310  
 311  @st.cache_data(ttl=config.CACHE_TTL)
 312  def get_hourly_status_breakdown(hours: int = 48) -> pd.DataFrame:
 313      """
 314      Get sites processed per hour broken down by status for the last N hours.
 315      Returns a pivot table suitable for stacked bar charts.
 316  
 317      Filters out 'failing' and 'ignore' statuses as they represent sites
 318      out of the pipeline (manual intervention needed or permanently excluded).
 319      """
 320      # Try cache first for 48-hour request (most common)
 321      if hours == 48:
 322          cached = get_from_cache('chart_hourly_status_breakdown_48h')
 323          if cached:
 324              df = pd.DataFrame(cached)
 325              if not df.empty:
 326                  # Pivot the cached data
 327                  pivot_df = df.pivot_table(
 328                      index='hour',
 329                      columns='status',
 330                      values='count',
 331                      fill_value=0
 332                  ).reset_index()
 333                  return pivot_df
 334              return pd.DataFrame()
 335  
 336      query = """
 337          SELECT
 338              strftime('%Y-%m-%d %H:00:00', created_at) as hour,
 339              status,
 340              COUNT(*) as count
 341          FROM site_status
 342          WHERE created_at >= datetime('now', '-{} hours')
 343              AND status NOT IN ('failing', 'ignore')
 344          GROUP BY strftime('%Y-%m-%d %H:00:00', created_at), status
 345          ORDER BY hour, status
 346      """.format(
 347          hours
 348      )
 349      conn = get_db_connection()
 350      df = pd.read_sql_query(query, conn)
 351  
 352      # Pivot the data to make it suitable for stacked bar charts
 353      # Each row is an hour, each column is a status
 354      if not df.empty:
 355          pivot_df = df.pivot_table(
 356              index='hour',
 357              columns='status',
 358              values='count',
 359              fill_value=0
 360          ).reset_index()
 361          return pivot_df
 362      else:
 363          return pd.DataFrame()
 364  
 365  
 366  # ===== Outreach Queries =====
 367  
 368  
 369  @st.cache_data(ttl=config.CACHE_TTL)
 370  def get_response_rates() -> pd.DataFrame:
 371      """Get response rate by channel."""
 372      query = """
 373          SELECT
 374              o.contact_method as channel,
 375              COUNT(DISTINCT o.id) as total_sent,
 376              COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END) as responses,
 377              ROUND(100.0 * COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END) / COUNT(DISTINCT o.id), 2) as response_rate
 378          FROM messages o
 379          LEFT JOIN messages r ON r.site_id = o.site_id AND r.direction = 'inbound'
 380          WHERE o.direction = 'outbound'
 381            AND o.delivery_status IN ('sent', 'delivered')
 382          GROUP BY o.contact_method
 383          ORDER BY response_rate DESC
 384      """
 385      conn = get_db_connection()
 386      return pd.read_sql_query(query, conn)
 387  
 388  
 389  @st.cache_data(ttl=config.CACHE_TTL)
 390  def get_sales_data() -> pd.DataFrame:
 391      """Get sales tracking data."""
 392      query = """
 393          SELECT
 394              o.contact_method as channel,
 395              COUNT(CASE WHEN s.resulted_in_sale = 1 THEN 1 END) as sales_count,
 396              COALESCE(SUM(s.sale_amount), 0) as total_revenue,
 397              COUNT(*) as total_outreaches,
 398              ROUND(100.0 * COUNT(CASE WHEN s.resulted_in_sale = 1 THEN 1 END) / COUNT(*), 2) as conversion_rate
 399          FROM messages o
 400          LEFT JOIN sites s ON s.id = o.site_id
 401          WHERE o.direction = 'outbound'
 402            AND o.delivery_status IN ('sent', 'delivered')
 403          GROUP BY o.contact_method
 404      """
 405      conn = get_db_connection()
 406      return pd.read_sql_query(query, conn)
 407  
 408  
 409  @st.cache_data(ttl=config.CACHE_TTL)
 410  def get_outreach_funnel() -> pd.DataFrame:
 411      """Get delivery funnel by channel."""
 412      query = """
 413          SELECT
 414              o.contact_method as channel,
 415              COUNT(*) as total,
 416              SUM(CASE WHEN o.approval_status = 'pending' THEN 1 ELSE 0 END) as pending,
 417              SUM(CASE WHEN o.delivery_status = 'sent' THEN 1 ELSE 0 END) as sent,
 418              SUM(CASE WHEN o.delivery_status = 'delivered' THEN 1 ELSE 0 END) as delivered,
 419              SUM(CASE WHEN o.delivery_status = 'failed' THEN 1 ELSE 0 END) as failed,
 420              SUM(CASE WHEN o.delivery_status = 'bounced' THEN 1 ELSE 0 END) as bounced,
 421              ROUND(100.0 * SUM(CASE WHEN o.delivery_status = 'delivered' THEN 1 ELSE 0 END) / COUNT(*), 2) as delivery_rate
 422          FROM messages o
 423          WHERE o.direction = 'outbound'
 424          GROUP BY o.contact_method
 425          ORDER BY total DESC
 426      """
 427      conn = get_db_connection()
 428      return pd.read_sql_query(query, conn)
 429  
 430  
 431  # ===== LLM Usage & Cost Queries =====
 432  
 433  
 434  @st.cache_data(ttl=config.CACHE_TTL)
 435  def get_llm_usage_by_stage() -> pd.DataFrame:
 436      """Get LLM token usage and costs by pipeline stage."""
 437      query = """
 438          SELECT
 439              stage,
 440              SUM(prompt_tokens) as prompt_tokens,
 441              SUM(completion_tokens) as completion_tokens,
 442              SUM(total_tokens) as total_tokens,
 443              SUM(estimated_cost) as total_cost,
 444              COUNT(*) as request_count,
 445              AVG(estimated_cost) as avg_cost_per_request
 446          FROM llm_usage
 447          GROUP BY stage
 448          ORDER BY total_cost DESC
 449      """
 450      conn = get_db_connection()
 451      return pd.read_sql_query(query, conn)
 452  
 453  
 454  @st.cache_data(ttl=config.CACHE_TTL)
 455  def get_llm_usage_by_provider() -> pd.DataFrame:
 456      """Get LLM costs by provider."""
 457      query = """
 458          SELECT
 459              provider,
 460              model,
 461              SUM(total_tokens) as total_tokens,
 462              SUM(estimated_cost) as total_cost,
 463              COUNT(*) as request_count
 464          FROM llm_usage
 465          GROUP BY provider, model
 466          ORDER BY total_cost DESC
 467      """
 468      conn = get_db_connection()
 469      return pd.read_sql_query(query, conn)
 470  
 471  
 472  @st.cache_data(ttl=config.CACHE_TTL)
 473  def get_llm_daily_costs(days: int = 30) -> pd.DataFrame:
 474      """Get daily LLM costs over time."""
 475      # Try cache first for 30-day request (most common)
 476      if days == 30:
 477          cached = get_from_cache('chart_llm_daily_costs_30d')
 478          if cached:
 479              return pd.DataFrame(cached)
 480  
 481      query = """
 482          SELECT
 483              DATE(created_at) as date,
 484              SUM(estimated_cost) as daily_cost,
 485              SUM(total_tokens) as daily_tokens,
 486              COUNT(*) as request_count
 487          FROM llm_usage
 488          WHERE created_at >= datetime('now', '-{} days')
 489          GROUP BY DATE(created_at)
 490          ORDER BY date
 491      """.format(
 492          days
 493      )
 494      conn = get_db_connection()
 495      return pd.read_sql_query(query, conn)
 496  
 497  
 498  @st.cache_data(ttl=config.CACHE_TTL)
 499  def get_llm_cost_per_outreach() -> pd.DataFrame:
 500      """Calculate average LLM cost per site that resulted in outreach."""
 501      query = """
 502          SELECT
 503              COUNT(DISTINCT s.id) as sites_with_outreach,
 504              COALESCE(SUM(l.estimated_cost), 0) as total_llm_cost,
 505              CASE
 506                  WHEN COUNT(DISTINCT s.id) > 0 THEN COALESCE(SUM(l.estimated_cost), 0) / COUNT(DISTINCT s.id)
 507                  ELSE 0
 508              END as avg_cost_per_site
 509          FROM sites s
 510          INNER JOIN messages o ON o.site_id = s.id AND o.direction = 'outbound'
 511          LEFT JOIN llm_usage l ON l.site_id = s.id
 512          WHERE s.status = 'outreach_sent'
 513      """
 514      conn = get_db_connection()
 515      return pd.read_sql_query(query, conn)
 516  
 517  
 518  @st.cache_data(ttl=config.CACHE_TTL)
 519  def get_llm_cost_by_stage_and_date(days: int = 30) -> pd.DataFrame:
 520      """Get LLM costs broken down by stage over time."""
 521      # Try cache first for 30-day request (most common)
 522      if days == 30:
 523          cached = get_from_cache('chart_llm_cost_by_stage_30d')
 524          if cached:
 525              return pd.DataFrame(cached)
 526  
 527      query = """
 528          SELECT
 529              DATE(created_at) as date,
 530              stage,
 531              SUM(estimated_cost) as cost
 532          FROM llm_usage
 533          WHERE created_at >= datetime('now', '-{} days')
 534          GROUP BY DATE(created_at), stage
 535          ORDER BY date, stage
 536      """.format(
 537          days
 538      )
 539      conn = get_db_connection()
 540      return pd.read_sql_query(query, conn)
 541  
 542  
 543  # ===== Conversation Queries =====
 544  
 545  
 546  @st.cache_data(ttl=config.CACHE_TTL)
 547  def get_conversation_stats() -> pd.DataFrame:
 548      """Get conversation statistics."""
 549      query = """
 550          SELECT
 551              COUNT(*) as total_conversations,
 552              SUM(CASE WHEN direction = 'inbound' THEN 1 ELSE 0 END) as inbound_count,
 553              SUM(CASE WHEN read_at IS NULL AND direction = 'inbound' THEN 1 ELSE 0 END) as unread_count
 554          FROM messages
 555      """
 556      conn = get_db_connection()
 557      return pd.read_sql_query(query, conn)
 558  
 559  
 560  @st.cache_data(ttl=config.CACHE_TTL)
 561  def get_sentiment_distribution() -> pd.DataFrame:
 562      """Get sentiment breakdown."""
 563      query = """
 564          SELECT
 565              sentiment,
 566              COUNT(*) as count
 567          FROM messages
 568          WHERE direction = 'inbound' AND sentiment IS NOT NULL
 569          GROUP BY sentiment
 570          ORDER BY count DESC
 571      """
 572      conn = get_db_connection()
 573      return pd.read_sql_query(query, conn)
 574  
 575  
 576  @st.cache_data(ttl=config.CACHE_TTL)
 577  def get_conversation_threads(limit: int = 50) -> pd.DataFrame:
 578      """Get recent conversation threads."""
 579      query = """
 580          SELECT
 581              c.id,
 582              c.site_id,
 583              c.direction,
 584              c.contact_method as channel,
 585              c.contact_uri as sender_identifier,
 586              c.message_body,
 587              c.sentiment,
 588              c.created_at as received_at,
 589              c.read_at,
 590              c.contact_uri,
 591              s.domain
 592          FROM messages c
 593          LEFT JOIN sites s ON s.id = c.site_id
 594          WHERE c.direction = 'inbound'
 595          ORDER BY c.created_at DESC
 596          LIMIT {}
 597      """.format(
 598          limit
 599      )
 600      conn = get_db_connection()
 601      return pd.read_sql_query(query, conn)
 602  
 603  
 604  @st.cache_data(ttl=config.CACHE_TTL)
 605  def get_threaded_conversations(limit: int = 50) -> pd.DataFrame:
 606      """Get conversation threads grouped by outreach_id with all messages."""
 607      query = """
 608          WITH recent_sites AS (
 609              SELECT DISTINCT site_id
 610              FROM messages
 611              WHERE direction = 'inbound'
 612              ORDER BY created_at DESC
 613              LIMIT {}
 614          )
 615          SELECT
 616              c.id,
 617              c.site_id as outreach_id,
 618              c.direction,
 619              c.contact_method as channel,
 620              c.contact_uri as sender_identifier,
 621              c.message_body,
 622              c.subject_line,
 623              c.sentiment,
 624              c.created_at as received_at,
 625              c.read_at,
 626              c.replied_at,
 627              COALESCE(c.autoresponder_enabled, 1) as autoresponder_enabled,
 628              c.contact_uri,
 629              c.message_body as proposal_text,
 630              s.domain,
 631              s.id as site_id
 632          FROM messages c
 633          LEFT JOIN sites s ON s.id = c.site_id
 634          WHERE c.site_id IN (SELECT site_id FROM recent_sites)
 635          ORDER BY c.site_id DESC, c.created_at ASC
 636      """.format(
 637          limit
 638      )
 639      conn = get_db_connection()
 640      return pd.read_sql_query(query, conn)
 641  
 642  
 643  # ===== Compliance & Rate Limit Queries =====
 644  
 645  
 646  @st.cache_data(ttl=config.CACHE_TTL)
 647  def get_optout_stats() -> pd.DataFrame:
 648      """Get opt-out statistics."""
 649      query = """
 650          SELECT
 651              COUNT(DISTINCT email) as total_email_optouts,
 652              COUNT(DISTINCT phone) as total_sms_optouts
 653          FROM (
 654              SELECT email, NULL as phone FROM unsubscribed_emails
 655              UNION ALL
 656              SELECT email, phone FROM opt_outs
 657          )
 658      """
 659      conn = get_db_connection()
 660      return pd.read_sql_query(query, conn)
 661  
 662  
 663  @st.cache_data(ttl=config.CACHE_TTL)
 664  def get_platform_health() -> pd.DataFrame:
 665      """Get platform health metrics (bounce/complaint rates)."""
 666      query = """
 667          SELECT
 668              contact_method,
 669              COUNT(*) as total_sent,
 670              SUM(CASE WHEN delivery_status = 'bounced' THEN 1 ELSE 0 END) as bounced,
 671              SUM(CASE WHEN delivery_status = 'delivered' THEN 1 ELSE 0 END) as delivered,
 672              ROUND(100.0 * SUM(CASE WHEN delivery_status = 'bounced' THEN 1 ELSE 0 END) / COUNT(*), 2) as bounce_rate,
 673              ROUND(100.0 * SUM(CASE WHEN delivery_status = 'delivered' THEN 1 ELSE 0 END) / COUNT(*), 2) as delivery_rate
 674          FROM messages
 675          WHERE direction = 'outbound'
 676            AND delivery_status IN ('sent', 'delivered', 'bounced', 'failed')
 677          GROUP BY contact_method
 678      """
 679      conn = get_db_connection()
 680      return pd.read_sql_query(query, conn)
 681  
 682  
 683  # ===== System Health Queries =====
 684  
 685  
 686  @st.cache_data(ttl=config.CACHE_TTL)
 687  def get_cron_job_status() -> pd.DataFrame:
 688      """Get cron job last run timestamps from config table."""
 689      query = """
 690          SELECT
 691              REPLACE(key, 'cron_last_run_', '') as job_name,
 692              value as last_run,
 693              description
 694          FROM config
 695          WHERE key LIKE 'cron_last_run_%'
 696          ORDER BY value DESC
 697      """
 698      conn = get_db_connection()
 699      return pd.read_sql_query(query, conn)
 700  
 701  
 702  @st.cache_data(ttl=config.CACHE_TTL)
 703  def get_cron_job_logs(job_name: str = None, limit: int = 50) -> pd.DataFrame:
 704      """Get detailed cron job execution logs."""
 705      if job_name:
 706          query = """
 707              SELECT
 708                  id,
 709                  job_name,
 710                  started_at,
 711                  finished_at,
 712                  status,
 713                  summary,
 714                  items_processed,
 715                  items_failed,
 716                  error_message,
 717                  CAST((julianday(finished_at) - julianday(started_at)) * 24 * 60 AS INTEGER) as duration_minutes
 718              FROM cron_job_logs
 719              WHERE job_name = ?
 720              ORDER BY started_at DESC
 721              LIMIT ?
 722          """
 723          conn = get_db_connection()
 724          return pd.read_sql_query(query, conn, params=(job_name, limit))
 725      else:
 726          query = """
 727              SELECT
 728                  id,
 729                  job_name,
 730                  started_at,
 731                  finished_at,
 732                  status,
 733                  summary,
 734                  items_processed,
 735                  items_failed,
 736                  error_message,
 737                  CAST((julianday(finished_at) - julianday(started_at)) * 24 * 60 AS INTEGER) as duration_minutes
 738              FROM cron_job_logs
 739              ORDER BY started_at DESC
 740              LIMIT ?
 741          """
 742          conn = get_db_connection()
 743          return pd.read_sql_query(query, conn, params=(limit,))
 744  
 745  
 746  @st.cache_data(ttl=config.CACHE_TTL)
 747  def get_cron_job_full_log(log_id: int) -> str:
 748      """Get full log text for a specific cron job execution."""
 749      query = """
 750          SELECT full_log
 751          FROM cron_job_logs
 752          WHERE id = ?
 753      """
 754      conn = get_db_connection()
 755      result = pd.read_sql_query(query, conn, params=(log_id,))
 756      return result["full_log"].iloc[0] if not result.empty else ""
 757  
 758  
 759  @st.cache_data(ttl=config.CACHE_TTL)
 760  def get_cron_job_summary() -> pd.DataFrame:
 761      """
 762      Get summary statistics for each cron job with adaptive time windows.
 763  
 764      Time windows based on frequency:
 765      - Jobs running multiple times per day (minutes/hours): last 24 hours
 766      - Jobs running every 1-6 days: last 7 days
 767      - Jobs running every 7+ days: last 30 days
 768      """
 769      query = """
 770          SELECT
 771              l.job_name,
 772              COUNT(*) as total_runs,
 773              SUM(CASE WHEN l.status = 'success' THEN 1 ELSE 0 END) as successful_runs,
 774              SUM(CASE WHEN l.status = 'failed' THEN 1 ELSE 0 END) as failed_runs,
 775              ROUND(100.0 * SUM(CASE WHEN l.status = 'success' THEN 1 ELSE 0 END) / COUNT(*), 1) as success_rate,
 776              MAX(l.started_at) as last_run,
 777              AVG(CAST((julianday(l.finished_at) - julianday(l.started_at)) * 24 * 60 AS REAL)) as avg_duration_minutes,
 778              j.interval_value,
 779              j.interval_unit
 780          FROM cron_job_logs l
 781          LEFT JOIN cron_jobs j ON j.name = l.job_name
 782          WHERE l.started_at >= datetime('now',
 783              CASE
 784                  WHEN j.interval_unit IN ('minutes', 'hours') THEN '-1 days'
 785                  WHEN j.interval_unit = 'days' AND j.interval_value < 7 THEN '-7 days'
 786                  ELSE '-30 days'
 787              END
 788          )
 789          GROUP BY l.job_name, j.interval_value, j.interval_unit
 790          ORDER BY last_run DESC
 791      """
 792      conn = get_db_connection()
 793      return pd.read_sql_query(query, conn)
 794  
 795  
 796  @st.cache_data(ttl=config.CACHE_TTL)
 797  def get_database_health() -> dict:
 798      """Get database health metrics."""
 799      conn = get_db_connection()
 800  
 801      # Database size
 802      size_query = """
 803          SELECT page_count * page_size as size_bytes
 804          FROM pragma_page_count(), pragma_page_size()
 805      """
 806      size_df = pd.read_sql_query(size_query, conn)
 807      size_mb = size_df["size_bytes"].iloc[0] / (1024 * 1024) if len(size_df) > 0 else 0
 808  
 809      # Table counts
 810      counts_query = """
 811          SELECT 'sites' as table_name, COUNT(*) as count FROM sites
 812          UNION ALL
 813          SELECT 'messages', COUNT(*) FROM messages
 814          UNION ALL
 815          SELECT 'keywords', COUNT(*) FROM keywords
 816      """
 817      counts_df = pd.read_sql_query(counts_query, conn)
 818  
 819      # Integrity check
 820      integrity = conn.execute("PRAGMA integrity_check").fetchone()[0]
 821  
 822      return {"size_mb": size_mb, "integrity": integrity, "table_counts": counts_df}
 823  
 824  
 825  @st.cache_data(ttl=config.CACHE_TTL)
 826  def get_http_error_history(days: int = 30) -> pd.DataFrame:
 827      """Get HTTP error code history for IP burnout tracking."""
 828      # Try cache first for 30-day request (most common)
 829      if days == 30:
 830          cached = get_from_cache('chart_http_errors_30d')
 831          if cached:
 832              return pd.DataFrame(cached)
 833  
 834      query = """
 835          SELECT
 836              DATE(updated_at) as date,
 837              http_status_code,
 838              status as stage,
 839              COUNT(*) as count
 840          FROM sites
 841          WHERE updated_at >= datetime('now', '-{} days')
 842            AND http_status_code IS NOT NULL
 843            AND http_status_code != 200
 844            AND status IN ('assets_captured', 'enriched')
 845          GROUP BY DATE(updated_at), http_status_code, status
 846          ORDER BY date, http_status_code
 847      """.format(
 848          days
 849      )
 850      conn = get_db_connection()
 851      return pd.read_sql_query(query, conn)
 852  
 853  
 854  @st.cache_data(ttl=config.CACHE_TTL)
 855  def get_api_rate_limits() -> pd.DataFrame:
 856      """Get recent rate limit errors from sites."""
 857      query = """
 858          SELECT
 859              status as stage,
 860              error_message,
 861              COUNT(*) as count,
 862              MAX(updated_at) as last_occurrence
 863          FROM sites
 864          WHERE error_message LIKE '%rate%limit%'
 865             OR error_message LIKE '%429%'
 866             OR error_message LIKE '%quota%'
 867          GROUP BY status, error_message
 868          ORDER BY last_occurrence DESC
 869          LIMIT 20
 870      """
 871      conn = get_db_connection()
 872      return pd.read_sql_query(query, conn)
 873  
 874  
 875  # ===== LLM Cost & Forecasting Queries =====
 876  
 877  
 878  @st.cache_data(ttl=config.CACHE_TTL)
 879  def get_llm_cost_by_stage() -> pd.DataFrame:
 880      """Get LLM cost breakdown by pipeline stage."""
 881      query = """
 882          SELECT
 883              stage,
 884              SUM(estimated_cost) as total_cost,
 885              SUM(total_tokens) as total_tokens,
 886              COUNT(*) as request_count,
 887              AVG(estimated_cost) as avg_cost_per_request,
 888              SUM(prompt_tokens) as total_prompt_tokens,
 889              SUM(completion_tokens) as total_completion_tokens
 890          FROM llm_usage
 891          GROUP BY stage
 892          ORDER BY total_cost DESC
 893      """
 894      conn = get_db_connection()
 895      return pd.read_sql_query(query, conn)
 896  
 897  
 898  @st.cache_data(ttl=config.CACHE_TTL)
 899  def get_llm_cost_by_model() -> pd.DataFrame:
 900      """Get LLM cost breakdown by model."""
 901      query = """
 902          SELECT
 903              model,
 904              provider,
 905              SUM(estimated_cost) as total_cost,
 906              SUM(total_tokens) as total_tokens,
 907              COUNT(*) as request_count
 908          FROM llm_usage
 909          GROUP BY model, provider
 910          ORDER BY total_cost DESC
 911      """
 912      conn = get_db_connection()
 913      return pd.read_sql_query(query, conn)
 914  
 915  
 916  @st.cache_data(ttl=config.CACHE_TTL)
 917  def get_llm_cost_over_time(days: int = 30) -> pd.DataFrame:
 918      """Get daily LLM costs over time."""
 919      query = """
 920          SELECT
 921              DATE(created_at) as date,
 922              SUM(estimated_cost) as daily_cost,
 923              SUM(total_tokens) as daily_tokens,
 924              COUNT(*) as daily_requests
 925          FROM llm_usage
 926          WHERE created_at >= datetime('now', '-{} days')
 927          GROUP BY DATE(created_at)
 928          ORDER BY date
 929      """.format(
 930          days
 931      )
 932      conn = get_db_connection()
 933      return pd.read_sql_query(query, conn)
 934  
 935  
 936  @st.cache_data(ttl=config.CACHE_TTL)
 937  def get_cost_per_site() -> pd.DataFrame:
 938      """Get average LLM cost per site by stage."""
 939      query = """
 940          SELECT
 941              s.status,
 942              COUNT(DISTINCT s.id) as site_count,
 943              COALESCE(SUM(l.estimated_cost), 0) as total_cost,
 944              COALESCE(AVG(l.estimated_cost), 0) as avg_cost_per_site
 945          FROM sites s
 946          LEFT JOIN llm_usage l ON l.site_id = s.id
 947          WHERE s.status IN ('scored', 'rescored', 'enriched', 'proposals_drafted', 'outreach_sent')
 948          GROUP BY s.status
 949          ORDER BY
 950              CASE s.status
 951                  WHEN 'scored' THEN 1
 952                  WHEN 'rescored' THEN 2
 953                  WHEN 'enriched' THEN 3
 954                  WHEN 'proposals_drafted' THEN 4
 955                  WHEN 'outreach_sent' THEN 5
 956              END
 957      """
 958      conn = get_db_connection()
 959      return pd.read_sql_query(query, conn)
 960  
 961  
 962  @st.cache_data(ttl=config.CACHE_TTL)
 963  def get_cost_per_response() -> pd.DataFrame:
 964      """Calculate cost per response by channel."""
 965      query = """
 966          SELECT
 967              o.contact_method as channel,
 968              COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END) as responses,
 969              COALESCE(SUM(l.estimated_cost), 0) as total_cost,
 970              CASE
 971                  WHEN COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END) > 0
 972                  THEN COALESCE(SUM(l.estimated_cost), 0) / COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END)
 973                  ELSE 0
 974              END as cost_per_response
 975          FROM messages o
 976          LEFT JOIN messages r ON r.site_id = o.site_id AND r.direction = 'inbound'
 977          LEFT JOIN llm_usage l ON l.site_id = o.site_id
 978          WHERE o.direction = 'outbound'
 979            AND o.delivery_status IN ('sent', 'delivered')
 980          GROUP BY o.contact_method
 981          ORDER BY cost_per_response
 982      """
 983      conn = get_db_connection()
 984      return pd.read_sql_query(query, conn)
 985  
 986  
 987  @st.cache_data(ttl=config.CACHE_TTL)
 988  def get_cost_forecast() -> dict:
 989      """Generate cost forecast based on current usage patterns."""
 990      conn = get_db_connection()
 991  
 992      # Get average cost per site at each stage
 993      stage_costs_query = """
 994          SELECT
 995              stage,
 996              AVG(estimated_cost) as avg_cost
 997          FROM llm_usage
 998          GROUP BY stage
 999      """
1000      stage_costs = pd.read_sql_query(stage_costs_query, conn)
1001  
1002      # Get current pipeline counts
1003      pipeline_query = """
1004          SELECT
1005              CASE
1006                  WHEN status IN ('pending', 'assets_captured') THEN 'pending_scoring'
1007                  WHEN status = 'scored' THEN 'pending_proposals'
1008                  WHEN status IN ('rescored', 'enriched', 'proposals_drafted') THEN 'pending_outreach'
1009                  ELSE 'completed'
1010              END as pipeline_stage,
1011              COUNT(*) as count
1012          FROM sites
1013          GROUP BY pipeline_stage
1014      """
1015      pipeline = pd.read_sql_query(pipeline_query, conn)
1016  
1017      # Calculate forecast
1018      forecast = {
1019          "stage_costs": stage_costs.to_dict("records"),
1020          "pipeline_counts": pipeline.to_dict("records"),
1021      }
1022  
1023      return forecast
1024  
1025  
@st.cache_data(ttl=config.CACHE_TTL)
def get_new_records_24h() -> pd.DataFrame:
    """
    Count rows created during the last 24 hours, per table and status.

    Returns:
        pd.DataFrame: Columns table_name, status, count. ``table_name`` is one
        of 'sites', 'outreaches' (outbound messages), 'conversations' (inbound
        messages), 'keywords' or 'llm_usage'; rows are ordered by table_name
        and then status.
    """
    db = get_db_connection()

    # New sites, grouped by their status
    sites_part = """
        SELECT
            'sites' as table_name,
            status,
            COUNT(*) as count
        FROM sites
        WHERE created_at >= datetime('now', '-24 hours')
        GROUP BY status
    """

    # New outbound messages; delivery_status wins over approval_status
    outbound_part = """
        SELECT
            'outreaches' as table_name,
            COALESCE(delivery_status, approval_status) as status,
            COUNT(*) as count
        FROM messages
        WHERE direction = 'outbound'
          AND created_at >= datetime('now', '-24 hours')
        GROUP BY COALESCE(delivery_status, approval_status)
    """

    # New inbound messages; the direction doubles as the status label
    inbound_part = """
        SELECT
            'conversations' as table_name,
            direction as status,
            COUNT(*) as count
        FROM messages
        WHERE direction = 'inbound'
          AND created_at >= datetime('now', '-24 hours')
        GROUP BY direction
    """

    # New keywords, grouped by their status
    keywords_part = """
        SELECT
            'keywords' as table_name,
            status,
            COUNT(*) as count
        FROM keywords
        WHERE created_at >= datetime('now', '-24 hours')
        GROUP BY status
    """

    # New LLM usage rows; the stage is reported in the status column
    llm_part = """
        SELECT
            'llm_usage' as table_name,
            stage as status,
            COUNT(*) as count
        FROM llm_usage
        WHERE created_at >= datetime('now', '-24 hours')
        GROUP BY stage
    """

    # Stitch the parts into one compound SELECT with a single trailing ORDER BY
    parts = (sites_part, outbound_part, inbound_part, keywords_part, llm_part)
    combined_sql = (
        "\n        "
        + "\n        UNION ALL\n        ".join(parts)
        + "\n        ORDER BY table_name, status\n    "
    )

    return pd.read_sql_query(combined_sql, db)
1103  
1104  
1105  # ===== Cron Job Management Queries =====
1106  
1107  
@st.cache_resource
def get_writable_db_connection() -> sqlite3.Connection:
    """
    Open the SQLite connection used for dashboard writes.

    Unlike the read-only connection, no ``query_only`` pragma is applied here.

    Returns:
        sqlite3.Connection: Writable database connection with ``sqlite3.Row`` rows
    """
    connect_options = {
        "check_same_thread": False,
        "isolation_level": None,  # Autocommit mode
        "timeout": 10.0,
    }
    writable = sqlite3.connect(config.DATABASE_PATH, **connect_options)
    writable.row_factory = sqlite3.Row  # Dict-like access
    return writable
1121  
1122  
@st.cache_data(ttl=30)  # Short TTL so job changes show up quickly
def get_cron_jobs() -> pd.DataFrame:
    """
    List every row of ``cron_jobs``.

    Returns:
        pd.DataFrame: Job definition columns (id, name, task_key, description,
        handler, interval, enabled flag, timestamps), ordered by
        interval_value, interval_unit and name.
    """
    sql = """
        SELECT
            id,
            name,
            task_key,
            description,
            handler_type,
            handler_value,
            interval_value,
            interval_unit,
            enabled,
            last_run_at,
            created_at,
            updated_at
        FROM cron_jobs
        ORDER BY interval_value, interval_unit, name
    """
    return pd.read_sql_query(sql, get_db_connection())
1145  
1146  
@st.cache_data(ttl=30)
def get_cron_job_stats() -> dict:
    """
    Get cron job statistics with adaptive time windows.

    Success rate calculated using frequency-based windows:
    - Jobs running multiple times per day: last 24 hours
    - Jobs running every 1-6 days: last 7 days
    - Jobs running every 7+ days: last 30 days

    Returns:
        dict: Integer counters ``total_jobs``, ``enabled_jobs``,
        ``disabled_jobs``, ``total_executions``, ``successful_executions``
        and ``failed_executions``. Every counter is 0 (never None) when the
        underlying tables are empty.
    """
    conn = get_db_connection()

    stats_query = """
        SELECT
            COUNT(*) as total,
            SUM(CASE WHEN enabled = 1 THEN 1 ELSE 0 END) as enabled,
            SUM(CASE WHEN enabled = 0 THEN 1 ELSE 0 END) as disabled
        FROM cron_jobs
    """
    stats = conn.execute(stats_query).fetchone()

    # Aggregate execution stats across all jobs using adaptive time windows
    exec_query = """
        SELECT
            SUM(total_runs) as total_executions,
            SUM(successful_runs) as successful,
            SUM(failed_runs) as failed
        FROM (
            SELECT
                l.job_name,
                COUNT(*) as total_runs,
                SUM(CASE WHEN l.status = 'success' THEN 1 ELSE 0 END) as successful_runs,
                SUM(CASE WHEN l.status = 'failed' THEN 1 ELSE 0 END) as failed_runs
            FROM cron_job_logs l
            LEFT JOIN cron_jobs j ON j.name = l.job_name
            WHERE l.started_at >= datetime('now',
                CASE
                    WHEN j.interval_unit IN ('minutes', 'hours') THEN '-1 days'
                    WHEN j.interval_unit = 'days' AND j.interval_value < 7 THEN '-7 days'
                    ELSE '-30 days'
                END
            )
            GROUP BY l.job_name
        )
    """
    exec_stats = conn.execute(exec_query).fetchone()

    # SUM() over zero rows yields NULL, so every summed counter is coalesced
    # to 0; COUNT(*) is always an integer but is guarded the same way.
    return {
        "total_jobs": stats["total"] or 0,
        "enabled_jobs": stats["enabled"] or 0,
        "disabled_jobs": stats["disabled"] or 0,
        "total_executions": exec_stats["total_executions"] or 0,
        "successful_executions": exec_stats["successful"] or 0,
        "failed_executions": exec_stats["failed"] or 0,
    }
1202  
1203  
@st.cache_data(ttl=30)
def get_cron_job_logs(task_name: Optional[str] = None, limit: int = 100) -> pd.DataFrame:
    """
    Fetch the most recent cron job execution log rows.

    Args:
        task_name: When truthy, only rows whose ``job_name`` equals it are returned
        limit: Maximum number of rows to return

    Returns:
        pd.DataFrame: Log rows, newest ``started_at`` first, with a computed
        ``duration_minutes`` column (finished_at - started_at, 2 decimals).
    """
    if not task_name:
        # No filter: latest rows across all jobs
        sql = """
            SELECT
                id,
                job_name,
                started_at,
                finished_at,
                status,
                summary,
                items_processed,
                items_failed,
                error_message,
                ROUND((julianday(finished_at) - julianday(started_at)) * 24 * 60, 2) as duration_minutes
            FROM cron_job_logs
            ORDER BY started_at DESC
            LIMIT ?
        """
        bind_values = (limit,)
    else:
        # Filtered to a single job
        sql = """
            SELECT
                id,
                job_name,
                started_at,
                finished_at,
                status,
                summary,
                items_processed,
                items_failed,
                error_message,
                ROUND((julianday(finished_at) - julianday(started_at)) * 24 * 60, 2) as duration_minutes
            FROM cron_job_logs
            WHERE job_name = ?
            ORDER BY started_at DESC
            LIMIT ?
        """
        bind_values = (task_name, limit)

    return pd.read_sql_query(sql, get_db_connection(), params=bind_values)
1246  
1247  
@st.cache_data(ttl=30)
def get_recent_cron_failures(days: int = 7, limit: int = 10) -> pd.DataFrame:
    """
    Get recent cron job failures.

    Args:
        days: Size of the look-back window in days, applied to ``started_at``
        limit: Maximum number of jobs to return

    Returns:
        pd.DataFrame: One row per job with failure_count and last_failure,
        ordered by failure_count descending.
    """
    # Both the window and the row limit are bound parameters; nothing is
    # formatted into the SQL text.
    query = """
        SELECT
            job_name,
            COUNT(*) as failure_count,
            MAX(started_at) as last_failure
        FROM cron_job_logs
        WHERE status = 'failed'
          AND started_at >= datetime('now', ?)
        GROUP BY job_name
        ORDER BY failure_count DESC
        LIMIT ?
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn, params=(f"-{days} days", limit))
1267  
1268  
def update_cron_job_enabled(task_key: str, enabled: bool) -> bool:
    """
    Switch a cron job on or off.

    On success the cached results of ``get_cron_jobs`` and
    ``get_cron_job_stats`` are cleared. On any exception the error is logged,
    shown via ``st.error`` and False is returned.

    Args:
        task_key: The task_key of the job to update
        enabled: True to enable, False to disable

    Returns:
        bool: True if the statement ran without raising, False otherwise
    """
    try:
        db = get_writable_db_connection()
        enabled_flag = int(bool(enabled))  # stored as 1 / 0
        db.execute(
            "UPDATE cron_jobs SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE task_key = ?",
            (enabled_flag, task_key),
        )
        db.commit()
        # Drop cached reads so the UI reflects the change immediately
        for cached_reader in (get_cron_jobs, get_cron_job_stats):
            cached_reader.clear()
        return True
    except Exception as exc:
        message = f"Failed to update job: {exc}"
        logger.error(message, exc_info=True)
        st.error(message)
        return False
1295  
1296  
def update_cron_job_interval(
    task_key: str, interval_value: int, interval_unit: str
) -> bool:
    """
    Update a cron job's interval.

    On success the cached results of ``get_cron_jobs`` and
    ``get_cron_job_stats`` are cleared. On any exception the error is logged,
    shown via ``st.error`` and False is returned.

    Args:
        task_key: The task_key of the job to update
        interval_value: The interval value (number)
        interval_unit: The interval unit (minutes, hours, days, weeks)

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        conn = get_writable_db_connection()
        conn.execute(
            """
            UPDATE cron_jobs
            SET interval_value = ?, interval_unit = ?, updated_at = CURRENT_TIMESTAMP
            WHERE task_key = ?
        """,
            (interval_value, interval_unit, task_key),
        )
        conn.commit()
        # Clear cache to reflect changes. The stats query picks its time
        # window from interval_unit/interval_value, so it is stale as well.
        get_cron_jobs.clear()
        get_cron_job_stats.clear()
        return True
    except Exception as e:
        logger.error(f"Failed to update interval: {e}", exc_info=True)
        st.error(f"Failed to update interval: {e}")
        return False
1329  
1330  
@st.cache_data(ttl=config.CACHE_TTL)
def get_cron_daily_history(days: int = 7) -> pd.DataFrame:
    """
    Get daily success/failure counts for cron jobs over the last N days.

    Args:
        days: Size of the look-back window in days, applied to ``started_at``

    Returns:
        pd.DataFrame: For a live query, one row per date with successful,
        failed and total counts, ordered by date. For the default 7-day window
        a non-empty ``dashboard_cache`` entry is returned instead when one
        exists (its columns are whatever the cache writer stored).
    """
    # Try cache first for 7-day request (most common)
    if days == 7:
        cached = get_from_cache('chart_cron_daily_history_7d')
        if cached:
            return pd.DataFrame(cached)

    # The window is a bound parameter rather than text formatted into the SQL
    query = """
        SELECT
            DATE(started_at) as date,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
            COUNT(*) as total
        FROM cron_job_logs
        WHERE started_at >= datetime('now', ?)
        GROUP BY DATE(started_at)
        ORDER BY date
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn, params=(f"-{days} days",))
1355  
1356  
@st.cache_data(ttl=30)  # Cache for 30 seconds (frequent updates for timeline)
def get_cron_job_timeline(hours: int = 24, limit: int = 200) -> pd.DataFrame:
    """
    Get cron job execution timeline for Gantt chart.
    Returns recent job executions with start/end times for the last N hours.

    Limited to most recent executions to avoid overwhelming the browser
    with thousands of Plotly shapes (performance optimization).

    Args:
        hours: Size of the look-back window in hours, applied to ``started_at``
        limit: Maximum number of executions to return (the most recent ones)

    Returns:
        pd.DataFrame: For a live query, columns job_name, started_at,
        finished_at, status and duration_minutes, sorted by started_at
        ascending. Unfinished runs are measured up to the current time. For
        the default 24h/200 request a non-empty ``dashboard_cache`` entry is
        returned instead when one exists.
    """
    # Try cache first for 24-hour/200-limit request (most common)
    if hours == 24 and limit == 200:
        cached = get_from_cache('chart_cron_timeline_24h')
        if cached:
            return pd.DataFrame(cached)

    # Window and limit are bound parameters rather than formatted into the SQL
    query = """
        SELECT
            job_name,
            started_at,
            finished_at,
            status,
            ROUND((julianday(COALESCE(finished_at, datetime('now'))) - julianday(started_at)) * 24 * 60, 2) as duration_minutes
        FROM cron_job_logs
        WHERE started_at >= datetime('now', ?)
        ORDER BY started_at DESC
        LIMIT ?
    """
    conn = get_db_connection()
    df = pd.read_sql_query(query, conn, params=(f"-{hours} hours", limit))
    # Re-sort chronologically for timeline display (query sorted DESC for LIMIT)
    return df.sort_values('started_at') if not df.empty else df
1390  
1391  
@st.cache_data(ttl=30)
def get_last_run_status_all_jobs() -> pd.DataFrame:
    """
    Fetch the most recent log entry of every job with one query.

    Rows of ``cron_job_logs`` are ranked per job_name by started_at
    (newest first) and only the top-ranked row of each job is kept, so
    callers do not need to issue one query per job.

    Returns:
        pd.DataFrame: Columns job_name, status, started_at
    """
    sql = """
        WITH latest_logs AS (
            SELECT
                job_name,
                status,
                started_at,
                ROW_NUMBER() OVER (PARTITION BY job_name ORDER BY started_at DESC) as rn
            FROM cron_job_logs
        )
        SELECT
            job_name,
            status,
            started_at
        FROM latest_logs
        WHERE rn = 1
    """
    return pd.read_sql_query(sql, get_db_connection())
1418  
1419  
@st.cache_data(ttl=30)  # Cache for 30 seconds (frequent updates)
def get_system_metrics_timeline(hours: int = 24) -> pd.DataFrame:
    """
    Get system metrics timeline for overlay on Gantt chart.
    Returns CPU, disk I/O, and memory usage for the last N hours.

    Args:
        hours: Size of the look-back window in hours, applied to ``recorded_at``

    Returns:
        pd.DataFrame: For a live query, columns recorded_at, cpu_percent,
        disk_read_mb, disk_write_mb and memory_percent, ordered by
        recorded_at. For the default 24-hour window a non-empty
        ``dashboard_cache`` entry is returned instead when one exists.
    """
    # Try cache first for 24-hour request (most common)
    if hours == 24:
        cached = get_from_cache('chart_system_metrics_24h')
        if cached:
            return pd.DataFrame(cached)

    # The window is a bound parameter rather than text formatted into the SQL
    query = """
        SELECT
            recorded_at,
            cpu_percent,
            disk_read_mb,
            disk_write_mb,
            memory_percent
        FROM system_metrics
        WHERE recorded_at >= datetime('now', ?)
        ORDER BY recorded_at
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn, params=(f"-{hours} hours",))
1447  
1448  
1449  # ===== Conversation Management =====
1450  
1451  
def update_autoresponder_status(outreach_id: int, enabled: bool) -> bool:
    """
    Turn the autoresponder on or off for one conversation thread.

    Every ``messages`` row whose ``site_id`` equals ``outreach_id`` is
    updated, then the cached ``get_threaded_conversations`` result is cleared.
    On any exception the error is logged, shown via ``st.error`` and False is
    returned.

    Args:
        outreach_id: The site_id of the conversation (was outreach_id, now site_id)
        enabled: True to enable autoresponder, False to disable

    Returns:
        bool: True if the statement ran without raising, False otherwise
    """
    try:
        db = get_writable_db_connection()
        flag = int(bool(enabled))  # stored as 1 / 0
        db.execute(
            """
            UPDATE messages
            SET autoresponder_enabled = ?
            WHERE site_id = ?
        """,
            (flag, outreach_id),
        )
        db.commit()
        # Drop the cached thread list so the toggle is visible right away
        get_threaded_conversations.clear()
        return True
    except Exception as exc:
        message = f"Failed to update autoresponder status: {exc}"
        logger.error(message, exc_info=True)
        st.error(message)
        return False
1481  
1482  
def insert_manual_reply(outreach_id: int, message_body: str, channel: str) -> bool:
    """
    Record a manually written reply as an outbound message.

    The row is inserted with direction 'outbound', the current timestamp and
    the autoresponder flag set to 0. Afterwards the cached results of
    ``get_threaded_conversations`` and ``get_conversation_stats`` are cleared.
    On any exception the error is logged, shown via ``st.error`` and False is
    returned.

    Args:
        outreach_id: The site_id to reply to (was outreach_id, now site_id)
        message_body: The message text
        channel: The channel (sms, email, form, x, linkedin), stored as contact_method

    Returns:
        bool: True if the insert ran without raising, False otherwise
    """
    try:
        db = get_writable_db_connection()
        row_values = (outreach_id, channel, message_body)
        db.execute(
            """
            INSERT INTO messages (
                site_id,
                direction,
                contact_method,
                message_body,
                created_at,
                autoresponder_enabled
            ) VALUES (?, 'outbound', ?, ?, CURRENT_TIMESTAMP, 0)
        """,
            row_values,
        )
        db.commit()
        # Drop cached conversation views so the new reply shows up
        for cached_reader in (get_threaded_conversations, get_conversation_stats):
            cached_reader.clear()
        return True
    except Exception as exc:
        message = f"Failed to insert manual reply: {exc}"
        logger.error(message, exc_info=True)
        st.error(message)
        return False
1519  
1520  
def trigger_cron_job(task_key: str, handler_type: str, handler_value: str) -> dict:
    """
    Manually trigger a cron job to run immediately.

    Starts ``node scripts/run-cron-job.js <task_key>`` from the project root
    and returns without waiting for it to finish.

    Args:
        task_key: The task_key of the job to run
        handler_type: The handler type (function or command); not used here,
            only task_key is passed to the runner script
        handler_value: The handler value (function name or command); not used here

    Returns:
        dict: ``{'success': True, 'message': ..., 'pid': ...}`` when the process
        was started, otherwise ``{'success': False, 'error': ...}``
    """
    import subprocess
    from pathlib import Path

    try:
        # Get project root (dashboard is in dashboard/, project root is parent)
        project_root = Path(__file__).parent.parent.parent

        # Use the run-cron-job.js script
        script_path = project_root / 'scripts' / 'run-cron-job.js'

        # Without this check node would start, fail to load the script, and
        # the caller would still be told the job was triggered.
        if not script_path.is_file():
            return {
                'success': False,
                'error': f'Cron runner script not found: {script_path}'
            }

        # Execute the script in the background. Output is discarded rather
        # than piped: nothing ever reads the pipes, so a child that writes
        # more than the OS pipe buffer would block forever.
        process = subprocess.Popen(
            ['node', str(script_path), task_key],
            cwd=str(project_root),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        # Don't wait for completion - let it run in background
        # The job will log its own results to the database

        return {
            'success': True,
            'message': 'Job triggered successfully (running in background)',
            'pid': process.pid
        }

    except Exception as e:
        logger.error(f"Failed to trigger cron job: {e}", exc_info=True)
        return {
            'success': False,
            'error': str(e)
        }
1567  
1568  
1569  # ===== Dashboard Cache Management =====
1570  
1571  
def get_from_cache(key: str):
    """
    Look up an unexpired entry in the ``dashboard_cache`` table.

    Args:
        key: The cache key

    Returns:
        The JSON-decoded ``cache_value``, or None when no row with this key
        has ``expires_at`` later than the database's current time, or when
        the lookup or decoding raises (the error is logged).
    """
    try:
        row = get_db_connection().execute(
            """
            SELECT cache_value, expires_at
            FROM dashboard_cache
            WHERE cache_key = ? AND expires_at > datetime('now')
            """,
            (key,)
        ).fetchone()

        if not row:
            return None

        import json
        return json.loads(row["cache_value"])
    except Exception as exc:
        logger.error(f"Failed to get cache value for {key}: {exc}", exc_info=True)
        return None
1600  
1601  
def set_cache(key: str, value, expiration_minutes: int = 15) -> bool:
    """
    Set a value in the dashboard cache.

    Inserts the entry, or overwrites value and expiry when the key exists.

    Args:
        key: The cache key
        value: The value to cache (will be JSON serialized)
        expiration_minutes: How long until cache expires (default: 15 minutes)

    Returns:
        bool: True if successful, False otherwise (the error is logged)
    """
    try:
        import json
        from datetime import datetime, timedelta, timezone

        conn = get_writable_db_connection()
        # Readers compare expires_at as text against SQLite's datetime('now'),
        # which is UTC in 'YYYY-MM-DD HH:MM:SS' form. The stored value must use
        # the same clock and layout: a local-time isoformat() string ('T'
        # separator, microseconds) sorts after any same-day datetime('now')
        # value and is offset by the local UTC difference.
        expires_at = (
            datetime.now(timezone.utc) + timedelta(minutes=expiration_minutes)
        ).strftime("%Y-%m-%d %H:%M:%S")

        conn.execute(
            """
            INSERT INTO dashboard_cache (cache_key, cache_value, expires_at, updated_at)
            VALUES (?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(cache_key) DO UPDATE SET
                cache_value = excluded.cache_value,
                expires_at = excluded.expires_at,
                updated_at = CURRENT_TIMESTAMP
            """,
            (key, json.dumps(value), expires_at),
        )
        conn.commit()
        return True
    except Exception as e:
        logger.error(f"Failed to set cache value for {key}: {e}", exc_info=True)
        return False
1637  
1638  
def clear_all_cache() -> bool:
    """
    Delete every row of the ``dashboard_cache`` table.
    Used for manual refresh functionality.

    Returns:
        bool: True if the delete ran without raising, False otherwise
        (the error is logged)
    """
    try:
        db = get_writable_db_connection()
        db.execute("DELETE FROM dashboard_cache")
        db.commit()
        logger.info("Cleared all dashboard cache entries")
        return True
    except Exception as exc:
        logger.error(f"Failed to clear cache: {exc}", exc_info=True)
        return False
1656  
1657  
def get_cache_stats() -> dict:
    """
    Count the entries of the ``dashboard_cache`` table.

    Returns:
        dict: ``total_entries``, ``fresh_entries`` (expires_at in the future)
        and ``expired_entries``; all three are 0 when the table is empty or
        the query raises (the error is logged).
    """
    counters = ('total_entries', 'fresh_entries', 'expired_entries')
    try:
        sql = """
            SELECT
                COUNT(*) as total_entries,
                SUM(CASE WHEN expires_at > datetime('now') THEN 1 ELSE 0 END) as fresh_entries,
                SUM(CASE WHEN expires_at <= datetime('now') THEN 1 ELSE 0 END) as expired_entries
            FROM dashboard_cache
        """
        row = get_db_connection().execute(sql).fetchone()

        # NULL sums (empty table) are reported as 0
        return {name: row[name] or 0 for name in counters}
    except Exception as exc:
        logger.error(f"Failed to get cache stats: {exc}", exc_info=True)
        return dict.fromkeys(counters, 0)
1685  
1686  
1687  # ===== Status Tree (error breakdown) =====
1688  
1689  
def get_status_tree() -> list:
    """
    Get the pipeline status tree (sites) from dashboard_cache.
    Falls back to empty list if cache is missing/expired.
    Returns a list of dicts with status, total, delta_24h, delta_1h, children.
    """
    # get_from_cache is the cache reader visible in this module; it returns
    # None for a missing or expired key, and also when the lookup fails.
    cached = get_from_cache('status_tree')
    if cached is not None:
        return cached
    return []
1700  
1701  
def get_outreach_tree() -> list:
    """
    Get the outreach status tree from dashboard_cache.
    Falls back to empty list if cache is missing/expired.
    """
    # get_from_cache is the cache reader visible in this module; it returns
    # None for a missing or expired key, and also when the lookup fails.
    cached = get_from_cache('outreach_tree')
    if cached is not None:
        return cached
    return []
1711  
1712  
def get_error_proposals(status: str = 'pending') -> list:
    """
    List error pattern proposals that are in the given review status.

    Rows come back ordered by occurrence_count (highest first), then by
    created_at (newest first).

    Args:
        status: Filter by status ('pending', 'approved', 'rejected')

    Returns:
        list of dicts, one per proposal, keyed by the selected column names.
        'example_errors' is the JSON-decoded column value, or [] when the
        column is empty/NULL or cannot be decoded. An empty list is returned
        (and the error logged) if the query itself fails.
    """
    try:
        import json

        proposals_query = """
            SELECT id, pattern, label, group_name, context,
                   example_errors, occurrence_count, status, created_at
            FROM error_pattern_proposals
            WHERE status = ?
            ORDER BY occurrence_count DESC, created_at DESC
            """
        fetched = get_db_connection().execute(proposals_query, (status,)).fetchall()

        proposals = []
        for proposal in map(dict, fetched):
            raw_examples = proposal.get('example_errors')
            # Start from the fallback and only replace it on a clean decode.
            examples = []
            if raw_examples:
                try:
                    examples = json.loads(raw_examples)
                except Exception:
                    examples = []
            proposal['example_errors'] = examples
            proposals.append(proposal)
        return proposals
    except Exception as e:
        logger.error(f"Failed to get error proposals: {e}", exc_info=True)
        return []
1752  
1753  
def review_error_proposal(proposal_id: int, decision: str, reviewer: str = 'dashboard') -> bool:
    """
    Approve or reject an error pattern proposal.

    Sets the proposal's status to the decision and stamps reviewed_at
    (CURRENT_TIMESTAMP) and reviewed_by.

    Args:
        proposal_id: The proposal ID
        decision: 'approved' or 'rejected'; any other value is refused
        reviewer: Who reviewed it (stored in reviewed_by)

    Returns:
        bool: True if a proposal row was updated. False when the decision is
        invalid, when no proposal has the given ID, or when the database
        operation fails (the failure is logged, not raised).
    """
    if decision not in ('approved', 'rejected'):
        return False
    try:
        conn = get_writable_db_connection()
        cursor = conn.execute(
            """
            UPDATE error_pattern_proposals
            SET status = ?, reviewed_at = CURRENT_TIMESTAMP, reviewed_by = ?
            WHERE id = ?
            """,
            (decision, reviewer, proposal_id)
        )
        conn.commit()

        # An UPDATE that matches no row still succeeds at the SQL level;
        # report it as a failure so callers don't show a bogus "reviewed".
        if cursor.rowcount == 0:
            logger.warning(
                "No error proposal with id %s found; nothing marked as %s",
                proposal_id,
                decision,
            )
            return False
        return True
    except Exception as e:
        logger.error(f"Failed to review error proposal {proposal_id}: {e}", exc_info=True)
        return False
1783  
1784  
1785  # ===== Config Management =====
1786  
1787  
def get_config_value(key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Look up a single setting in the config table.

    Args:
        key: The configuration key
        default: Value to hand back when the key has no row or the lookup fails

    Returns:
        str: The stored value for the key, otherwise the default. Lookup
        errors are logged and never raised.
    """
    try:
        cursor = get_db_connection().execute(
            "SELECT value FROM config WHERE key = ?",
            (key,)
        )
        row = cursor.fetchone()
        if not row:
            return default
        return row["value"]
    except Exception as e:
        logger.error(f"Failed to get config value for {key}: {e}", exc_info=True)
        return default
1809  
1810  
def set_config_value(key: str, value: str, description: Optional[str] = None) -> bool:
    """
    Create or update a setting in the config table.

    Args:
        key: The configuration key
        value: The value to set
        description: Optional description of the config. When omitted or
            empty, the description column is left out of the statement, so
            an existing description is not touched.

    Returns:
        bool: True if successful, False otherwise (the error is logged)
    """
    try:
        conn = get_writable_db_connection()
        # Upsert: insert a new row, or update the existing one on a key conflict.
        if description:
            statement = """
                INSERT INTO config (key, value, description)
                VALUES (?, ?, ?)
                ON CONFLICT(key) DO UPDATE SET
                    value = excluded.value,
                    description = COALESCE(excluded.description, description)
                """
            params = (key, value, description)
        else:
            statement = """
                INSERT INTO config (key, value)
                VALUES (?, ?)
                ON CONFLICT(key) DO UPDATE SET
                    value = excluded.value
                """
            params = (key, value)
        conn.execute(statement, params)
        conn.commit()
        return True
    except Exception as e:
        logger.error(f"Failed to set config value for {key}: {e}", exc_info=True)
        return False
1850          logger.error(f"Failed to set config value for {key}: {e}", exc_info=True)
1851          return False