/ pipeline / score_jobs.py
score_jobs.py
  1  """Score-jobs pipeline — orchestration, ThreadPoolExecutor, checkpoint logic, file I/O."""
  2  
  3  __all__ = ["run"]
  4  
  5  import json
  6  import logging
  7  import threading
  8  from concurrent.futures import ThreadPoolExecutor, as_completed
  9  from pathlib import Path
 10  from typing import Any
 11  
 12  import pandas as pd  # type: ignore[import-untyped]
 13  from openpyxl import load_workbook  # type: ignore[import-untyped]
 14  from openpyxl.styles import Alignment  # type: ignore[import-untyped]
 15  from openpyxl.utils import get_column_letter  # type: ignore[import-untyped]
 16  from pydantic import ValidationError
 17  from tqdm import tqdm  # type: ignore[import-untyped]
 18  
 19  from config import settings
 20  from exceptions import APIError, ParseError
 21  from integrations.instrumented import InstrumentedProvider
 22  from integrations.llm import create_llm_provider
 23  from models.job import FitCategory, JobListing, ScoredJob
 24  from services.job_scorer import JobScorerService
 25  from services.run_summary import print_summary
 26  from telemetry import RunStats
 27  
 28  logger = logging.getLogger(__name__)
 29  
 30  # Ordered column list for the Excel output. Columns absent from the frame are skipped.
 31  _XLSX_COLUMNS = [
 32      "OK",
 33      "Score",
 34      "Fit Category",
 35      "Company",
 36      "Title",
 37      "Job Location",
 38      "AI Reasoning",
 39      "URL",
 40      "job_posting_id",
 41      "Net Opportunity",
 42      "Net Rationale",
 43      "Company LinkedIn",
 44  ]
 45  
 46  # Max auto-fit width for normal columns.
 47  _XLSX_MAX_COL_WIDTH = 50
 48  # Extra chars added to measured content width.
 49  _XLSX_COL_PADDING = 2
 50  # Fallback width for empty columns.
 51  _XLSX_MIN_COL_WIDTH = 10
 52  # Columns with explicit fixed widths (all wrap text).
 53  _XLSX_FIXED_WIDTHS: dict[str, int] = {
 54      "Title": 17,
 55      "Job Location": 25,
 56      "AI Reasoning": 45,
 57      "Net Rationale": 37,
 58  }
 59  # Columns sized to header only, no text wrapping (URLs are not meant to be read).
 60  # URL also skips wrapping but has its own fixed width below.
 61  _XLSX_HEADER_ONLY_COLS = {"Company LinkedIn"}
 62  # URL gets a small fixed width but no wrapping.
 63  _XLSX_URL_WIDTH = 10
 64  # Columns that skip text wrapping.
 65  _XLSX_NO_WRAP_COLS = _XLSX_HEADER_ONLY_COLS | {"URL"}
 66  
 67  
 68  def _get_job_id(job: JobListing) -> str:
 69      """Return a stable identifier for a job listing.
 70  
 71      Priority: job_posting_id > url > "{job_title}|{company_name}".
 72  
 73      Args:
 74          job: Parsed job listing or scored job.
 75  
 76      Returns:
 77          String identifier.
 78      """
 79      if job.job_posting_id:
 80          return job.job_posting_id
 81      if job.url:
 82          return job.url
 83      return f"{job.job_title}|{job.company_name}"
 84  
 85  
 86  def _load_checkpoint(path: Path) -> tuple[list[ScoredJob], set[str]]:
 87      """Load a checkpoint file and return (results, processed_ids).
 88  
 89      Error entries (fit_category=ERROR) are excluded from both so they are
 90      re-scored in the current run.
 91  
 92      Args:
 93          path: Path to the checkpoint JSON file.
 94  
 95      Returns:
 96          Tuple of (scored_jobs, processed_ids) where scored_jobs excludes
 97          entries with fit_category=ERROR.
 98  
 99      Raises:
100          json.JSONDecodeError: If the checkpoint file is corrupt.
101          ValidationError: If the checkpoint schema has changed.
102      """
103      existing = json.loads(path.read_text(encoding="utf-8"))
104      all_entries = [ScoredJob.model_validate(e) for e in existing]
105      results = [e for e in all_entries if e.fit_category != FitCategory.ERROR]
106      processed_ids = {_get_job_id(r) for r in results}
107      retrying = len(all_entries) - len(results)
108      if retrying:
109          logger.info("Re-queuing %d error entries for re-scoring.", retrying)
110      return results, processed_ids
111  
112  
def _to_excel_row(job: ScoredJob) -> dict[str, Any]:
    """Build the spreadsheet row for one scored job.

    Keys are the display headers used in the Excel sheet. "OK" is always
    written as an empty string; every other value is read from ``job``.

    Args:
        job: Scored job to convert.

    Returns:
        Dict with display-friendly column names for Excel output.
    """
    cells: list[tuple[str, Any]] = [
        ("OK", ""),
        ("Score", job.score),
        ("Fit Category", job.fit_category.value),
        ("Company", job.company_name),
        ("Title", job.job_title),
        ("Job Location", job.job_location),
        ("AI Reasoning", job.reasoning),
        ("URL", job.url),
        ("job_posting_id", job.job_posting_id),
        ("Net Opportunity", job.networking_opportunity),
        ("Net Rationale", job.networking_rationale),
        ("Company LinkedIn", job.company_url),
    ]
    return dict(cells)
