score_jobs.py
"""Score-jobs pipeline — orchestration, ThreadPoolExecutor, checkpoint logic, file I/O."""

__all__ = ["run"]

import json
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any

import pandas as pd  # type: ignore[import-untyped]
from openpyxl import load_workbook  # type: ignore[import-untyped]
from openpyxl.styles import Alignment  # type: ignore[import-untyped]
from openpyxl.utils import get_column_letter  # type: ignore[import-untyped]
from pydantic import ValidationError
from tqdm import tqdm  # type: ignore[import-untyped]

from config import settings
from exceptions import APIError, ParseError
from integrations.instrumented import InstrumentedProvider
from integrations.llm import create_llm_provider
from models.job import FitCategory, JobListing, ScoredJob
from services.job_scorer import JobScorerService
from services.run_summary import print_summary
from telemetry import RunStats

logger = logging.getLogger(__name__)

# Ordered column list for the Excel output. Columns absent from the frame are skipped.
_XLSX_COLUMNS = [
    "OK",
    "Score",
    "Fit Category",
    "Company",
    "Title",
    "Job Location",
    "AI Reasoning",
    "URL",
    "job_posting_id",
    "Net Opportunity",
    "Net Rationale",
    "Company LinkedIn",
]

# Upper bound on the width of auto-fitted columns (applied after padding is added).
_XLSX_MAX_COL_WIDTH = 50
# Extra chars added to the measured content width (and to header-only widths).
_XLSX_COL_PADDING = 2
# Fallback content length for an auto-fitted column with no non-empty cells;
# padding is still added on top of it.
_XLSX_MIN_COL_WIDTH = 10
# Columns with explicit fixed widths, keyed by header text (all wrap text).
_XLSX_FIXED_WIDTHS: dict[str, int] = {
    "Title": 17,
    "Job Location": 25,
    "AI Reasoning": 45,
    "Net Rationale": 37,
}
# Columns sized to header only, no text wrapping (URLs are not meant to be read).
# URL also skips wrapping but has its own fixed width below.
_XLSX_HEADER_ONLY_COLS = {"Company LinkedIn"}
# URL gets a small fixed width but no wrapping.
_XLSX_URL_WIDTH = 10
# Columns that skip text wrapping.
_XLSX_NO_WRAP_COLS = _XLSX_HEADER_ONLY_COLS | {"URL"}


def _get_job_id(job: JobListing) -> str:
    """Return a stable identifier for a job listing.

    Priority: job_posting_id > url > "{job_title}|{company_name}".
    A falsy value (None or empty string) falls through to the next candidate.

    Args:
        job: Parsed job listing or scored job.

    Returns:
        String identifier.
    """
    if job.job_posting_id:
        return job.job_posting_id
    if job.url:
        return job.url
    return f"{job.job_title}|{job.company_name}"


def _load_checkpoint(path: Path) -> tuple[list[ScoredJob], set[str]]:
    """Load a checkpoint file and return (results, processed_ids).

    Error entries (fit_category=ERROR) are excluded from both so they are
    re-scored in the current run.

    Args:
        path: Path to the checkpoint JSON file.

    Returns:
        Tuple of (scored_jobs, processed_ids) where scored_jobs excludes
        entries with fit_category=ERROR.

    Raises:
        json.JSONDecodeError: If the checkpoint file is corrupt.
        ValidationError: If the checkpoint schema has changed.
    """
    existing = json.loads(path.read_text(encoding="utf-8"))
    all_entries = [ScoredJob.model_validate(e) for e in existing]
    results = [e for e in all_entries if e.fit_category != FitCategory.ERROR]
    processed_ids = {_get_job_id(r) for r in results}
    retrying = len(all_entries) - len(results)
    if retrying:
        logger.info("Re-queuing %d error entries for re-scoring.", retrying)
    return results, processed_ids


def _to_excel_row(job: ScoredJob) -> dict[str, Any]:
    """Map a ScoredJob to a human-readable Excel row dict.

    The keys are exactly the headers listed in ``_XLSX_COLUMNS``. "OK" is
    always written as an empty string.

    Args:
        job: Scored job to convert.

    Returns:
        Dict with display-friendly column names for Excel output.
    """
    return {
        "OK": "",
        "Score": job.score,
        "Fit Category": job.fit_category.value,
        "Company": job.company_name,
        "Title": job.job_title,
        "Job Location": job.job_location,
        "AI Reasoning": job.reasoning,
        "URL": job.url,
        "job_posting_id": job.job_posting_id,
        "Net Opportunity": job.networking_opportunity,
        "Net Rationale": job.networking_rationale,
        "Company LinkedIn": job.company_url,
    }


