run_summary.py
"""Run summary printer — structured one-screen summary after each pipeline run."""

__all__ = ["print_summary"]

import logging
from datetime import datetime

from config import settings
from telemetry import RunStats

logger = logging.getLogger(__name__)

# Cost formula: (tokens * price_per_million) / 1_000_000
_PER_MILLION = 1_000_000

# Fixed column widths for the multi-phase table: phase name (8) + one space
# + cache column (16) + "|". The Total row must use the same layout so its
# pipe lines up under the phase rows'.
_NAME_WIDTH = 8
_CACHE_WIDTH = 16


def _compute_phase_cost(
    phase_name: str,
    input_tokens: int,
    output_tokens: int,
    phase_models: dict[str, str],
) -> float | None:
    """Compute cost in dollars for a single phase, or None if unpriced.

    Args:
        phase_name: Phase key used to look up the model in ``phase_models``.
        input_tokens: Prompt tokens consumed by the phase.
        output_tokens: Completion tokens produced by the phase.
        phase_models: Maps phase names to model IDs for pricing lookup.

    Returns:
        Estimated cost, or None when the phase has no model mapping or the
        model has no entry in ``settings.model_pricing`` (prices are
        per-million-token (input, output) pairs).
    """
    model = phase_models.get(phase_name)
    if model is None:
        return None
    pricing = settings.model_pricing.get(model)
    if pricing is None:
        return None
    price_in, price_out = pricing
    return (input_tokens * price_in + output_tokens * price_out) / _PER_MILLION


def _format_cost(cost: float) -> str:
    """Format cost as (~$X.XX)."""
    return f"(~${cost:.2f})"


def _derive_cached_count(pipeline: str, stats: RunStats) -> int:
    """Derive the number of fully-cached jobs from RunStats phase data.

    Args:
        pipeline: Pipeline name; only "score_jobs" and "coach_cv" report
            cached jobs, everything else returns 0.
        stats: Accumulated metrics for the run.
    """
    if pipeline == "score_jobs":
        return stats.jobs_skipped
    if pipeline == "coach_cv":
        # min of all phase cache hits = jobs where every phase was a cache hit.
        # Today extract and analyze are coupled (always equal), but using all
        # three is safe if they're ever decoupled.
        phase_hits = [
            ps.cache_hits
            for name in ("extract", "analyze", "tailor")
            if (ps := stats.phases.get(name)) is not None
        ]
        return min(phase_hits) if phase_hits else 0
    return 0