136  
137  
def _format_xlsx(xlsx_path: Path) -> None:
    """Apply column widths and text wrapping to a saved workbook, in place.

    For each column of the active sheet the width is chosen by the first
    matching rule:

    1. Header listed in ``_XLSX_FIXED_WIDTHS``: that configured width.
    2. Header equal to ``"URL"``: ``_XLSX_URL_WIDTH``.
    3. Header listed in ``_XLSX_HEADER_ONLY_COLS``: header length plus padding.
    4. Anything else: longest cell text (header included) plus padding,
       capped at ``_XLSX_MAX_COL_WIDTH``.

    Data cells (never the header row) are set to wrap with top alignment,
    except in columns listed in ``_XLSX_NO_WRAP_COLS``.

    Args:
        xlsx_path: Path to the Excel file to format in place.
    """
    workbook = load_workbook(xlsx_path)
    sheet = workbook.active
    wrapped_top = Alignment(wrap_text=True, vertical="top")

    def pick_width(title: Any, cells: Any) -> int:
        """Return the width for one column, given its header and cells."""
        if title in _XLSX_FIXED_WIDTHS:
            return _XLSX_FIXED_WIDTHS[title]
        if title == "URL":
            return _XLSX_URL_WIDTH
        if title in _XLSX_HEADER_ONLY_COLS:
            # Content is deliberately ignored; only the header must be visible.
            return len(str(title)) + _XLSX_COL_PADDING
        lengths = [len(str(cell.value)) for cell in cells if cell.value is not None]
        longest = max(lengths) if lengths else _XLSX_MIN_COL_WIDTH
        return min(longest + _XLSX_COL_PADDING, _XLSX_MAX_COL_WIDTH)

    all_columns = sheet.iter_cols(min_row=1, max_row=sheet.max_row)
    for position, cells in enumerate(all_columns, start=1):
        title = cells[0].value
        chosen_width = pick_width(title, cells)
        sheet.column_dimensions[get_column_letter(position)].width = chosen_width

        if title in _XLSX_NO_WRAP_COLS:
            continue
        # The header row (index 0) keeps its default alignment.
        for cell in cells[1:]:
            cell.alignment = wrapped_top

    workbook.save(xlsx_path)
