/ dashboard / pages / 1_🔧_Pipeline.py
1_🔧_Pipeline.py
  1  """
  2  333 Method Analytics Dashboard - Pipeline Page
  3  """
  4  
  5  import logging
  6  import streamlit as st
  7  from streamlit_autorefresh import st_autorefresh
  8  from dashboard import config
  9  from dashboard.utils import database
 10  from dashboard.components import metrics, charts, filters, pipeline_widgets
 11  from dashboard.utils.logging_config import configure_app_logging, log_exception
 12  
 13  # Configure logging for this page
 14  logger = configure_app_logging("dashboard.pipeline")
 15  
 16  # Page configuration
 17  st.set_page_config(page_title="Pipeline", page_icon="🔧", layout="wide")
 18  
 19  # Auto-refresh
 20  st_autorefresh(interval=config.REFRESH_INTERVAL * 1000, key="pipeline_refresh")
 21  
 22  st.title("🔧 Pipeline")
 23  
 24  # === Filters ===
 25  with st.expander("Filters", expanded=False):
 26      col1, col2 = st.columns(2)
 27      with col1:
 28          start_date, end_date = filters.date_range_filter("pipeline")
 29      with col2:
 30          selected_statuses = filters.status_multiselect("pipeline")
 31  
 32  # === Key Metrics ===
 33  try:
 34      funnel = database.get_pipeline_funnel()
 35      errors = database.get_error_breakdown()
 36      stuck_sites = database.get_stuck_sites_by_error()
 37  
 38      total_sites = funnel["count"].sum() if not funnel.empty else 0
 39      total_errors = errors["count"].sum() if not errors.empty else 0
 40      stuck_count = len(stuck_sites) if not stuck_sites.empty else 0
 41  
 42      metrics.display_metric_grid(
 43          [
 44              {"label": "Total Sites", "value": f"{total_sites:,}"},
 45              {"label": "Active Errors", "value": f"{total_errors:,}"},
 46              {"label": "Stuck Sites", "value": f"{stuck_count:,}"},
 47          ]
 48      )
 49  
 50      st.markdown("---")
 51  
 52      # === Processing Throughput Charts ===
 53      col1, col2 = st.columns(2)
 54  
 55      with col1:
 56          st.subheader("Daily Processing Throughput")
 57          throughput = database.get_daily_throughput(30)
 58          if not throughput.empty:
 59              fig = charts.create_line_chart(
 60                  throughput, "date", "sites_added", "Sites Added Per Day (Last 30 Days)"
 61              )
 62              st.plotly_chart(fig, use_container_width=True)
 63          else:
 64              st.info("No throughput data available")
 65  
 66      with col2:
 67          st.subheader("Hourly Status Breakdown (48h)")
 68          status_breakdown = database.get_hourly_status_breakdown(48)
 69          if not status_breakdown.empty:
 70              # Get all status columns (exclude 'hour' column)
 71              status_cols = [col for col in status_breakdown.columns if col != 'hour']
 72  
 73              # Create stacked bar chart with status-based colors
 74              import plotly.graph_objects as go
 75              from dashboard import config
 76  
 77              fig = go.Figure()
 78  
 79              # Add a bar for each status
 80              for status in status_cols:
 81                  if status in status_breakdown.columns:
 82                      fig.add_trace(go.Bar(
 83                          name=status.replace('_', ' ').title(),
 84                          x=status_breakdown['hour'],
 85                          y=status_breakdown[status],
 86                          marker_color=config.STATUS_COLORS.get(status, '#999999')
 87                      ))
 88  
 89              fig.update_layout(
 90                  barmode='stack',
 91                  xaxis_title='Hour',
 92                  yaxis_title='Sites Processed',
 93                  height=400,
 94                  margin=dict(l=20, r=20, t=20, b=20),
 95                  showlegend=True,
 96                  legend=dict(
 97                      orientation="v",
 98                      yanchor="top",
 99                      y=1,
100                      xanchor="left",
101                      x=1.02
102                  )
103              )
104  
105              st.plotly_chart(fig, use_container_width=True)
106          else:
107              st.info("No hourly status data available")
108  
109      st.markdown("---")
110  
111      # === Pipeline Status ===
112      col1, col2 = st.columns([2, 1])
113  
114      with col1:
115          st.subheader("Pipeline Status Flow")
116          if not funnel.empty:
117              # Display excluded sites count
118              excluded = database.get_excluded_sites_count()
119              excluded_total = excluded['ignored'] + excluded['failing']
120              if excluded_total > 0:
121                  st.caption(
122                      f"â„šī¸ {excluded['ignored']:,} sites ignored (duplicates/directories/social media), "
123                      f"{excluded['failing']:,} sites failing (need manual review)"
124                  )
125  
126              fig = charts.create_funnel_chart(
127                  funnel, "count", "status", "Sites by Pipeline Stage"
128              )
129              st.plotly_chart(fig, use_container_width=True)
130          else:
131              st.info("No pipeline data available")
132  
133      with col2:
134          st.subheader("Status Distribution")
135          if not funnel.empty:
136              # Use pie chart from pipeline_widgets (moved from Overview)
137              fig = pipeline_widgets.create_pie_chart(funnel, show_legend=False)
138              st.plotly_chart(fig, use_container_width=True)
139          else:
140              st.info("No data")
141  
142      st.markdown("---")
143  
144      # === Error Type Breakdown ===
145      st.subheader("Most Common Errors")
146      if not errors.empty:
147          # Bar chart of top 10 errors
148          top_errors = errors.head(10)
149          fig = charts.create_bar_chart(
150              top_errors, "error_message", "count", "Top 10 Error Types"
151          )
152          st.plotly_chart(fig, use_container_width=True)
153  
154          # Full error table in collapsed section
155          with st.expander("📋 View Full Error Details", expanded=False):
156              st.dataframe(
157                  errors[["error_message", "stage", "count"]],
158                  use_container_width=True,
159                  hide_index=True,
160              )
161      else:
162          st.success("✅ No errors found - all systems operational!")
163  
164      st.markdown("---")
165  
166      # === Sites Stuck by Error ===
167      st.subheader("Sites Stuck with Errors")
168      if not stuck_sites.empty:
169          st.warning(
170              f"âš ī¸ {len(stuck_sites)} sites have been stuck with errors for >1 day"
171          )
172  
173          # Create bar chart grouped by error message
174          import pandas as pd
175          stuck_by_error = stuck_sites.groupby("error_message").size().reset_index(name="count")
176          stuck_by_error = stuck_by_error.sort_values("count", ascending=False).head(10)
177  
178          fig = charts.create_bar_chart(
179              stuck_by_error, "error_message", "count", "Top 10 Errors Causing Stuck Sites"
180          )
181          st.plotly_chart(fig, use_container_width=True)
182  
183          # Show stuck sites table in collapsed section
184          with st.expander("📋 View Stuck Sites Details", expanded=False):
185              display_stuck = stuck_sites[
186                  ["domain", "stage", "error_message", "days_stuck"]
187              ].copy()
188              display_stuck["days_stuck"] = display_stuck["days_stuck"].round(1)
189  
190              st.dataframe(
191                  display_stuck,
192                  use_container_width=True,
193                  hide_index=True,
194                  column_config={
195                      "days_stuck": st.column_config.NumberColumn(
196                          "Days Stuck", format="%.1f"
197                      )
198                  },
199              )
200      else:
201          st.success("✅ No stuck sites - pipeline is flowing smoothly!")
202  
203      st.markdown("---")
204  
205      st.markdown("---")
206  
207      # === Status Breakdown (tree view with error reasons) ===
208      st.subheader("Status Breakdown")
209  
210      status_tree = database.get_status_tree()
211      outreach_tree = database.get_outreach_tree()
212  
213      def fmt_delta(n):
214          if not n:
215              return ""
216          return f"+{n:,}"
217  
218      def _expander_label(row):
219          label = f"**{row['status']}** — {row['total']:,}"
220          parts = []
221          if row.get('delta_24h'):
222              parts.append(f"+{row['delta_24h']:,} 24h")
223          if row.get('delta_1h'):
224              parts.append(f"+{row['delta_1h']:,} 1h")
225          if parts:
226              label += f"  ({'  |  '.join(parts)})"
227          return label
228  
229      import pandas as pd
230  
231      col_cfg = {
232          "label": "Category",
233          "total": st.column_config.NumberColumn("Total", format="%d"),
234          "delta_24h": st.column_config.TextColumn("Δ24h"),
235          "delta_1h": st.column_config.TextColumn("Δ1h"),
236      }
237  
238      def render_error_children(children):
239          if not children:
240              return
241          if children.get('type') == 'channels':
242              rows = children.get('rows', [])
243              if rows:
244                  df = pd.DataFrame([{
245                      "label": r['label'],
246                      "total": r['total'],
247                      "delta_24h": fmt_delta(r.get('delta_24h')),
248                      "delta_1h": fmt_delta(r.get('delta_1h')),
249                  } for r in rows])
250                  st.dataframe(df, use_container_width=True, hide_index=True, column_config=col_cfg)
251          elif children.get('type') == 'errors':
252              retriable = children.get('retriable', [])
253              terminal = children.get('terminal', [])
254              unknown = children.get('unknown', [])
255              if retriable:
256                  st.markdown("**🔄 Retriable**")
257                  df = pd.DataFrame([{
258                      "label": r['label'],
259                      "total": r['total'],
260                      "delta_24h": fmt_delta(r.get('delta_24h')),
261                      "delta_1h": fmt_delta(r.get('delta_1h')),
262                  } for r in retriable])
263                  st.dataframe(df, use_container_width=True, hide_index=True, column_config=col_cfg)
264              if terminal:
265                  st.markdown("**⛔ Terminal**")
266                  df = pd.DataFrame([{
267                      "label": r['label'],
268                      "total": r['total'],
269                      "delta_24h": fmt_delta(r.get('delta_24h')),
270                      "delta_1h": fmt_delta(r.get('delta_1h')),
271                  } for r in terminal])
272                  st.dataframe(df, use_container_width=True, hide_index=True, column_config=col_cfg)
273              if unknown:
274                  st.markdown("**❓ Unknown (unclassified)**")
275                  df = pd.DataFrame([{
276                      "label": r['label'],
277                      "total": r['total'],
278                      "delta_24h": fmt_delta(r.get('delta_24h')),
279                      "delta_1h": fmt_delta(r.get('delta_1h')),
280                  } for r in unknown])
281                  st.dataframe(df, use_container_width=True, hide_index=True, column_config=col_cfg)
282  
283      if status_tree:
284          for row in status_tree:
285              with st.expander(_expander_label(row), expanded=False):
286                  if row.get('children'):
287                      render_error_children(row['children'])
288                  else:
289                      st.caption("No sub-breakdown available for this status.")
290      else:
291          st.info("Status tree not yet computed — will appear after next precompute cron run (every 10 min).")
292  
293      st.markdown("---")
294      st.subheader("Outreach Breakdown")
295  
296      if outreach_tree:
297          for row in outreach_tree:
298              with st.expander(_expander_label(row), expanded=False):
299                  if row.get('children'):
300                      render_error_children(row['children'])
301                  else:
302                      st.caption("No sub-breakdown available for this status.")
303      else:
304          st.info("Outreach tree not yet computed — will appear after next precompute cron run (every 10 min).")
305  
306      st.markdown("---")
307  
308      # === Error Pattern Proposals (human review) ===
309      proposals = database.get_error_proposals(status='pending')
310      if proposals:
311          st.subheader(f"🔍 Error Pattern Proposals ({len(proposals)} pending review)")
312          st.caption(
313              "The daily classifier found new uncategorized error messages and proposed regex patterns. "
314              "Review and approve/reject below — approved patterns will be added to error-categories.js via a developer task."
315          )
316          for p in proposals:
317              with st.expander(f"[{p['context']}] {p['label']} — `{p['pattern']}` ({p['occurrence_count'] or 0:,} occurrences)", expanded=False):
318                  st.markdown(f"**Group:** {'⛔ Terminal' if p['group_name'] == 'terminal' else '🔄 Retriable'}")
319                  st.markdown(f"**Pattern:** `{p['pattern']}`")
320                  if p.get('example_errors'):
321                      st.markdown("**Example errors:**")
322                      for ex in p['example_errors'][:5]:
323                          st.code(ex, language=None)
324                  col_a, col_b, _ = st.columns([1, 1, 4])
325                  with col_a:
326                      if st.button("✅ Approve", key=f"approve_{p['id']}"):
327                          if database.review_error_proposal(p['id'], 'approved'):
328                              st.success("Approved — developer task queued")
329                              st.rerun()
330                  with col_b:
331                      if st.button("❌ Reject", key=f"reject_{p['id']}"):
332                          if database.review_error_proposal(p['id'], 'rejected'):
333                              st.info("Rejected")
334                              st.rerun()
335  
336      st.markdown("---")
337  
338      # === Failing Sites (Exceeded Retry Limits) ===
339      st.subheader("Failing Sites (Exceeded Retry Limits)")
340      failing_sites = database.get_failing_sites()
341      if not failing_sites.empty:
342          st.error(f"🚨 {len(failing_sites)} sites have exceeded retry limits and need manual intervention")
343  
344          # Show failing sites table
345          display_failing = failing_sites[
346              ["id", "domain", "error_message", "retry_count"]
347          ].copy()
348  
349          st.dataframe(
350              display_failing,
351              use_container_width=True,
352              hide_index=True,
353              column_config={
354                  "id": st.column_config.NumberColumn("Site ID", format="%d"),
355                  "domain": "Domain",
356                  "error_message": "Error Message",
357                  "retry_count": st.column_config.NumberColumn("Retry Count", format="%d"),
358              },
359          )
360  
361          st.info(
362              "💡 **Action Required**: These sites need manual review. "
363              "Update the record in the database to fix the issue or mark as 'ignore'."
364          )
365      else:
366          st.success("✅ No failing sites - all retries within limits!")
367  
368  except Exception as e:
369      log_exception(logger, f"Error loading pipeline data: {e}")
370      st.error(f"Error loading pipeline data: {e}")