def _build_summary(
    stats: RunStats,
    *,
    pipeline: str,
    phase_models: dict[str, str] | None = None,
) -> str:
    """Build the summary string for stdout and file export.

    Args:
        stats: Accumulated metrics for the pipeline run.
        pipeline: One of "parse_cv", "score_jobs", "coach_cv", "import_jobs".
        phase_models: Maps phase names to model IDs for cost lookup.
            When None or empty, cost display is suppressed.

    Returns:
        A multi-line summary (no trailing newline).
    """
    lines: list[str] = ["Run summary"]
    pm = phase_models or {}

    # --- Jobs line ---
    if pipeline == "import_jobs":
        new = stats.jobs_processed - stats.jobs_skipped
        lines.append(
            f" Jobs: fetched {stats.jobs_processed:,} / new {new:,} "
            f"/ duplicates {stats.jobs_skipped:,} / errored {stats.jobs_errored:,}"
        )
    elif pipeline in ("score_jobs", "coach_cv"):
        label = "coached" if pipeline == "coach_cv" else "processed"
        cached = _derive_cached_count(pipeline, stats)
        cached_str = f" ({cached:,} cached)" if cached > 0 else ""
        lines.append(
            f" Jobs: {label} {stats.jobs_processed:,}{cached_str}"
            f" / errored {stats.jobs_errored:,}"
        )
    # parse_cv: no Jobs line

    # --- Multi-phase breakdown (coach_cv) ---
    multi_phase = len(stats.phases) > 1
    if multi_phase:
        lines.append(" cache tokens (per phase)")
        # Total cost is only shown when every listed phase was priced;
        # a partial sum would understate the real cost.
        all_priced = True
        phase_costs: dict[str, float] = {}

        for phase_name in ("extract", "analyze", "tailor"):
            ps = stats.phases.get(phase_name)
            if ps is None:
                continue
            total_lookups = ps.cache_hits + ps.cache_misses
            cache_str = f"{ps.cache_hits}/{total_lookups} hits"

            cost = _compute_phase_cost(phase_name, ps.input_tokens, ps.output_tokens, pm)
            if cost is not None:
                phase_costs[phase_name] = cost
            else:
                all_priced = False

            cost_str = f" {_format_cost(cost)}" if cost is not None else ""
            lines.append(
                f" {phase_name.capitalize():{_NAME_WIDTH}s} {cache_str:{_CACHE_WIDTH}s}| "
                f"{ps.input_tokens:,} in / {ps.output_tokens:>6,} out{cost_str}"
            )

        # Separator + Total
        lines.append(" " + "\u2500" * 56)
        total_cost_str = ""
        if all_priced and phase_costs:
            total_cost_str = f" {_format_cost(sum(phase_costs.values()))}"
        # BUGFIX: pad the (empty) cache column so the Total row's "|" aligns
        # with the per-phase rows above; previously it sat 16 columns left.
        lines.append(
            f" {'Total':{_NAME_WIDTH}s} {'':{_CACHE_WIDTH}s}| "
            f"{stats.total_input_tokens:,} in / {stats.total_output_tokens:>6,} out"
            f"{total_cost_str}"
        )

    # --- Single-phase token line ---
    elif stats.total_input_tokens > 0:
        cost_str = ""
        if pm:
            # Single-phase runs have at most one entry; fall back to "main"
            # (which then misses and suppresses the cost) when empty.
            phase_name = next(iter(stats.phases), "main")
            ps = stats.phases.get(phase_name)
            if ps:
                cost = _compute_phase_cost(
                    phase_name, ps.input_tokens, ps.output_tokens, pm
                )
                if cost is not None:
                    cost_str = f" {_format_cost(cost)}"
        lines.append(
            f" Tokens: {stats.total_input_tokens:,} in / "
            f"{stats.total_output_tokens:,} out{cost_str}"
        )

    # --- Time line ---
    wall = stats.wall_clock_s
    time_str = f" Time: {wall:.0f}s"
    if stats.llm_calls > 0 and stats.avg_llm_duration_s is not None:
        avg = stats.avg_llm_duration_s
        time_str += f" (avg {avg:.1f}s/LLM call, {stats.llm_calls:,} call"
        time_str += "s)" if stats.llm_calls != 1 else ")"
    if stats.llm_errors > 0:
        time_str += (
            f" ({stats.llm_errors:,} error"
            + ("s" if stats.llm_errors != 1 else "")
            + f", {stats.llm_error_duration_s:.1f}s spent on retries)"
        )
    lines.append(time_str)

    # --- Disclaimer ---
    if stats.total_input_tokens > 0:
        lines.append(
            " Note: token counts and costs are estimates. "
            "Check your provider dashboard for official usage."
        )

    return "\n".join(lines)


def _append_to_log(summary: str, pipeline: str) -> None:
    """Append the summary to the rolling log file.

    Failures to write are logged at WARNING and swallowed — the summary has
    already been printed, so the run should not fail on log I/O.
    """
    log_path = settings.output_dir / "run_history.log"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    entry = f"--- {timestamp} {pipeline} ---\n{summary}\n\n"
    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
        with log_path.open("a", encoding="utf-8") as f:
            f.write(entry)
    except OSError:
        logger.warning("Failed to write run summary to %s", log_path, exc_info=True)


def print_summary(
    stats: RunStats,
    *,
    pipeline: str,
    phase_models: dict[str, str] | None = None,
) -> None:
    """Print a structured run summary to stdout and append to log file.

    Args:
        stats: Accumulated metrics for the pipeline run.
        pipeline: One of "parse_cv", "score_jobs", "coach_cv", "import_jobs".
        phase_models: Maps phase names to model IDs for cost lookup.
            When None or empty, cost display is suppressed.
    """
    summary = _build_summary(stats, pipeline=pipeline, phase_models=phase_models)
    print(summary)  # noqa: T201
    _append_to_log(summary, pipeline)