def _format_xlsx(xlsx_path: Path) -> None:
    """Set column widths and text wrapping on an Excel file, in place.

    Width rules, decided per column from its header cell:

    * headers in ``_XLSX_FIXED_WIDTHS`` get their configured width;
    * "URL" gets ``_XLSX_URL_WIDTH``;
    * headers in ``_XLSX_HEADER_ONLY_COLS`` are sized to the header text
      plus padding;
    * every other column is auto-fitted to its longest cell (header
      included) plus padding, capped at ``_XLSX_MAX_COL_WIDTH``.

    Wrapped, top-aligned text is applied to the data cells (not the header)
    of every column whose header is not in ``_XLSX_NO_WRAP_COLS``.

    Args:
        xlsx_path: Path to the Excel file to format in place.
    """
    wb = load_workbook(xlsx_path)
    ws = wb.active
    wrap = Alignment(wrap_text=True, vertical="top")

    for col_idx, col_cells in enumerate(ws.iter_cols(min_row=1, max_row=ws.max_row), start=1):
        header = col_cells[0].value
        if header in _XLSX_FIXED_WIDTHS:
            width = _XLSX_FIXED_WIDTHS[header]
        elif header == "URL":
            width = _XLSX_URL_WIDTH
        elif header in _XLSX_HEADER_ONLY_COLS:
            # Size to header text only; content stays inline (no wrap).
            width = len(str(header)) + _XLSX_COL_PADDING
        else:
            # Longest stringified cell in the column; the default only applies
            # when every cell (header included) is empty.
            max_len = max(
                (len(str(cell.value)) for cell in col_cells if cell.value is not None),
                default=_XLSX_MIN_COL_WIDTH,
            )
            width = min(max_len + _XLSX_COL_PADDING, _XLSX_MAX_COL_WIDTH)

        ws.column_dimensions[get_column_letter(col_idx)].width = width

        if header not in _XLSX_NO_WRAP_COLS:
            for cell in col_cells[1:]:  # Skip header.
                cell.alignment = wrap

    wb.save(xlsx_path)


def _save_outputs(
    results: list[ScoredJob],
    json_path: Path,
    xlsx_path: Path,
) -> None:
    """Save scored results to JSON and Excel.

    The JSON file holds one object per job: the job's ``raw`` mapping merged
    with ``model_dump()``, with model fields winning on key collisions.
    The Excel file holds the display columns from ``_to_excel_row`` sorted by
    "Score" descending. With no results, a header-only sheet is written.

    A failure to write the Excel file that surfaces as ``OSError`` is logged
    as a warning and swallowed; errors writing the JSON file propagate.

    Args:
        results: List of scored jobs (may be empty).
        json_path: Destination for the full JSON output.
        xlsx_path: Destination for the Excel output.
    """
    with json_path.open("w", encoding="utf-8") as f:
        json.dump(
            [{**job.raw, **job.model_dump()} for job in results], f, indent=2, ensure_ascii=False
        )

    try:
        rows = [_to_excel_row(job) for job in results]
        if rows:
            df = pd.DataFrame(rows).sort_values("Score", ascending=False)
        else:
            # A frame built from an empty list has no columns, so sorting by
            # "Score" would raise KeyError. Build the header-only frame instead.
            df = pd.DataFrame(columns=_XLSX_COLUMNS)
        available_cols = [c for c in _XLSX_COLUMNS if c in df.columns]
        df[available_cols].to_excel(xlsx_path, engine="openpyxl", index=False)
        _format_xlsx(xlsx_path)
    except OSError as exc:
        logger.warning("Could not save Excel (file may be locked): %s", exc)


def _read_required_text(path: Path, label: str) -> str:
    """Return the UTF-8 text of *path*, raising ParseError(label, ...) if it is missing."""
    if not path.exists():
        raise ParseError(label, f"File not found: {path}")
    return path.read_text(encoding="utf-8")


def _finish_and_report(stats: RunStats, model: str) -> None:
    """Close out *stats* and print the run summary; a summary failure is only logged."""
    stats.finish()
    try:
        print_summary(stats, pipeline="score_jobs", phase_models={"main": model})
    except Exception:
        # The summary is informational; it must not fail a run whose outputs
        # have already been written.
        logger.warning("Failed to print run summary", exc_info=True)


