# services/run_summary.py
  1  """Run summary printer — structured one-screen summary after each pipeline run."""
  2  
  3  __all__ = ["print_summary"]
  4  
  5  import logging
  6  from datetime import datetime
  7  
  8  from config import settings
  9  from telemetry import RunStats
 10  
 11  logger = logging.getLogger(__name__)
 12  
 13  # Cost formula: (tokens * price_per_million) / 1_000_000
 14  _PER_MILLION = 1_000_000
 15  
 16  
 17  def _compute_phase_cost(
 18      phase_name: str,
 19      input_tokens: int,
 20      output_tokens: int,
 21      phase_models: dict[str, str],
 22  ) -> float | None:
 23      """Compute cost for a single phase, or None if unpriced."""
 24      model = phase_models.get(phase_name)
 25      if model is None:
 26          return None
 27      pricing = settings.model_pricing.get(model)
 28      if pricing is None:
 29          return None
 30      price_in, price_out = pricing
 31      return (input_tokens * price_in + output_tokens * price_out) / _PER_MILLION
 32  
 33  
 34  def _format_cost(cost: float) -> str:
 35      """Format cost as (~$X.XX)."""
 36      return f"(~${cost:.2f})"
 37  
 38  
 39  def _derive_cached_count(pipeline: str, stats: RunStats) -> int:
 40      """Derive the number of fully-cached jobs from RunStats phase data."""
 41      if pipeline == "score_jobs":
 42          return stats.jobs_skipped
 43      if pipeline == "coach_cv":
 44          # min of all phase cache hits = jobs where every phase was a cache hit.
 45          # Today extract and analyze are coupled (always equal), but using all
 46          # three is safe if they're ever decoupled.
 47          phase_hits = [
 48              ps.cache_hits
 49              for name in ("extract", "analyze", "tailor")
 50              if (ps := stats.phases.get(name)) is not None
 51          ]
 52          return min(phase_hits) if phase_hits else 0
 53      return 0
 54  
 55  
