1_đ§_Pipeline.py
"""
333 Method Analytics Dashboard - Pipeline Page
"""

import logging  # NOTE(review): unused here (logger comes from configure_app_logging); kept since other modules may rely on import side effects

import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from streamlit_autorefresh import st_autorefresh

from dashboard import config
from dashboard.utils import database
from dashboard.components import metrics, charts, filters, pipeline_widgets
from dashboard.utils.logging_config import configure_app_logging, log_exception

# Configure logging for this page
logger = configure_app_logging("dashboard.pipeline")

# Page configuration
st.set_page_config(page_title="Pipeline", page_icon="đ§", layout="wide")

# Auto-refresh (interval is configured in seconds; st_autorefresh expects ms)
st_autorefresh(interval=config.REFRESH_INTERVAL * 1000, key="pipeline_refresh")

st.title("đ§ Pipeline")

# === Filters ===
# NOTE(review): start_date/end_date/selected_statuses are collected but never
# passed to any database query below — confirm whether filtering is wired up
# elsewhere or simply not implemented yet.
with st.expander("Filters", expanded=False):
    col1, col2 = st.columns(2)
    with col1:
        start_date, end_date = filters.date_range_filter("pipeline")
    with col2:
        selected_statuses = filters.status_multiselect("pipeline")