def run(
    input_path: Path,
    output_json: Path,
    output_xlsx: Path,
    persona_path: Path,
    system_prompt_path: Path,
    user_template_path: Path,
    provider_override: str | None,
    model_override: str | None,
    limit: int | None,
    force: bool,
    workers: int,
) -> tuple[int, int]:
    """Run the score-jobs pipeline.

    The output JSON doubles as the checkpoint: unless ``force`` is set, jobs
    already present in it (other than ERROR entries) are not scored again.
    Outputs are rewritten every ``settings.score_batch_size`` newly scored
    jobs and once more at the end. If scoring aborts with an exception,
    queued jobs are cancelled and the results gathered so far are saved
    before the exception propagates.

    Args:
        input_path: Path to the input JSON file of job listings.
        output_json: Destination path for the full scored JSON output.
        output_xlsx: Destination path for the filtered Excel output.
        persona_path: Path to the candidate persona text file.
        system_prompt_path: Path to the LLM system prompt text file.
        user_template_path: Path to the Jinja2 user message template file.
        provider_override: Provider name from CLI, or None to use settings.
        model_override: Model ID from CLI, or None to use settings.
        limit: If set, score only this many unscored jobs (testing mode).
        force: If True, ignore any existing checkpoint and restart from scratch.
        workers: Number of parallel threads.

    Returns:
        Tuple of (total_scored, new_this_run).

    Raises:
        ParseError: If persona, system prompt, or user template files are not found.
        ConfigurationError: If the required API key is missing.
        APIError: If the LLM provider returns a non-retriable HTTP error.
        json.JSONDecodeError: If an existing checkpoint is corrupt (and not ``force``).
        ValidationError: If an existing checkpoint no longer matches the schema.
    """
    # 1-3. Load persona, system prompt, and user template
    persona_text = _read_required_text(persona_path, "persona")
    system_prompt_text = _read_required_text(system_prompt_path, "system_prompt")
    user_template = _read_required_text(user_template_path, "user_template")

    # 4. Resolve provider, model, temperature, seed, and max_tokens
    (
        default_provider,
        default_model,
        effective_temperature,
        effective_seed,
        effective_max_tokens,
    ) = settings.get_llm_config("score_jobs")
    effective_provider = provider_override or default_provider
    effective_model = model_override or default_model

    # 5. Construct provider
    stats = RunStats()
    raw_provider = create_llm_provider(
        effective_provider, effective_model, settings, max_tokens=effective_max_tokens
    )
    provider = InstrumentedProvider(raw_provider, stats, phase="main")

    # 5b. Live probe — verify provider + model before scoring
    logger.info("Probing provider=%s model=%s ...", effective_provider, effective_model)
    try:
        provider.ping(temperature=effective_temperature)
    except APIError as exc:
        logger.critical(
            "Provider probe failed (%s %s): %s — check provider and model in config.toml",
            effective_provider,
            effective_model,
            exc,
        )
        raise
    logger.info("Provider probe OK.")

    # 6. Instantiate scorer service
    service = JobScorerService(
        provider,
        system_prompt_text,
        user_template,
        persona_text,
        temperature=effective_temperature,
        seed=effective_seed,
    )

    # 7. Load all jobs
    all_jobs: list[JobListing] = [
        JobListing.model_validate(e)
        for e in json.loads(input_path.read_text(encoding="utf-8"))
    ]

    # 8. Resume logic
    results: list[ScoredJob] = []
    processed_ids: set[str] = set()

    if output_json.exists() and not force:
        try:
            results, processed_ids = _load_checkpoint(output_json)
        except (json.JSONDecodeError, ValidationError) as exc:
            logger.warning(
                "Checkpoint at %s is unreadable (%s). Use --force to restart from scratch.",
                output_json,
                exc,
            )
            raise

    # 9. Filter to unscored jobs
    jobs_to_score = [j for j in all_jobs if _get_job_id(j) not in processed_ids]
    if limit is not None:
        jobs_to_score = jobs_to_score[:limit]

    # 10. Nothing to score
    if not jobs_to_score:
        _save_outputs(results, output_json, output_xlsx)
        stats.jobs_processed = len(results)
        stats.jobs_skipped = len(results)
        stats.jobs_errored = 0
        _finish_and_report(stats, effective_model)
        return len(results), 0

    # 11. Threading lock
    # Results are only appended on this thread (the one draining
    # as_completed); workers just run service.score_job. The lock is kept
    # from the original design as a guard around the shared lists.
    results_lock = threading.Lock()
    new_results: list[ScoredJob] = []
    total = len(jobs_to_score)

    # 12. Parallel scoring
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(service.score_job, job) for job in jobs_to_score]
        try:
            with tqdm(total=total, unit="job") as progress:
                for future in as_completed(futures):
                    result: ScoredJob = future.result()
                    with results_lock:
                        results.append(result)
                        new_results.append(result)
                        progress.update(1)
                        progress.set_postfix_str(result.job_title[:40])
                        if len(new_results) % settings.score_batch_size == 0:
                            _save_outputs(results, output_json, output_xlsx)
                            logger.info("Checkpoint: %d total jobs saved.", len(results))
        except BaseException:
            # Leaving the executor context waits for every queued job.
            # Cancel the ones that have not started so a fatal error (or
            # Ctrl-C) does not keep issuing scoring calls whose results
            # would be thrown away.
            for pending in futures:
                pending.cancel()
            # Keep whatever was scored since the last periodic checkpoint.
            try:
                _save_outputs(results, output_json, output_xlsx)
            except Exception:
                logger.warning("Could not save partial results after failure", exc_info=True)
            raise

    # 13. Final save
    _save_outputs(results, output_json, output_xlsx)

    # 14. Run summary
    error_count = sum(1 for r in new_results if r.fit_category == FitCategory.ERROR)
    cached_count = len(results) - len(new_results)
    stats.jobs_processed = len(results) - error_count
    stats.jobs_skipped = cached_count
    stats.jobs_errored = error_count
    _finish_and_report(stats, effective_model)

    # 15. Return counts
    return len(results), len(new_results)