database.py
1 """ 2 Database Query Layer 3 4 Provides read-only access to the SQLite database with caching. 5 All functions return pandas DataFrames for easy visualization. 6 """ 7 8 import sqlite3 9 import logging 10 from pathlib import Path 11 from typing import Optional 12 import pandas as pd 13 import streamlit as st 14 from dashboard import config 15 16 logger = logging.getLogger(__name__) 17 18 19 @st.cache_resource 20 def get_db_connection() -> sqlite3.Connection: 21 """ 22 Create read-only SQLite connection with connection pooling. 23 24 Returns: 25 sqlite3.Connection: Read-only database connection 26 """ 27 conn = sqlite3.connect( 28 config.DATABASE_PATH, 29 check_same_thread=False, 30 isolation_level=None, # Autocommit mode 31 timeout=10.0, 32 ) 33 conn.execute("PRAGMA query_only = ON;") # Enforce read-only 34 conn.row_factory = sqlite3.Row # Dict-like access 35 return conn 36 37 38 # ===== Pipeline Health Queries ===== 39 40 41 @st.cache_data(ttl=config.CACHE_TTL) 42 def get_pipeline_funnel() -> pd.DataFrame: 43 """ 44 Get count of sites at each pipeline stage. 45 46 Excludes 'ignore' and 'failing' statuses as they represent sites 47 outside the active pipeline (permanently filtered or needing manual intervention). 48 """ 49 query = """ 50 SELECT status, COUNT(*) as count 51 FROM sites 52 WHERE status NOT IN ('ignore', 'failing') 53 GROUP BY status 54 ORDER BY 55 CASE status 56 WHEN 'found' THEN 1 57 WHEN 'assets_captured' THEN 2 58 WHEN 'scored' THEN 3 59 WHEN 'rescored' THEN 4 60 WHEN 'enriched' THEN 5 61 WHEN 'proposals_drafted' THEN 6 62 WHEN 'outreach_sent' THEN 7 63 WHEN 'high_score' THEN 8 64 END 65 """ 66 conn = get_db_connection() 67 return pd.read_sql_query(query, conn) 68 69 70 @st.cache_data(ttl=config.CACHE_TTL) 71 def get_excluded_sites_count() -> dict: 72 """ 73 Get count of sites excluded from the active pipeline. 74 75 Returns: 76 dict with keys 'ignore' and 'failing' containing counts 77 """ 78 query = """ 79 SELECT 80 SUM(CASE WHEN status = 'ignore' THEN 1 ELSE 0 END) as ignored, 81 SUM(CASE WHEN status = 'failing' THEN 1 ELSE 0 END) as failing 82 FROM sites 83 """ 84 conn = get_db_connection() 85 result = pd.read_sql_query(query, conn) 86 return { 87 'ignored': int(result['ignored'].iloc[0]) if not result.empty else 0, 88 'failing': int(result['failing'].iloc[0]) if not result.empty else 0 89 } 90 91 92 @st.cache_data(ttl=config.CACHE_TTL) 93 def get_funnel_breakdown() -> dict: 94 """ 95 Get hierarchical funnel breakdown for treemap/sunburst visualization. 96 97 Returns a dict with total, ignored, failing, active counts and 98 sub-breakdowns for each category, plus outreach delivery by channel. 
99 """ 100 conn = get_db_connection() 101 102 total = pd.read_sql_query("SELECT COUNT(*) as c FROM sites", conn)['c'].iloc[0] 103 104 # Ignore breakdown (categorized by error_message patterns) 105 ignore_df = pd.read_sql_query(""" 106 SELECT 107 CASE 108 WHEN error_message LIKE 'Cross-border duplicate%' THEN 'Cross-border duplicates' 109 WHEN error_message LIKE 'Duplicate domain%' THEN 'Duplicate keyword domain' 110 WHEN error_message = 'Ignored: Social media platform' THEN 'Social media' 111 WHEN error_message = 'Ignored: Business directory' THEN 'Business directories' 112 ELSE 'Other' 113 END as category, 114 COUNT(*) as count 115 FROM sites 116 WHERE status = 'ignore' 117 GROUP BY 1 118 ORDER BY 2 DESC 119 """, conn) 120 121 # Failing breakdown (categorized by error_message patterns) 122 fail_df = pd.read_sql_query(""" 123 SELECT 124 CASE 125 WHEN error_message LIKE '%userDataDir%' OR error_message LIKE '%browserType.launch%' 126 THEN 'Browser launch error' 127 WHEN error_message LIKE '%screenshot_path is NULL%' 128 THEN 'NULL screenshot constraint' 129 WHEN error_message LIKE '%status code 401%' OR error_message LIKE '%status code 400%' 130 THEN 'API auth errors' 131 WHEN error_message LIKE '%HTTP 403%' 132 THEN 'HTTP 403 blocked' 133 WHEN error_message LIKE '%EACCES%' 134 THEN 'Permission denied' 135 WHEN error_message LIKE '%page.content%' OR error_message LIKE '%navigating%' 136 THEN 'Page navigation error' 137 ELSE 'Other' 138 END as category, 139 COUNT(*) as count 140 FROM sites 141 WHERE status = 'failing' 142 GROUP BY 1 143 ORDER BY 2 DESC 144 """, conn) 145 146 # Active pipeline stage breakdown 147 active_df = pd.read_sql_query(""" 148 SELECT status, COUNT(*) as count 149 FROM sites 150 WHERE status NOT IN ('ignore', 'failing') 151 GROUP BY status 152 ORDER BY 153 CASE status 154 WHEN 'found' THEN 1 155 WHEN 'assets_captured' THEN 2 156 WHEN 'scored' THEN 3 157 WHEN 'rescored' THEN 4 158 WHEN 'enriched' THEN 5 159 WHEN 'proposals_drafted' THEN 6 160 WHEN 'outreach_sent' THEN 7 161 ELSE 8 162 END 163 """, conn) 164 165 # Pending outreaches by channel 166 outreach_df = pd.read_sql_query(""" 167 SELECT contact_method as channel, COUNT(*) as count 168 FROM messages 169 WHERE direction = 'outbound' 170 AND approval_status = 'pending' 171 GROUP BY contact_method 172 ORDER BY count DESC 173 """, conn) 174 175 # GDPR blocked count 176 gdpr_row = pd.read_sql_query(""" 177 SELECT COUNT(*) as count FROM messages WHERE direction = 'outbound' AND approval_status = 'gdpr_blocked' 178 """, conn) 179 gdpr_blocked = int(gdpr_row['count'].iloc[0]) if not gdpr_row.empty else 0 180 181 return { 182 'total': int(total), 183 'ignored': int(ignore_df['count'].sum()) if not ignore_df.empty else 0, 184 'ignored_breakdown': ignore_df.to_dict('records'), 185 'failing': int(fail_df['count'].sum()) if not fail_df.empty else 0, 186 'failing_breakdown': fail_df.to_dict('records'), 187 'active': int(active_df['count'].sum()) if not active_df.empty else 0, 188 'active_breakdown': active_df.to_dict('records'), 189 'outreach_breakdown': outreach_df.to_dict('records'), 190 'outreach_gdpr_blocked': gdpr_blocked, 191 } 192 193 194 @st.cache_data(ttl=config.CACHE_TTL) 195 def get_total_active_errors() -> int: 196 """Get total count of sites with active errors (excluding ignored sites).""" 197 query = """ 198 SELECT COUNT(*) as total 199 FROM sites 200 WHERE error_message IS NOT NULL 201 AND status NOT IN ('ignore', 'failing') 202 """ 203 conn = get_db_connection() 204 result = pd.read_sql_query(query, conn) 205 
return int(result["total"].iloc[0]) if not result.empty else 0 206 207 208 @st.cache_data(ttl=config.CACHE_TTL) 209 def get_error_breakdown() -> pd.DataFrame: 210 """Get most common errors by count (excluding ignored sites).""" 211 query = """ 212 SELECT 213 error_message, 214 status as stage, 215 COUNT(*) as count 216 FROM sites 217 WHERE error_message IS NOT NULL 218 AND status != 'ignore' 219 GROUP BY error_message, status 220 ORDER BY count DESC 221 LIMIT 20 222 """ 223 conn = get_db_connection() 224 return pd.read_sql_query(query, conn) 225 226 227 @st.cache_data(ttl=config.CACHE_TTL) 228 def get_stuck_sites_by_error() -> pd.DataFrame: 229 """Get sites stuck at same stage with errors.""" 230 query = """ 231 SELECT 232 id, 233 domain, 234 status as stage, 235 error_message, 236 updated_at, 237 julianday('now') - julianday(updated_at) as days_stuck 238 FROM sites 239 WHERE error_message IS NOT NULL 240 AND julianday('now') - julianday(updated_at) > 1 241 ORDER BY days_stuck DESC 242 LIMIT 100 243 """ 244 conn = get_db_connection() 245 return pd.read_sql_query(query, conn) 246 247 248 @st.cache_data(ttl=config.CACHE_TTL) 249 def get_failing_sites() -> pd.DataFrame: 250 """Get sites marked as 'failing' after exceeding retry limits.""" 251 query = """ 252 SELECT 253 id, 254 domain, 255 status, 256 error_message, 257 retry_count, 258 last_retry_at, 259 updated_at 260 FROM sites 261 WHERE status = 'failing' 262 ORDER BY updated_at DESC 263 LIMIT 100 264 """ 265 conn = get_db_connection() 266 return pd.read_sql_query(query, conn) 267 268 269 @st.cache_data(ttl=config.CACHE_TTL) 270 def get_daily_throughput(days: int = 30) -> pd.DataFrame: 271 """Get sites processed per day.""" 272 query = """ 273 SELECT 274 DATE(created_at) as date, 275 COUNT(*) as sites_added 276 FROM sites 277 WHERE created_at >= datetime('now', '-{} days') 278 GROUP BY DATE(created_at) 279 ORDER BY date 280 """.format( 281 days 282 ) 283 conn = get_db_connection() 284 return pd.read_sql_query(query, conn) 285 286 287 @st.cache_data(ttl=config.CACHE_TTL) 288 def get_hourly_throughput(hours: int = 48) -> pd.DataFrame: 289 """Get sites processed per hour for the last N hours.""" 290 # Try cache first for 48-hour request (most common) 291 if hours == 48: 292 cached = get_from_cache('chart_hourly_throughput_48h') 293 if cached: 294 return pd.DataFrame(cached) 295 296 query = """ 297 SELECT 298 strftime('%Y-%m-%d %H:00:00', created_at) as hour, 299 COUNT(*) as sites_added 300 FROM sites 301 WHERE created_at >= datetime('now', '-{} hours') 302 GROUP BY strftime('%Y-%m-%d %H:00:00', created_at) 303 ORDER BY hour 304 """.format( 305 hours 306 ) 307 conn = get_db_connection() 308 return pd.read_sql_query(query, conn) 309 310 311 @st.cache_data(ttl=config.CACHE_TTL) 312 def get_hourly_status_breakdown(hours: int = 48) -> pd.DataFrame: 313 """ 314 Get sites processed per hour broken down by status for the last N hours. 315 Returns a pivot table suitable for stacked bar charts. 316 317 Filters out 'failing' and 'ignore' statuses as they represent sites 318 out of the pipeline (manual intervention needed or permanently excluded). 
319 """ 320 # Try cache first for 48-hour request (most common) 321 if hours == 48: 322 cached = get_from_cache('chart_hourly_status_breakdown_48h') 323 if cached: 324 df = pd.DataFrame(cached) 325 if not df.empty: 326 # Pivot the cached data 327 pivot_df = df.pivot_table( 328 index='hour', 329 columns='status', 330 values='count', 331 fill_value=0 332 ).reset_index() 333 return pivot_df 334 return pd.DataFrame() 335 336 query = """ 337 SELECT 338 strftime('%Y-%m-%d %H:00:00', created_at) as hour, 339 status, 340 COUNT(*) as count 341 FROM site_status 342 WHERE created_at >= datetime('now', '-{} hours') 343 AND status NOT IN ('failing', 'ignore') 344 GROUP BY strftime('%Y-%m-%d %H:00:00', created_at), status 345 ORDER BY hour, status 346 """.format( 347 hours 348 ) 349 conn = get_db_connection() 350 df = pd.read_sql_query(query, conn) 351 352 # Pivot the data to make it suitable for stacked bar charts 353 # Each row is an hour, each column is a status 354 if not df.empty: 355 pivot_df = df.pivot_table( 356 index='hour', 357 columns='status', 358 values='count', 359 fill_value=0 360 ).reset_index() 361 return pivot_df 362 else: 363 return pd.DataFrame() 364 365 366 # ===== Outreach Queries ===== 367 368 369 @st.cache_data(ttl=config.CACHE_TTL) 370 def get_response_rates() -> pd.DataFrame: 371 """Get response rate by channel.""" 372 query = """ 373 SELECT 374 o.contact_method as channel, 375 COUNT(DISTINCT o.id) as total_sent, 376 COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END) as responses, 377 ROUND(100.0 * COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END) / COUNT(DISTINCT o.id), 2) as response_rate 378 FROM messages o 379 LEFT JOIN messages r ON r.site_id = o.site_id AND r.direction = 'inbound' 380 WHERE o.direction = 'outbound' 381 AND o.delivery_status IN ('sent', 'delivered') 382 GROUP BY o.contact_method 383 ORDER BY response_rate DESC 384 """ 385 conn = get_db_connection() 386 return pd.read_sql_query(query, conn) 387 388 389 @st.cache_data(ttl=config.CACHE_TTL) 390 def get_sales_data() -> pd.DataFrame: 391 """Get sales tracking data.""" 392 query = """ 393 SELECT 394 o.contact_method as channel, 395 COUNT(CASE WHEN s.resulted_in_sale = 1 THEN 1 END) as sales_count, 396 COALESCE(SUM(s.sale_amount), 0) as total_revenue, 397 COUNT(*) as total_outreaches, 398 ROUND(100.0 * COUNT(CASE WHEN s.resulted_in_sale = 1 THEN 1 END) / COUNT(*), 2) as conversion_rate 399 FROM messages o 400 LEFT JOIN sites s ON s.id = o.site_id 401 WHERE o.direction = 'outbound' 402 AND o.delivery_status IN ('sent', 'delivered') 403 GROUP BY o.contact_method 404 """ 405 conn = get_db_connection() 406 return pd.read_sql_query(query, conn) 407 408 409 @st.cache_data(ttl=config.CACHE_TTL) 410 def get_outreach_funnel() -> pd.DataFrame: 411 """Get delivery funnel by channel.""" 412 query = """ 413 SELECT 414 o.contact_method as channel, 415 COUNT(*) as total, 416 SUM(CASE WHEN o.approval_status = 'pending' THEN 1 ELSE 0 END) as pending, 417 SUM(CASE WHEN o.delivery_status = 'sent' THEN 1 ELSE 0 END) as sent, 418 SUM(CASE WHEN o.delivery_status = 'delivered' THEN 1 ELSE 0 END) as delivered, 419 SUM(CASE WHEN o.delivery_status = 'failed' THEN 1 ELSE 0 END) as failed, 420 SUM(CASE WHEN o.delivery_status = 'bounced' THEN 1 ELSE 0 END) as bounced, 421 ROUND(100.0 * SUM(CASE WHEN o.delivery_status = 'delivered' THEN 1 ELSE 0 END) / COUNT(*), 2) as delivery_rate 422 FROM messages o 423 WHERE o.direction = 'outbound' 424 GROUP BY o.contact_method 425 ORDER BY total DESC 426 """ 427 conn 
= get_db_connection() 428 return pd.read_sql_query(query, conn) 429 430 431 # ===== LLM Usage & Cost Queries ===== 432 433 434 @st.cache_data(ttl=config.CACHE_TTL) 435 def get_llm_usage_by_stage() -> pd.DataFrame: 436 """Get LLM token usage and costs by pipeline stage.""" 437 query = """ 438 SELECT 439 stage, 440 SUM(prompt_tokens) as prompt_tokens, 441 SUM(completion_tokens) as completion_tokens, 442 SUM(total_tokens) as total_tokens, 443 SUM(estimated_cost) as total_cost, 444 COUNT(*) as request_count, 445 AVG(estimated_cost) as avg_cost_per_request 446 FROM llm_usage 447 GROUP BY stage 448 ORDER BY total_cost DESC 449 """ 450 conn = get_db_connection() 451 return pd.read_sql_query(query, conn) 452 453 454 @st.cache_data(ttl=config.CACHE_TTL) 455 def get_llm_usage_by_provider() -> pd.DataFrame: 456 """Get LLM costs by provider.""" 457 query = """ 458 SELECT 459 provider, 460 model, 461 SUM(total_tokens) as total_tokens, 462 SUM(estimated_cost) as total_cost, 463 COUNT(*) as request_count 464 FROM llm_usage 465 GROUP BY provider, model 466 ORDER BY total_cost DESC 467 """ 468 conn = get_db_connection() 469 return pd.read_sql_query(query, conn) 470 471 472 @st.cache_data(ttl=config.CACHE_TTL) 473 def get_llm_daily_costs(days: int = 30) -> pd.DataFrame: 474 """Get daily LLM costs over time.""" 475 # Try cache first for 30-day request (most common) 476 if days == 30: 477 cached = get_from_cache('chart_llm_daily_costs_30d') 478 if cached: 479 return pd.DataFrame(cached) 480 481 query = """ 482 SELECT 483 DATE(created_at) as date, 484 SUM(estimated_cost) as daily_cost, 485 SUM(total_tokens) as daily_tokens, 486 COUNT(*) as request_count 487 FROM llm_usage 488 WHERE created_at >= datetime('now', '-{} days') 489 GROUP BY DATE(created_at) 490 ORDER BY date 491 """.format( 492 days 493 ) 494 conn = get_db_connection() 495 return pd.read_sql_query(query, conn) 496 497 498 @st.cache_data(ttl=config.CACHE_TTL) 499 def get_llm_cost_per_outreach() -> pd.DataFrame: 500 """Calculate average LLM cost per site that resulted in outreach.""" 501 query = """ 502 SELECT 503 COUNT(DISTINCT s.id) as sites_with_outreach, 504 COALESCE(SUM(l.estimated_cost), 0) as total_llm_cost, 505 CASE 506 WHEN COUNT(DISTINCT s.id) > 0 THEN COALESCE(SUM(l.estimated_cost), 0) / COUNT(DISTINCT s.id) 507 ELSE 0 508 END as avg_cost_per_site 509 FROM sites s 510 INNER JOIN messages o ON o.site_id = s.id AND o.direction = 'outbound' 511 LEFT JOIN llm_usage l ON l.site_id = s.id 512 WHERE s.status = 'outreach_sent' 513 """ 514 conn = get_db_connection() 515 return pd.read_sql_query(query, conn) 516 517 518 @st.cache_data(ttl=config.CACHE_TTL) 519 def get_llm_cost_by_stage_and_date(days: int = 30) -> pd.DataFrame: 520 """Get LLM costs broken down by stage over time.""" 521 # Try cache first for 30-day request (most common) 522 if days == 30: 523 cached = get_from_cache('chart_llm_cost_by_stage_30d') 524 if cached: 525 return pd.DataFrame(cached) 526 527 query = """ 528 SELECT 529 DATE(created_at) as date, 530 stage, 531 SUM(estimated_cost) as cost 532 FROM llm_usage 533 WHERE created_at >= datetime('now', '-{} days') 534 GROUP BY DATE(created_at), stage 535 ORDER BY date, stage 536 """.format( 537 days 538 ) 539 conn = get_db_connection() 540 return pd.read_sql_query(query, conn) 541 542 543 # ===== Conversation Queries ===== 544 545 546 @st.cache_data(ttl=config.CACHE_TTL) 547 def get_conversation_stats() -> pd.DataFrame: 548 """Get conversation statistics.""" 549 query = """ 550 SELECT 551 COUNT(*) as total_conversations, 552 SUM(CASE 


@st.cache_data(ttl=config.CACHE_TTL)
def get_sentiment_distribution() -> pd.DataFrame:
    """Get the sentiment breakdown of inbound messages."""
    query = """
        SELECT
            sentiment,
            COUNT(*) as count
        FROM messages
        WHERE direction = 'inbound' AND sentiment IS NOT NULL
        GROUP BY sentiment
        ORDER BY count DESC
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_conversation_threads(limit: int = 50) -> pd.DataFrame:
    """Get recent inbound conversation threads."""
    query = """
        SELECT
            c.id,
            c.site_id,
            c.direction,
            c.contact_method as channel,
            c.contact_uri as sender_identifier,
            c.message_body,
            c.sentiment,
            c.created_at as received_at,
            c.read_at,
            c.contact_uri,
            s.domain
        FROM messages c
        LEFT JOIN sites s ON s.id = c.site_id
        WHERE c.direction = 'inbound'
        ORDER BY c.created_at DESC
        LIMIT {}
    """.format(limit)
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_threaded_conversations(limit: int = 50) -> pd.DataFrame:
    """Get conversation threads grouped by site (exposed as outreach_id) with all messages."""
    query = """
        WITH recent_sites AS (
            SELECT DISTINCT site_id
            FROM messages
            WHERE direction = 'inbound'
            ORDER BY created_at DESC
            LIMIT {}
        )
        SELECT
            c.id,
            c.site_id as outreach_id,
            c.direction,
            c.contact_method as channel,
            c.contact_uri as sender_identifier,
            c.message_body,
            c.subject_line,
            c.sentiment,
            c.created_at as received_at,
            c.read_at,
            c.replied_at,
            COALESCE(c.autoresponder_enabled, 1) as autoresponder_enabled,
            c.contact_uri,
            c.message_body as proposal_text,
            s.domain,
            s.id as site_id
        FROM messages c
        LEFT JOIN sites s ON s.id = c.site_id
        WHERE c.site_id IN (SELECT site_id FROM recent_sites)
        ORDER BY c.site_id DESC, c.created_at ASC
    """.format(limit)
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


# ===== Compliance & Rate Limit Queries =====


@st.cache_data(ttl=config.CACHE_TTL)
def get_optout_stats() -> pd.DataFrame:
    """Get opt-out statistics."""
    query = """
        SELECT
            COUNT(DISTINCT email) as total_email_optouts,
            COUNT(DISTINCT phone) as total_sms_optouts
        FROM (
            SELECT email, NULL as phone FROM unsubscribed_emails
            UNION ALL
            SELECT email, phone FROM opt_outs
        )
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_platform_health() -> pd.DataFrame:
    """Get platform health metrics (bounce/delivery rates by channel)."""
    query = """
        SELECT
            contact_method,
            COUNT(*) as total_sent,
            SUM(CASE WHEN delivery_status = 'bounced' THEN 1 ELSE 0 END) as bounced,
            SUM(CASE WHEN delivery_status = 'delivered' THEN 1 ELSE 0 END) as delivered,
            ROUND(100.0 * SUM(CASE WHEN delivery_status = 'bounced' THEN 1 ELSE 0 END) / COUNT(*), 2) as bounce_rate,
            ROUND(100.0 * SUM(CASE WHEN delivery_status = 'delivered' THEN 1 ELSE 0 END) / COUNT(*), 2) as delivery_rate
        FROM messages
        WHERE direction = 'outbound'
          AND delivery_status IN ('sent', 'delivered', 'bounced', 'failed')
        GROUP BY contact_method
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


# ===== System Health Queries =====


@st.cache_data(ttl=config.CACHE_TTL)
def get_cron_job_status() -> pd.DataFrame:
    """Get cron job last-run timestamps from the config table."""
    query = """
        SELECT
            REPLACE(key, 'cron_last_run_', '') as job_name,
            value as last_run,
            description
        FROM config
        WHERE key LIKE 'cron_last_run_%'
        ORDER BY value DESC
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_cron_job_full_log(log_id: int) -> str:
    """Get the full log text for a specific cron job execution."""
    query = """
        SELECT full_log
        FROM cron_job_logs
        WHERE id = ?
    """
    conn = get_db_connection()
    result = pd.read_sql_query(query, conn, params=(log_id,))
    return result["full_log"].iloc[0] if not result.empty else ""


@st.cache_data(ttl=config.CACHE_TTL)
def get_cron_job_summary() -> pd.DataFrame:
    """
    Get summary statistics for each cron job with adaptive time windows.

    Time windows based on frequency:
    - Jobs running multiple times per day (minutes/hours): last 24 hours
    - Jobs running every 1-6 days: last 7 days
    - Jobs running every 7+ days: last 30 days
    """
    query = """
        SELECT
            l.job_name,
            COUNT(*) as total_runs,
            SUM(CASE WHEN l.status = 'success' THEN 1 ELSE 0 END) as successful_runs,
            SUM(CASE WHEN l.status = 'failed' THEN 1 ELSE 0 END) as failed_runs,
            ROUND(100.0 * SUM(CASE WHEN l.status = 'success' THEN 1 ELSE 0 END) / COUNT(*), 1) as success_rate,
            MAX(l.started_at) as last_run,
            AVG(CAST((julianday(l.finished_at) - julianday(l.started_at)) * 24 * 60 AS REAL)) as avg_duration_minutes,
            j.interval_value,
            j.interval_unit
        FROM cron_job_logs l
        LEFT JOIN cron_jobs j ON j.name = l.job_name
        WHERE l.started_at >= datetime('now',
            CASE
                WHEN j.interval_unit IN ('minutes', 'hours') THEN '-1 days'
                WHEN j.interval_unit = 'days' AND j.interval_value < 7 THEN '-7 days'
                ELSE '-30 days'
            END
        )
        GROUP BY l.job_name, j.interval_value, j.interval_unit
        ORDER BY last_run DESC
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)
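

# Hedged usage sketch (hypothetical helper): flagging jobs whose success rate
# over their adaptive window drops below a threshold. The 90% cutoff is an
# assumption, not a documented SLA.
def _demo_flag_unhealthy_jobs(threshold: float = 90.0) -> pd.DataFrame:
    summary = get_cron_job_summary()
    if summary.empty:
        return summary
    return summary[summary["success_rate"] < threshold]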


@st.cache_data(ttl=config.CACHE_TTL)
def get_database_health() -> dict:
    """Get database health metrics."""
    conn = get_db_connection()

    # Database size
    size_query = """
        SELECT page_count * page_size as size_bytes
        FROM pragma_page_count(), pragma_page_size()
    """
    size_df = pd.read_sql_query(size_query, conn)
    size_mb = size_df["size_bytes"].iloc[0] / (1024 * 1024) if len(size_df) > 0 else 0

    # Table counts
    counts_query = """
        SELECT 'sites' as table_name, COUNT(*) as count FROM sites
        UNION ALL
        SELECT 'messages', COUNT(*) FROM messages
        UNION ALL
        SELECT 'keywords', COUNT(*) FROM keywords
    """
    counts_df = pd.read_sql_query(counts_query, conn)

    # Integrity check
    integrity = conn.execute("PRAGMA integrity_check").fetchone()[0]

    return {"size_mb": size_mb, "integrity": integrity, "table_counts": counts_df}


@st.cache_data(ttl=config.CACHE_TTL)
def get_http_error_history(days: int = 30) -> pd.DataFrame:
    """Get HTTP error code history for IP burnout tracking."""
    # Try cache first for 30-day request (most common)
    if days == 30:
        cached = get_from_cache('chart_http_errors_30d')
        if cached:
            return pd.DataFrame(cached)

    query = """
        SELECT
            DATE(updated_at) as date,
            http_status_code,
            status as stage,
            COUNT(*) as count
        FROM sites
        WHERE updated_at >= datetime('now', '-{} days')
          AND http_status_code IS NOT NULL
          AND http_status_code != 200
          AND status IN ('assets_captured', 'enriched')
        GROUP BY DATE(updated_at), http_status_code, status
        ORDER BY date, http_status_code
    """.format(days)
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_api_rate_limits() -> pd.DataFrame:
    """Get recent rate limit errors from sites."""
    query = """
        SELECT
            status as stage,
            error_message,
            COUNT(*) as count,
            MAX(updated_at) as last_occurrence
        FROM sites
        WHERE error_message LIKE '%rate%limit%'
           OR error_message LIKE '%429%'
           OR error_message LIKE '%quota%'
        GROUP BY status, error_message
        ORDER BY last_occurrence DESC
        LIMIT 20
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)
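

# Hedged usage sketch (hypothetical page code): turning get_database_health()
# into a pass/fail banner. SQLite's integrity_check returns the literal
# string 'ok' when the file is clean.
def _demo_db_health_banner() -> None:
    health = get_database_health()
    if health["integrity"] == "ok":
        st.success(f"Database OK ({health['size_mb']:.1f} MB)")
    else:
        st.error(f"Integrity check failed: {health['integrity']}")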


# ===== LLM Cost & Forecasting Queries =====


@st.cache_data(ttl=config.CACHE_TTL)
def get_llm_cost_by_stage() -> pd.DataFrame:
    """Get the LLM cost breakdown by pipeline stage."""
    query = """
        SELECT
            stage,
            SUM(estimated_cost) as total_cost,
            SUM(total_tokens) as total_tokens,
            COUNT(*) as request_count,
            AVG(estimated_cost) as avg_cost_per_request,
            SUM(prompt_tokens) as total_prompt_tokens,
            SUM(completion_tokens) as total_completion_tokens
        FROM llm_usage
        GROUP BY stage
        ORDER BY total_cost DESC
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_llm_cost_by_model() -> pd.DataFrame:
    """Get the LLM cost breakdown by model."""
    query = """
        SELECT
            model,
            provider,
            SUM(estimated_cost) as total_cost,
            SUM(total_tokens) as total_tokens,
            COUNT(*) as request_count
        FROM llm_usage
        GROUP BY model, provider
        ORDER BY total_cost DESC
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_llm_cost_over_time(days: int = 30) -> pd.DataFrame:
    """Get daily LLM costs over time."""
    query = """
        SELECT
            DATE(created_at) as date,
            SUM(estimated_cost) as daily_cost,
            SUM(total_tokens) as daily_tokens,
            COUNT(*) as daily_requests
        FROM llm_usage
        WHERE created_at >= datetime('now', '-{} days')
        GROUP BY DATE(created_at)
        ORDER BY date
    """.format(days)
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_cost_per_site() -> pd.DataFrame:
    """Get total and average LLM cost per site by stage."""
    query = """
        SELECT
            s.status,
            COUNT(DISTINCT s.id) as site_count,
            COALESCE(SUM(l.estimated_cost), 0) as total_cost,
            COALESCE(SUM(l.estimated_cost), 0) / COUNT(DISTINCT s.id) as avg_cost_per_site
        FROM sites s
        LEFT JOIN llm_usage l ON l.site_id = s.id
        WHERE s.status IN ('scored', 'rescored', 'enriched', 'proposals_drafted', 'outreach_sent')
        GROUP BY s.status
        ORDER BY
            CASE s.status
                WHEN 'scored' THEN 1
                WHEN 'rescored' THEN 2
                WHEN 'enriched' THEN 3
                WHEN 'proposals_drafted' THEN 4
                WHEN 'outreach_sent' THEN 5
            END
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=config.CACHE_TTL)
def get_cost_per_response() -> pd.DataFrame:
    """Calculate cost per response by channel."""
    query = """
        SELECT
            o.contact_method as channel,
            COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END) as responses,
            COALESCE(SUM(l.estimated_cost), 0) as total_cost,
            CASE
                WHEN COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END) > 0
                    THEN COALESCE(SUM(l.estimated_cost), 0) / COUNT(DISTINCT CASE WHEN r.direction = 'inbound' THEN r.site_id END)
                ELSE 0
            END as cost_per_response
        FROM messages o
        LEFT JOIN messages r ON r.site_id = o.site_id AND r.direction = 'inbound'
        LEFT JOIN llm_usage l ON l.site_id = o.site_id
        WHERE o.direction = 'outbound'
          AND o.delivery_status IN ('sent', 'delivered')
        GROUP BY o.contact_method
        ORDER BY cost_per_response
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)
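

# Hedged usage sketch (hypothetical helper): adding a share-of-spend column
# to the per-model cost table. Pure pandas on top of the query above.
def _demo_model_cost_share() -> pd.DataFrame:
    df = get_llm_cost_by_model()
    if not df.empty and df["total_cost"].sum() > 0:
        df["cost_share_pct"] = (100 * df["total_cost"] / df["total_cost"].sum()).round(2)
    return df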


@st.cache_data(ttl=config.CACHE_TTL)
def get_cost_forecast() -> dict:
    """Generate a cost forecast based on current usage patterns."""
    conn = get_db_connection()

    # Average cost per request at each stage
    stage_costs_query = """
        SELECT
            stage,
            AVG(estimated_cost) as avg_cost
        FROM llm_usage
        GROUP BY stage
    """
    stage_costs = pd.read_sql_query(stage_costs_query, conn)

    # Current pipeline counts bucketed by the work still ahead of them
    pipeline_query = """
        SELECT
            CASE
                WHEN status IN ('pending', 'assets_captured') THEN 'pending_scoring'
                WHEN status = 'scored' THEN 'pending_proposals'
                WHEN status IN ('rescored', 'enriched', 'proposals_drafted') THEN 'pending_outreach'
                ELSE 'completed'
            END as pipeline_stage,
            COUNT(*) as count
        FROM sites
        GROUP BY pipeline_stage
    """
    pipeline = pd.read_sql_query(pipeline_query, conn)

    return {
        "stage_costs": stage_costs.to_dict("records"),
        "pipeline_counts": pipeline.to_dict("records"),
    }


@st.cache_data(ttl=config.CACHE_TTL)
def get_new_records_24h() -> pd.DataFrame:
    """Get counts of new records created in the last 24 hours, broken down by table and status."""
    conn = get_db_connection()

    # Sites table
    sites_query = """
        SELECT
            'sites' as table_name,
            status,
            COUNT(*) as count
        FROM sites
        WHERE created_at >= datetime('now', '-24 hours')
        GROUP BY status
    """

    # Outbound messages (formerly outreaches)
    outreaches_query = """
        SELECT
            'outreaches' as table_name,
            COALESCE(delivery_status, approval_status) as status,
            COUNT(*) as count
        FROM messages
        WHERE direction = 'outbound'
          AND created_at >= datetime('now', '-24 hours')
        GROUP BY COALESCE(delivery_status, approval_status)
    """

    # Inbound messages (formerly conversations)
    conversations_query = """
        SELECT
            'conversations' as table_name,
            direction as status,
            COUNT(*) as count
        FROM messages
        WHERE direction = 'inbound'
          AND created_at >= datetime('now', '-24 hours')
        GROUP BY direction
    """

    # Keywords table
    keywords_query = """
        SELECT
            'keywords' as table_name,
            status,
            COUNT(*) as count
        FROM keywords
        WHERE created_at >= datetime('now', '-24 hours')
        GROUP BY status
    """

    # LLM usage table
    llm_query = """
        SELECT
            'llm_usage' as table_name,
            stage as status,
            COUNT(*) as count
        FROM llm_usage
        WHERE created_at >= datetime('now', '-24 hours')
        GROUP BY stage
    """

    # Combine all queries
    combined_query = f"""
        {sites_query}
        UNION ALL
        {outreaches_query}
        UNION ALL
        {conversations_query}
        UNION ALL
        {keywords_query}
        UNION ALL
        {llm_query}
        ORDER BY table_name, status
    """

    return pd.read_sql_query(combined_query, conn)


# ===== Cron Job Management Queries =====


@st.cache_resource
def get_writable_db_connection() -> sqlite3.Connection:
    """
    Create a writable SQLite connection for cron job management.

    Returns:
        sqlite3.Connection: Writable database connection
    """
    conn = sqlite3.connect(
        config.DATABASE_PATH, check_same_thread=False, isolation_level=None, timeout=10.0
    )
    conn.row_factory = sqlite3.Row
    return conn


@st.cache_data(ttl=30)  # Cache for 30 seconds (more frequent updates)
def get_cron_jobs() -> pd.DataFrame:
    """Get all cron jobs with their current status."""
    query = """
        SELECT
            id,
            name,
            task_key,
            description,
            handler_type,
            handler_value,
            interval_value,
            interval_unit,
            enabled,
            last_run_at,
            created_at,
            updated_at
        FROM cron_jobs
        ORDER BY interval_value, interval_unit, name
    """
    conn = get_db_connection()
    return pd.read_sql_query(query, conn)


@st.cache_data(ttl=30)
def get_cron_job_stats() -> dict:
    """
    Get cron job statistics with adaptive time windows.

    Success rate calculated using frequency-based windows:
    - Jobs running multiple times per day: last 24 hours
    - Jobs running every 1-6 days: last 7 days
    - Jobs running every 7+ days: last 30 days
    """
    conn = get_db_connection()

    stats_query = """
        SELECT
            COUNT(*) as total,
            SUM(CASE WHEN enabled = 1 THEN 1 ELSE 0 END) as enabled,
            SUM(CASE WHEN enabled = 0 THEN 1 ELSE 0 END) as disabled
        FROM cron_jobs
    """
    stats = conn.execute(stats_query).fetchone()

    # Aggregate execution stats across all jobs using adaptive time windows
    exec_query = """
        SELECT
            SUM(total_runs) as total_executions,
            SUM(successful_runs) as successful,
            SUM(failed_runs) as failed
        FROM (
            SELECT
                l.job_name,
                COUNT(*) as total_runs,
                SUM(CASE WHEN l.status = 'success' THEN 1 ELSE 0 END) as successful_runs,
                SUM(CASE WHEN l.status = 'failed' THEN 1 ELSE 0 END) as failed_runs
            FROM cron_job_logs l
            LEFT JOIN cron_jobs j ON j.name = l.job_name
            WHERE l.started_at >= datetime('now',
                CASE
                    WHEN j.interval_unit IN ('minutes', 'hours') THEN '-1 days'
                    WHEN j.interval_unit = 'days' AND j.interval_value < 7 THEN '-7 days'
                    ELSE '-30 days'
                END
            )
            GROUP BY l.job_name
        )
    """
    exec_stats = conn.execute(exec_query).fetchone()

    return {
        "total_jobs": stats["total"],
        "enabled_jobs": stats["enabled"],
        "disabled_jobs": stats["disabled"],
        "total_executions": exec_stats["total_executions"] or 0,
        "successful_executions": exec_stats["successful"] or 0,
        "failed_executions": exec_stats["failed"] or 0,
    }
1224 """ 1225 conn = get_db_connection() 1226 return pd.read_sql_query(query, conn, params=(task_name, limit)) 1227 else: 1228 query = """ 1229 SELECT 1230 id, 1231 job_name, 1232 started_at, 1233 finished_at, 1234 status, 1235 summary, 1236 items_processed, 1237 items_failed, 1238 error_message, 1239 ROUND((julianday(finished_at) - julianday(started_at)) * 24 * 60, 2) as duration_minutes 1240 FROM cron_job_logs 1241 ORDER BY started_at DESC 1242 LIMIT ? 1243 """ 1244 conn = get_db_connection() 1245 return pd.read_sql_query(query, conn, params=(limit,)) 1246 1247 1248 @st.cache_data(ttl=30) 1249 def get_recent_cron_failures(days: int = 7, limit: int = 10) -> pd.DataFrame: 1250 """Get recent cron job failures.""" 1251 query = """ 1252 SELECT 1253 job_name, 1254 COUNT(*) as failure_count, 1255 MAX(started_at) as last_failure 1256 FROM cron_job_logs 1257 WHERE status = 'failed' 1258 AND started_at >= datetime('now', '-{} days') 1259 GROUP BY job_name 1260 ORDER BY failure_count DESC 1261 LIMIT ? 1262 """.format( 1263 days 1264 ) 1265 conn = get_db_connection() 1266 return pd.read_sql_query(query, conn, params=(limit,)) 1267 1268 1269 def update_cron_job_enabled(task_key: str, enabled: bool) -> bool: 1270 """ 1271 Enable or disable a cron job. 1272 1273 Args: 1274 task_key: The task_key of the job to update 1275 enabled: True to enable, False to disable 1276 1277 Returns: 1278 bool: True if successful, False otherwise 1279 """ 1280 try: 1281 conn = get_writable_db_connection() 1282 conn.execute( 1283 "UPDATE cron_jobs SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE task_key = ?", 1284 (1 if enabled else 0, task_key), 1285 ) 1286 conn.commit() 1287 # Clear cache to reflect changes 1288 get_cron_jobs.clear() 1289 get_cron_job_stats.clear() 1290 return True 1291 except Exception as e: 1292 logger.error(f"Failed to update job: {e}", exc_info=True) 1293 st.error(f"Failed to update job: {e}") 1294 return False 1295 1296 1297 def update_cron_job_interval( 1298 task_key: str, interval_value: int, interval_unit: str 1299 ) -> bool: 1300 """ 1301 Update a cron job's interval. 1302 1303 Args: 1304 task_key: The task_key of the job to update 1305 interval_value: The interval value (number) 1306 interval_unit: The interval unit (minutes, hours, days, weeks) 1307 1308 Returns: 1309 bool: True if successful, False otherwise 1310 """ 1311 try: 1312 conn = get_writable_db_connection() 1313 conn.execute( 1314 """ 1315 UPDATE cron_jobs 1316 SET interval_value = ?, interval_unit = ?, updated_at = CURRENT_TIMESTAMP 1317 WHERE task_key = ? 
1318 """, 1319 (interval_value, interval_unit, task_key), 1320 ) 1321 conn.commit() 1322 # Clear cache 1323 get_cron_jobs.clear() 1324 return True 1325 except Exception as e: 1326 logger.error(f"Failed to update interval: {e}", exc_info=True) 1327 st.error(f"Failed to update interval: {e}") 1328 return False 1329 1330 1331 @st.cache_data(ttl=config.CACHE_TTL) 1332 def get_cron_daily_history(days: int = 7) -> pd.DataFrame: 1333 """Get daily success/failure counts for cron jobs over the last N days.""" 1334 # Try cache first for 7-day request (most common) 1335 if days == 7: 1336 cached = get_from_cache('chart_cron_daily_history_7d') 1337 if cached: 1338 return pd.DataFrame(cached) 1339 1340 query = """ 1341 SELECT 1342 DATE(started_at) as date, 1343 SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful, 1344 SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, 1345 COUNT(*) as total 1346 FROM cron_job_logs 1347 WHERE started_at >= datetime('now', '-{} days') 1348 GROUP BY DATE(started_at) 1349 ORDER BY date 1350 """.format( 1351 days 1352 ) 1353 conn = get_db_connection() 1354 return pd.read_sql_query(query, conn) 1355 1356 1357 @st.cache_data(ttl=30) # Cache for 30 seconds (frequent updates for timeline) 1358 def get_cron_job_timeline(hours: int = 24, limit: int = 200) -> pd.DataFrame: 1359 """ 1360 Get cron job execution timeline for Gantt chart. 1361 Returns recent job executions with start/end times for the last N hours. 1362 1363 Limited to most recent executions to avoid overwhelming the browser 1364 with thousands of Plotly shapes (performance optimization). 1365 """ 1366 # Try cache first for 24-hour/200-limit request (most common) 1367 if hours == 24 and limit == 200: 1368 cached = get_from_cache('chart_cron_timeline_24h') 1369 if cached: 1370 return pd.DataFrame(cached) 1371 1372 query = """ 1373 SELECT 1374 job_name, 1375 started_at, 1376 finished_at, 1377 status, 1378 ROUND((julianday(COALESCE(finished_at, datetime('now'))) - julianday(started_at)) * 24 * 60, 2) as duration_minutes 1379 FROM cron_job_logs 1380 WHERE started_at >= datetime('now', '-{} hours') 1381 ORDER BY started_at DESC 1382 LIMIT {} 1383 """.format( 1384 hours, limit 1385 ) 1386 conn = get_db_connection() 1387 df = pd.read_sql_query(query, conn) 1388 # Re-sort chronologically for timeline display (query sorted DESC for LIMIT) 1389 return df.sort_values('started_at') if not df.empty else df 1390 1391 1392 @st.cache_data(ttl=30) 1393 def get_last_run_status_all_jobs() -> pd.DataFrame: 1394 """ 1395 Get the last run status for all jobs in a single batched query. 1396 Returns DataFrame with columns: job_name, status, started_at 1397 1398 This replaces N individual queries with one query, dramatically improving performance. 1399 """ 1400 query = """ 1401 WITH latest_logs AS ( 1402 SELECT 1403 job_name, 1404 status, 1405 started_at, 1406 ROW_NUMBER() OVER (PARTITION BY job_name ORDER BY started_at DESC) as rn 1407 FROM cron_job_logs 1408 ) 1409 SELECT 1410 job_name, 1411 status, 1412 started_at 1413 FROM latest_logs 1414 WHERE rn = 1 1415 """ 1416 conn = get_db_connection() 1417 return pd.read_sql_query(query, conn) 1418 1419 1420 @st.cache_data(ttl=30) # Cache for 30 seconds (frequent updates) 1421 def get_system_metrics_timeline(hours: int = 24) -> pd.DataFrame: 1422 """ 1423 Get system metrics timeline for overlay on Gantt chart. 1424 Returns CPU, disk I/O, and memory usage for the last N hours. 
1425 """ 1426 # Try cache first for 24-hour request (most common) 1427 if hours == 24: 1428 cached = get_from_cache('chart_system_metrics_24h') 1429 if cached: 1430 return pd.DataFrame(cached) 1431 1432 query = """ 1433 SELECT 1434 recorded_at, 1435 cpu_percent, 1436 disk_read_mb, 1437 disk_write_mb, 1438 memory_percent 1439 FROM system_metrics 1440 WHERE recorded_at >= datetime('now', '-{} hours') 1441 ORDER BY recorded_at 1442 """.format( 1443 hours 1444 ) 1445 conn = get_db_connection() 1446 return pd.read_sql_query(query, conn) 1447 1448 1449 # ===== Conversation Management ===== 1450 1451 1452 def update_autoresponder_status(outreach_id: int, enabled: bool) -> bool: 1453 """ 1454 Enable or disable autoresponder for a specific conversation thread. 1455 1456 Args: 1457 outreach_id: The site_id of the conversation (was outreach_id, now site_id) 1458 enabled: True to enable autoresponder, False to disable 1459 1460 Returns: 1461 bool: True if successful, False otherwise 1462 """ 1463 try: 1464 conn = get_writable_db_connection() 1465 conn.execute( 1466 """ 1467 UPDATE messages 1468 SET autoresponder_enabled = ? 1469 WHERE site_id = ? 1470 """, 1471 (1 if enabled else 0, outreach_id), 1472 ) 1473 conn.commit() 1474 # Clear cache to reflect changes 1475 get_threaded_conversations.clear() 1476 return True 1477 except Exception as e: 1478 logger.error(f"Failed to update autoresponder status: {e}", exc_info=True) 1479 st.error(f"Failed to update autoresponder status: {e}") 1480 return False 1481 1482 1483 def insert_manual_reply(outreach_id: int, message_body: str, channel: str) -> bool: 1484 """ 1485 Insert a manual reply into the messages table. 1486 1487 Args: 1488 outreach_id: The site_id to reply to (was outreach_id, now site_id) 1489 message_body: The message text 1490 channel: The channel (sms, email, form, x, linkedin) 1491 1492 Returns: 1493 bool: True if successful, False otherwise 1494 """ 1495 try: 1496 conn = get_writable_db_connection() 1497 conn.execute( 1498 """ 1499 INSERT INTO messages ( 1500 site_id, 1501 direction, 1502 contact_method, 1503 message_body, 1504 created_at, 1505 autoresponder_enabled 1506 ) VALUES (?, 'outbound', ?, ?, CURRENT_TIMESTAMP, 0) 1507 """, 1508 (outreach_id, channel, message_body), 1509 ) 1510 conn.commit() 1511 # Clear cache to reflect changes 1512 get_threaded_conversations.clear() 1513 get_conversation_stats.clear() 1514 return True 1515 except Exception as e: 1516 logger.error(f"Failed to insert manual reply: {e}", exc_info=True) 1517 st.error(f"Failed to insert manual reply: {e}") 1518 return False 1519 1520 1521 def trigger_cron_job(task_key: str, handler_type: str, handler_value: str) -> dict: 1522 """ 1523 Manually trigger a cron job to run immediately. 


def trigger_cron_job(task_key: str, handler_type: str, handler_value: str) -> dict:
    """
    Manually trigger a cron job to run immediately.

    Args:
        task_key: The task_key of the job to run
        handler_type: The handler type (function or command)
        handler_value: The handler value (function name or command)

    Returns:
        dict: Result with success status and message
    """
    import subprocess

    try:
        # Get project root (this file lives in dashboard/, project root is its parent)
        project_root = Path(__file__).parent.parent

        # Use the run-cron-job.js script; it resolves the handler from
        # task_key, so handler_type/handler_value are not passed through
        script_path = project_root / 'scripts' / 'run-cron-job.js'

        # Execute the script in the background
        process = subprocess.Popen(
            ['node', str(script_path), task_key],
            cwd=str(project_root),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        # Don't wait for completion - let it run in the background.
        # The job will log its own results to the database.
        return {
            'success': True,
            'message': 'Job triggered successfully (running in background)',
            'pid': process.pid,
        }

    except Exception as e:
        logger.error(f"Failed to trigger cron job: {e}", exc_info=True)
        return {
            'success': False,
            'error': str(e),
        }


# ===== Dashboard Cache Management =====


def get_from_cache(key: str):
    """
    Get a value from the dashboard cache.

    Args:
        key: The cache key

    Returns:
        dict or None: The cached value (parsed from JSON) or None if expired/missing
    """
    try:
        conn = get_db_connection()
        result = conn.execute(
            """
            SELECT cache_value, expires_at
            FROM dashboard_cache
            WHERE cache_key = ? AND expires_at > datetime('now')
            """,
            (key,),
        ).fetchone()

        if result:
            return json.loads(result["cache_value"])
        return None
    except Exception as e:
        logger.error(f"Failed to get cache value for {key}: {e}", exc_info=True)
        return None


def set_cache(key: str, value, expiration_minutes: int = 15) -> bool:
    """
    Set a value in the dashboard cache.

    Args:
        key: The cache key
        value: The value to cache (will be JSON serialized)
        expiration_minutes: How long until the cache entry expires (default: 15 minutes)

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        conn = get_writable_db_connection()
        expires_at = (datetime.now() + timedelta(minutes=expiration_minutes)).isoformat()

        conn.execute(
            """
            INSERT INTO dashboard_cache (cache_key, cache_value, expires_at, updated_at)
            VALUES (?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(cache_key) DO UPDATE SET
                cache_value = excluded.cache_value,
                expires_at = excluded.expires_at,
                updated_at = CURRENT_TIMESTAMP
            """,
            (key, json.dumps(value), expires_at),
        )
        conn.commit()
        return True
    except Exception as e:
        logger.error(f"Failed to set cache value for {key}: {e}", exc_info=True)
        return False
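

# Hedged sketch of the cache-aside pattern the chart queries above follow:
# read through get_from_cache(), recompute on a miss, then write back with
# set_cache(). The 'demo_daily_throughput' key is illustrative, not one the
# pipeline actually warms.
def _demo_cached_payload() -> list:
    payload = get_from_cache('demo_daily_throughput')
    if payload is None:
        payload = get_daily_throughput(7).to_dict('records')
        set_cache('demo_daily_throughput', payload, expiration_minutes=15)
    return payload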


def clear_all_cache() -> bool:
    """
    Clear all dashboard cache entries.
    Used for manual refresh functionality.

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        conn = get_writable_db_connection()
        conn.execute("DELETE FROM dashboard_cache")
        conn.commit()
        logger.info("Cleared all dashboard cache entries")
        return True
    except Exception as e:
        logger.error(f"Failed to clear cache: {e}", exc_info=True)
        return False


def get_cache_stats() -> dict:
    """
    Get statistics about the dashboard cache.

    Returns:
        dict: Cache statistics (total entries, fresh entries, expired entries)
    """
    try:
        conn = get_db_connection()

        stats_query = """
            SELECT
                COUNT(*) as total_entries,
                SUM(CASE WHEN expires_at > datetime('now') THEN 1 ELSE 0 END) as fresh_entries,
                SUM(CASE WHEN expires_at <= datetime('now') THEN 1 ELSE 0 END) as expired_entries
            FROM dashboard_cache
        """
        result = conn.execute(stats_query).fetchone()

        return {
            'total_entries': result['total_entries'] or 0,
            'fresh_entries': result['fresh_entries'] or 0,
            'expired_entries': result['expired_entries'] or 0,
        }
    except Exception as e:
        logger.error(f"Failed to get cache stats: {e}", exc_info=True)
        return {'total_entries': 0, 'fresh_entries': 0, 'expired_entries': 0}


# ===== Status Tree (error breakdown) =====


def get_status_tree() -> list:
    """
    Get the pipeline status tree (sites) from dashboard_cache.
    Falls back to an empty list if the cache entry is missing/expired.
    Returns a list of dicts with status, total, delta_24h, delta_1h, children.
    """
    cached = get_from_cache('status_tree')
    if cached is not None:
        return cached
    return []


def get_outreach_tree() -> list:
    """
    Get the outreach status tree from dashboard_cache.
    Falls back to an empty list if the cache entry is missing/expired.
    """
    cached = get_from_cache('outreach_tree')
    if cached is not None:
        return cached
    return []


def get_error_proposals(status: str = 'pending') -> list:
    """
    Get error pattern proposals from the database.

    Args:
        status: Filter by status ('pending', 'approved', 'rejected')

    Returns:
        list of dicts with proposal fields
    """
    try:
        conn = get_db_connection()
        rows = conn.execute(
            """
            SELECT id, pattern, label, group_name, context,
                   example_errors, occurrence_count, status, created_at
            FROM error_pattern_proposals
            WHERE status = ?
            ORDER BY occurrence_count DESC, created_at DESC
            """,
            (status,),
        ).fetchall()

        result = []
        for row in rows:
            r = dict(row)
            if r.get('example_errors'):
                try:
                    r['example_errors'] = json.loads(r['example_errors'])
                except Exception:
                    r['example_errors'] = []
            else:
                r['example_errors'] = []
            result.append(r)
        return result
    except Exception as e:
        logger.error(f"Failed to get error proposals: {e}", exc_info=True)
        return []


def review_error_proposal(proposal_id: int, decision: str, reviewer: str = 'dashboard') -> bool:
    """
    Approve or reject an error pattern proposal.

    Args:
        proposal_id: The proposal ID
        decision: 'approved' or 'rejected'
        reviewer: Who reviewed it

    Returns:
        bool: True if successful
    """
    if decision not in ('approved', 'rejected'):
        return False
    try:
        conn = get_writable_db_connection()
        conn.execute(
            """
            UPDATE error_pattern_proposals
            SET status = ?, reviewed_at = CURRENT_TIMESTAMP, reviewed_by = ?
            WHERE id = ?
            """,
            (decision, reviewer, proposal_id),
        )
        conn.commit()
        return True
    except Exception as e:
        logger.error(f"Failed to review error proposal {proposal_id}: {e}", exc_info=True)
        return False


# ===== Config Management =====


def get_config_value(key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a configuration value from the config table.

    Args:
        key: The configuration key
        default: Default value if the key doesn't exist

    Returns:
        str: The configuration value or the default
    """
    try:
        conn = get_db_connection()
        result = conn.execute(
            "SELECT value FROM config WHERE key = ?",
            (key,),
        ).fetchone()
        return result["value"] if result else default
    except Exception as e:
        logger.error(f"Failed to get config value for {key}: {e}", exc_info=True)
        return default


def set_config_value(key: str, value: str, description: Optional[str] = None) -> bool:
    """
    Set a configuration value in the config table.

    Args:
        key: The configuration key
        value: The value to set
        description: Optional description of the config entry

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        conn = get_writable_db_connection()
        # Upsert so both new and existing keys are handled
        if description:
            conn.execute(
                """
                INSERT INTO config (key, value, description)
                VALUES (?, ?, ?)
                ON CONFLICT(key) DO UPDATE SET
                    value = excluded.value,
                    description = COALESCE(excluded.description, description)
                """,
                (key, value, description),
            )
        else:
            conn.execute(
                """
                INSERT INTO config (key, value)
                VALUES (?, ?)
                ON CONFLICT(key) DO UPDATE SET
                    value = excluded.value
                """,
                (key, value),
            )
        conn.commit()
        return True
    except Exception as e:
        logger.error(f"Failed to set config value for {key}: {e}", exc_info=True)
        return False
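

# Hedged usage sketch (hypothetical key): reading a boolean feature flag via
# the config helpers. 'dashboard_beta_charts' is illustrative, not a key the
# pipeline defines.
def _demo_feature_flag() -> bool:
    return get_config_value('dashboard_beta_charts', default='0') == '1'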