174  
175  
def _save_outputs(
    results: list[ScoredJob],
    json_path: Path,
    xlsx_path: Path,
) -> None:
    """Save scored results to JSON and Excel.

    The JSON file holds, per job, the job's ``raw`` mapping overlaid with its
    ``model_dump()`` fields. It is written to a sibling ``<name>.tmp`` file
    first and then moved over ``json_path``, so a write that is interrupted
    part-way never leaves a truncated file at ``json_path``.

    The Excel file holds one row per job, sorted by "Score" descending, with
    the columns of ``_XLSX_COLUMNS`` that are present in the frame. With an
    empty ``results`` list a header-only sheet is written. An ``OSError``
    while writing or formatting the Excel file is logged as a warning and
    swallowed; the JSON output is unaffected.

    Args:
        results: List of scored jobs. May be empty.
        json_path: Destination for the full JSON output.
        xlsx_path: Destination for the filtered Excel output.

    Raises:
        OSError: If the JSON file cannot be written or moved into place.
    """
    payload = [{**job.raw, **job.model_dump()} for job in results]

    # Write-then-rename: Path.replace swaps the finished file in as a single
    # rename (atomic on POSIX), so the previous JSON stays intact until the
    # new one is complete.
    tmp_path = json_path.with_name(f"{json_path.name}.tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        tmp_path.replace(json_path)
    finally:
        # No-op after a successful replace; removes the leftover on failure.
        tmp_path.unlink(missing_ok=True)

    rows = [_to_excel_row(job) for job in results]
    # pd.DataFrame([]) has no columns, so sorting by "Score" would raise
    # KeyError. Build a header-only frame for the empty case instead.
    df = pd.DataFrame(rows) if rows else pd.DataFrame(columns=_XLSX_COLUMNS)
    df = df.sort_values("Score", ascending=False)
    available_cols = [c for c in _XLSX_COLUMNS if c in df.columns]

    try:
        df[available_cols].to_excel(xlsx_path, engine="openpyxl", index=False)
        _format_xlsx(xlsx_path)
    except OSError as exc:
        logger.warning("Could not save Excel (file may be locked): %s", exc)
201  
202  
def run(
    input_path: Path,
    output_json: Path,
    output_xlsx: Path,
    persona_path: Path,
    system_prompt_path: Path,
    user_template_path: Path,
    provider_override: str | None,
    model_override: str | None,
    limit: int | None,
    force: bool,
    workers: int,
) -> tuple[int, int]:
    """Run the score-jobs pipeline.

    Loads the prompt files, probes the LLM provider, loads the input jobs,
    resumes from ``output_json`` when it exists (unless ``force``), scores the
    remaining jobs on a thread pool, and writes JSON + Excel outputs. Outputs
    are re-saved every ``settings.score_batch_size`` newly scored jobs and
    once more at the end.

    If scoring is interrupted by an exception (including Ctrl-C), queued jobs
    that have not started are cancelled, the results collected so far are
    saved on a best-effort basis, and the exception is re-raised.

    Args:
        input_path: Path to the input JSON file of job listings.
        output_json: Destination path for the full scored JSON output.
        output_xlsx: Destination path for the filtered Excel output.
        persona_path: Path to the candidate persona text file.
        system_prompt_path: Path to the LLM system prompt text file.
        user_template_path: Path to the Jinja2 user message template file.
        provider_override: Provider name from CLI, or None to use settings.
        model_override: Model ID from CLI, or None to use settings.
        limit: If set, score only this many unscored jobs (testing mode).
        force: If True, ignore any existing checkpoint and restart from scratch.
        workers: Number of parallel threads.

    Returns:
        Tuple of (total_scored, new_this_run).

    Raises:
        ParseError: If persona, system prompt, or user template files are not found.
        ConfigurationError: If the required API key is missing (presumably raised
            while constructing the provider; not visible in this module).
        APIError: If the provider probe fails, or the LLM provider returns a
            non-retriable HTTP error.
        json.JSONDecodeError: If an existing checkpoint is corrupt and ``force`` is False.
        ValidationError: If an existing checkpoint no longer matches the schema
            and ``force`` is False.
    """
    # 1. Load persona
    if not persona_path.exists():
        raise ParseError("persona", f"File not found: {persona_path}")
    persona_text = persona_path.read_text(encoding="utf-8")

    # 2. Load system prompt
    if not system_prompt_path.exists():
        raise ParseError("system_prompt", f"File not found: {system_prompt_path}")
    system_prompt_text = system_prompt_path.read_text(encoding="utf-8")

    # 3. Load user template
    if not user_template_path.exists():
        raise ParseError("user_template", f"File not found: {user_template_path}")
    user_template = user_template_path.read_text(encoding="utf-8")

    # 4. Resolve provider, model, temperature, seed, and max_tokens
    (
        default_provider, default_model, effective_temperature, effective_seed, effective_max_tokens
    ) = settings.get_llm_config("score_jobs")
    effective_provider = provider_override or default_provider
    effective_model = model_override or default_model

    # 5. Construct provider
    stats = RunStats()
    raw_provider = create_llm_provider(
        effective_provider, effective_model, settings, max_tokens=effective_max_tokens
    )
    provider = InstrumentedProvider(raw_provider, stats, phase="main")

    # 5b. Live probe — verify provider + model before scoring
    logger.info("Probing provider=%s model=%s ...", effective_provider, effective_model)
    try:
        provider.ping(temperature=effective_temperature)
    except APIError as exc:
        logger.critical(
            "Provider probe failed (%s %s): %s — check provider and model in config.toml",
            effective_provider,
            effective_model,
            exc,
        )
        raise
    logger.info("Provider probe OK.")

    # 6. Instantiate scorer service
    service = JobScorerService(
        provider,
        system_prompt_text,
        user_template,
        persona_text,
        temperature=effective_temperature,
        seed=effective_seed,
    )

    # 7. Load all jobs
    all_jobs: list[JobListing] = [
        JobListing.model_validate(e)
        for e in json.loads(input_path.read_text(encoding="utf-8"))
    ]

    # 8. Resume logic
    results: list[ScoredJob] = []
    processed_ids: set[str] = set()

    if output_json.exists() and not force:
        try:
            results, processed_ids = _load_checkpoint(output_json)
        except (json.JSONDecodeError, ValidationError) as exc:
            logger.warning(
                "Checkpoint at %s is unreadable (%s). Use --force to restart from scratch.",
                output_json,
                exc,
            )
            raise

    # 9. Filter to unscored jobs
    jobs_to_score = [j for j in all_jobs if _get_job_id(j) not in processed_ids]
    if limit is not None:
        jobs_to_score = jobs_to_score[:limit]

    # 10. Nothing to score
    if not jobs_to_score:
        _save_outputs(results, output_json, output_xlsx)
        stats.jobs_processed = len(results)
        stats.jobs_skipped = len(results)
        stats.jobs_errored = 0
        stats.finish()
        try:
            print_summary(stats, pipeline="score_jobs", phase_models={"main": effective_model})
        except Exception:
            logger.warning("Failed to print run summary", exc_info=True)
        return len(results), 0

    # 11. Threading lock
    # The as_completed loop below runs on the calling thread only, so this
    # lock is never contended; it is kept as a guard around the shared lists.
    results_lock = threading.Lock()
    new_results: list[ScoredJob] = []
    total = len(jobs_to_score)
    # Read once; a non-positive size disables periodic checkpoints instead of
    # raising ZeroDivisionError on the modulo below.
    batch_size = settings.score_batch_size

    # 12. Parallel scoring
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(service.score_job, job): job for job in jobs_to_score}
        try:
            with tqdm(total=total, unit="job") as progress:
                for future in as_completed(futures):
                    result: ScoredJob = future.result()
                    with results_lock:
                        results.append(result)
                        new_results.append(result)
                        progress.update(1)
                        progress.set_postfix_str(result.job_title[:40])
                        if batch_size > 0 and len(new_results) % batch_size == 0:
                            _save_outputs(results, output_json, output_xlsx)
                            logger.info("Checkpoint: %d total jobs saved.", len(results))
        except BaseException:
            # Leaving the executor context waits for every submitted job, so
            # without cancelling, all queued jobs would still run and have
            # their results thrown away. Cancel the ones that have not started.
            for pending in futures:
                pending.cancel()
            # Keep what was scored since the last periodic checkpoint. This is
            # best-effort: a failure here must not hide the original error.
            if new_results:
                try:
                    _save_outputs(results, output_json, output_xlsx)
                except Exception:
                    logger.exception("Could not save partial results after scoring was interrupted.")
                else:
                    logger.warning(
                        "Scoring interrupted; %d total jobs saved before exit.", len(results)
                    )
            raise

    # 13. Final save
    _save_outputs(results, output_json, output_xlsx)

    # 14. Run summary
    error_count = sum(1 for r in new_results if r.fit_category == FitCategory.ERROR)
    cached_count = len(results) - len(new_results)
    stats.jobs_processed = len(results) - error_count
    stats.jobs_skipped = cached_count
    stats.jobs_errored = error_count
    stats.finish()
    try:
        print_summary(stats, pipeline="score_jobs", phase_models={"main": effective_model})
    except Exception:
        logger.warning("Failed to print run summary", exc_info=True)

    # 15. Return counts
    return len(results), len(new_results)