# === Key Metrics ===
try:
    funnel = database.get_pipeline_funnel()
    errors = database.get_error_breakdown()
    stuck_sites = database.get_stuck_sites_by_error()

    total_sites = funnel["count"].sum() if not funnel.empty else 0
    total_errors = errors["count"].sum() if not errors.empty else 0
    stuck_count = len(stuck_sites)  # len() of an empty frame is already 0

    metrics.display_metric_grid(
        [
            {"label": "Total Sites", "value": f"{total_sites:,}"},
            {"label": "Active Errors", "value": f"{total_errors:,}"},
            {"label": "Stuck Sites", "value": f"{stuck_count:,}"},
        ]
    )

    st.markdown("---")

    # === Processing Throughput Charts ===
    col1, col2 = st.columns(2)

    with col1:
        st.subheader("Daily Processing Throughput")
        throughput = database.get_daily_throughput(30)
        if not throughput.empty:
            fig = charts.create_line_chart(
                throughput, "date", "sites_added", "Sites Added Per Day (Last 30 Days)"
            )
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No throughput data available")

    with col2:
        st.subheader("Hourly Status Breakdown (48h)")
        status_breakdown = database.get_hourly_status_breakdown(48)
        if not status_breakdown.empty:
            # Every non-'hour' column is a pipeline status series.
            status_cols = [col for col in status_breakdown.columns if col != 'hour']

            # Stacked bar chart with status-based colors.
            fig = go.Figure()
            for status in status_cols:
                fig.add_trace(go.Bar(
                    name=status.replace('_', ' ').title(),
                    x=status_breakdown['hour'],
                    y=status_breakdown[status],
                    marker_color=config.STATUS_COLORS.get(status, '#999999'),
                ))

            fig.update_layout(
                barmode='stack',
                xaxis_title='Hour',
                yaxis_title='Sites Processed',
                height=400,
                margin=dict(l=20, r=20, t=20, b=20),
                showlegend=True,
                # Legend sits just outside the right edge of the plot area.
                legend=dict(orientation="v", yanchor="top", y=1, xanchor="left", x=1.02),
            )
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No hourly status data available")

    st.markdown("---")

    # === Pipeline Status ===
    col1, col2 = st.columns([2, 1])

    with col1:
        st.subheader("Pipeline Status Flow")
        if not funnel.empty:
            # Surface how many sites are excluded from the funnel entirely.
            excluded = database.get_excluded_sites_count()
            excluded_total = excluded['ignored'] + excluded['failing']
            if excluded_total > 0:
                st.caption(
                    f"âšī¸ {excluded['ignored']:,} sites ignored (duplicates/directories/social media), "
                    f"{excluded['failing']:,} sites failing (need manual review)"
                )

            fig = charts.create_funnel_chart(
                funnel, "count", "status", "Sites by Pipeline Stage"
            )
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No pipeline data available")

    with col2:
        st.subheader("Status Distribution")
        if not funnel.empty:
            # Pie chart from pipeline_widgets (moved from Overview page).
            fig = pipeline_widgets.create_pie_chart(funnel, show_legend=False)
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("No data")

    st.markdown("---")

    # === Error Type Breakdown ===
    st.subheader("Most Common Errors")
    if not errors.empty:
        # Bar chart of top 10 errors; full table stays collapsed below.
        top_errors = errors.head(10)
        fig = charts.create_bar_chart(
            top_errors, "error_message", "count", "Top 10 Error Types"
        )
        st.plotly_chart(fig, use_container_width=True)

        with st.expander("đ View Full Error Details", expanded=False):
            st.dataframe(
                errors[["error_message", "stage", "count"]],
                use_container_width=True,
                hide_index=True,
            )
    else:
        st.success("â No errors found - all systems operational!")

    st.markdown("---")

    # === Sites Stuck by Error ===
    st.subheader("Sites Stuck with Errors")
    if not stuck_sites.empty:
        st.warning(
            f"â ī¸ {len(stuck_sites)} sites have been stuck with errors for >1 day"
        )

        # Group stuck sites by error message and chart the ten worst offenders.
        stuck_by_error = stuck_sites.groupby("error_message").size().reset_index(name="count")
        stuck_by_error = stuck_by_error.sort_values("count", ascending=False).head(10)

        fig = charts.create_bar_chart(
            stuck_by_error, "error_message", "count", "Top 10 Errors Causing Stuck Sites"
        )
        st.plotly_chart(fig, use_container_width=True)

        with st.expander("đ View Stuck Sites Details", expanded=False):
            display_stuck = stuck_sites[
                ["domain", "stage", "error_message", "days_stuck"]
            ].copy()
            display_stuck["days_stuck"] = display_stuck["days_stuck"].round(1)

            st.dataframe(
                display_stuck,
                use_container_width=True,
                hide_index=True,
                column_config={
                    "days_stuck": st.column_config.NumberColumn(
                        "Days Stuck", format="%.1f"
                    )
                },
            )
    else:
        st.success("â No stuck sites - pipeline is flowing smoothly!")

    st.markdown("---")

    # === Status Breakdown (tree view with error reasons) ===
    st.subheader("Status Breakdown")

    status_tree = database.get_status_tree()
    outreach_tree = database.get_outreach_tree()

    def fmt_delta(n):
        """Format a count delta as '+1,234'; empty string for 0/None."""
        if not n:
            return ""
        return f"+{n:,}"

    def _expander_label(row):
        """Build the expander header: status, total, and any 24h/1h deltas."""
        label = f"**{row['status']}** â {row['total']:,}"
        parts = []
        if row.get('delta_24h'):
            parts.append(f"{fmt_delta(row['delta_24h'])} 24h")
        if row.get('delta_1h'):
            parts.append(f"{fmt_delta(row['delta_1h'])} 1h")
        if parts:
            label += f" ({' | '.join(parts)})"
        return label

    # Shared column config for every sub-breakdown table below.
    col_cfg = {
        "label": "Category",
        "total": st.column_config.NumberColumn("Total", format="%d"),
        "delta_24h": st.column_config.TextColumn("Î24h"),
        "delta_1h": st.column_config.TextColumn("Î1h"),
    }

    def _rows_to_df(rows):
        """Convert a list of tree-row dicts into the display DataFrame."""
        return pd.DataFrame([{
            "label": r['label'],
            "total": r['total'],
            "delta_24h": fmt_delta(r.get('delta_24h')),
            "delta_1h": fmt_delta(r.get('delta_1h')),
        } for r in rows])

    def render_error_children(children):
        """Render a status node's children: either channel rows or error groups."""
        if not children:
            return
        if children.get('type') == 'channels':
            rows = children.get('rows', [])
            if rows:
                st.dataframe(_rows_to_df(rows), use_container_width=True,
                             hide_index=True, column_config=col_cfg)
        elif children.get('type') == 'errors':
            # One titled table per non-empty error classification group.
            for key, heading in (
                ('retriable', "**đ Retriable**"),
                ('terminal', "**â Terminal**"),
                ('unknown', "**â Unknown (unclassified)**"),
            ):
                rows = children.get(key, [])
                if rows:
                    st.markdown(heading)
                    st.dataframe(_rows_to_df(rows), use_container_width=True,
                                 hide_index=True, column_config=col_cfg)

    def _render_tree(tree, missing_msg):
        """Render a breakdown tree as expanders, or an info note when absent."""
        if not tree:
            st.info(missing_msg)
            return
        for row in tree:
            with st.expander(_expander_label(row), expanded=False):
                if row.get('children'):
                    render_error_children(row['children'])
                else:
                    st.caption("No sub-breakdown available for this status.")

    _render_tree(
        status_tree,
        "Status tree not yet computed â will appear after next precompute cron run (every 10 min).",
    )

    st.markdown("---")
    st.subheader("Outreach Breakdown")

    _render_tree(
        outreach_tree,
        "Outreach tree not yet computed â will appear after next precompute cron run (every 10 min).",
    )

    st.markdown("---")

    # === Error Pattern Proposals (human review) ===
    proposals = database.get_error_proposals(status='pending')
    if proposals:
        st.subheader(f"đ Error Pattern Proposals ({len(proposals)} pending review)")
        st.caption(
            "The daily classifier found new uncategorized error messages and proposed regex patterns. "
            "Review and approve/reject below â approved patterns will be added to error-categories.js via a developer task."
        )
        for p in proposals:
            with st.expander(f"[{p['context']}] {p['label']} â `{p['pattern']}` ({p['occurrence_count'] or 0:,} occurrences)", expanded=False):
                st.markdown(f"**Group:** {'â Terminal' if p['group_name'] == 'terminal' else 'đ Retriable'}")
                st.markdown(f"**Pattern:** `{p['pattern']}`")
                if p.get('example_errors'):
                    st.markdown("**Example errors:**")
                    for ex in p['example_errors'][:5]:
                        st.code(ex, language=None)
                col_a, col_b, _ = st.columns([1, 1, 4])
                with col_a:
                    if st.button("â Approve", key=f"approve_{p['id']}"):
                        if database.review_error_proposal(p['id'], 'approved'):
                            st.success("Approved â developer task queued")
                            st.rerun()
                with col_b:
                    if st.button("â Reject", key=f"reject_{p['id']}"):
                        if database.review_error_proposal(p['id'], 'rejected'):
                            st.info("Rejected")
                            st.rerun()

    st.markdown("---")

    # === Failing Sites (Exceeded Retry Limits) ===
    st.subheader("Failing Sites (Exceeded Retry Limits)")
    failing_sites = database.get_failing_sites()
    if not failing_sites.empty:
        st.error(f"đ¨ {len(failing_sites)} sites have exceeded retry limits and need manual intervention")

        display_failing = failing_sites[
            ["id", "domain", "error_message", "retry_count"]
        ].copy()

        st.dataframe(
            display_failing,
            use_container_width=True,
            hide_index=True,
            column_config={
                "id": st.column_config.NumberColumn("Site ID", format="%d"),
                "domain": "Domain",
                "error_message": "Error Message",
                "retry_count": st.column_config.NumberColumn("Retry Count", format="%d"),
            },
        )

        st.info(
            "đĄ **Action Required**: These sites need manual review. "
            "Update the record in the database to fix the issue or mark as 'ignore'."
        )
    else:
        st.success("â No failing sites - all retries within limits!")

except Exception as e:
    # Page-level boundary: log with traceback and surface the error in the UI.
    log_exception(logger, f"Error loading pipeline data: {e}")
    st.error(f"Error loading pipeline data: {e}")