def _build_summary(
    stats: RunStats,
    *,
    pipeline: str,
    phase_models: dict[str, str] | None = None,
) -> str:
    """Build the summary string for stdout and file export.

    Args:
        stats: Accumulated metrics for the pipeline run.
        pipeline: One of "parse_cv", "score_jobs", "coach_cv", "import_jobs";
            selects which lines appear and how the jobs line is labeled.
        phase_models: Maps phase names to model IDs for cost lookup.
            When None or empty, cost display is suppressed.

    Returns:
        The multi-line summary text (no trailing newline).
    """
    lines: list[str] = ["Run summary"]
    pm = phase_models or {}

    # --- Jobs line ---
    if pipeline == "import_jobs":
        # Skipped jobs are duplicates of already-imported ones, so
        # "new" is simply processed minus skipped.
        new = stats.jobs_processed - stats.jobs_skipped
        lines.append(
            f"  Jobs:    fetched {stats.jobs_processed:,} / new {new:,} "
            f"/ duplicates {stats.jobs_skipped:,} / errored {stats.jobs_errored:,}"
        )
    elif pipeline in ("score_jobs", "coach_cv"):
        label = "coached" if pipeline == "coach_cv" else "processed"
        cached = _derive_cached_count(pipeline, stats)
        # Only mention caching when it actually saved work.
        cached_str = f" ({cached:,} cached)" if cached > 0 else ""
        lines.append(
            f"  Jobs:    {label} {stats.jobs_processed:,}{cached_str}"
            f" / errored {stats.jobs_errored:,}"
        )
    # parse_cv: no Jobs line

    # --- Multi-phase breakdown (coach_cv) ---
    multi_phase = len(stats.phases) > 1
    if multi_phase:
        lines.append("            cache                    tokens (per phase)")
        # Track whether every listed phase was priced so the Total line
        # only shows a cost when the sum is actually complete.
        all_priced = True
        phase_costs: dict[str, float] = {}

        # Fixed tuple keeps the table row order stable regardless of the
        # insertion order of stats.phases.
        for phase_name in ("extract", "analyze", "tailor"):
            ps = stats.phases.get(phase_name)
            if ps is None:
                continue
            total_lookups = ps.cache_hits + ps.cache_misses
            cache_str = f"{ps.cache_hits}/{total_lookups} hits"

            cost = _compute_phase_cost(phase_name, ps.input_tokens, ps.output_tokens, pm)
            if cost is not None:
                phase_costs[phase_name] = cost
            else:
                all_priced = False

            cost_str = f"  {_format_cost(cost)}" if cost is not None else ""
            lines.append(
                f"  {phase_name.capitalize():8s}  {cache_str:16s}|  "
                f"{ps.input_tokens:,} in / {ps.output_tokens:>6,} out{cost_str}"
            )

        # Separator + Total
        lines.append("  " + "\u2500" * 56)
        total_cost_str = ""
        if all_priced and phase_costs:
            total_cost_str = f"  {_format_cost(sum(phase_costs.values()))}"
        lines.append(
            f"  {'Total':8s}                    |  "
            f"{stats.total_input_tokens:,} in / {stats.total_output_tokens:>6,} out"
            f"{total_cost_str}"
        )

    # --- Single-phase token line (omitted entirely when no tokens used) ---
    elif stats.total_input_tokens > 0:
        cost_str = ""
        if pm:
            # A single-phase run has at most one entry in stats.phases;
            # "main" is a fallback key that just yields no cost when
            # absent from stats.phases.
            phase_name = next(iter(stats.phases), "main")
            ps = stats.phases.get(phase_name)
            if ps:
                cost = _compute_phase_cost(
                    phase_name, ps.input_tokens, ps.output_tokens, pm
                )
                if cost is not None:
                    cost_str = f"  {_format_cost(cost)}"
        lines.append(
            f"  Tokens:  {stats.total_input_tokens:,} in / "
            f"{stats.total_output_tokens:,} out{cost_str}"
        )

    # --- Time line ---
    wall = stats.wall_clock_s
    time_str = f"  Time:    {wall:.0f}s"
    if stats.llm_calls > 0 and stats.avg_llm_duration_s is not None:
        avg = stats.avg_llm_duration_s
        time_str += f"  (avg {avg:.1f}s/LLM call, {stats.llm_calls:,} call"
        # Pluralize "call(s)" by hand since the count sits mid-string.
        time_str += "s)" if stats.llm_calls != 1 else ")"
    if stats.llm_errors > 0:
        time_str += (
            f"  ({stats.llm_errors:,} error"
            + ("s" if stats.llm_errors != 1 else "")
            + f", {stats.llm_error_duration_s:.1f}s spent on retries)"
        )
    lines.append(time_str)

    # --- Disclaimer ---
    if stats.total_input_tokens > 0:
        lines.append(
            "  Note: token counts and costs are estimates. "
            "Check your provider dashboard for official usage."
        )

    return "\n".join(lines)
160  
161  
def _append_to_log(summary: str, pipeline: str) -> None:
    """Append the run summary to the rolling history log.

    Best-effort: an OSError (full disk, bad permissions, ...) is downgraded
    to a warning so logging never crashes an otherwise successful run.
    """
    history_file = settings.output_dir / "run_history.log"
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    record = f"--- {stamp} {pipeline} ---\n{summary}\n\n"
    try:
        history_file.parent.mkdir(parents=True, exist_ok=True)
        with history_file.open("a", encoding="utf-8") as handle:
            handle.write(record)
    except OSError:
        logger.warning(
            "Failed to write run summary to %s", history_file, exc_info=True
        )
173  
174  
def print_summary(
    stats: RunStats,
    *,
    pipeline: str,
    phase_models: dict[str, str] | None = None,
) -> None:
    """Emit a structured run summary to stdout and the history log file.

    Args:
        stats: Accumulated metrics for the pipeline run.
        pipeline: One of "parse_cv", "score_jobs", "coach_cv", "import_jobs".
        phase_models: Maps phase names to model IDs for cost lookup.
            When None or empty, cost display is suppressed.
    """
    text = _build_summary(stats, pipeline=pipeline, phase_models=phase_models)
    print(text)  # noqa: T201
    _append_to_log(text, pipeline)