/ app / parser.py
parser.py
  1  from __future__ import annotations
  2  
  3  import json
  4  import re
  5  from datetime import UTC, datetime
  6  from pathlib import Path
  7  
  8  from app.schemas import FinalDecision
  9  
 10  OUTPUT_DIR = Path(__file__).resolve().parent.parent / "outputs"
 11  FINAL_DECISION_PATH = OUTPUT_DIR / "final_decision.json"
 12  START_MARKER = "BEGIN_FINAL_DECISION_JSON"
 13  END_MARKER = "END_FINAL_DECISION_JSON"
 14  SPECIAL_TOKENS = ("<|im_end|>", "<|endoftext|>", "</s>")
 15  
 16  
 17  def _strip_transport_artifacts(raw_text: str) -> str:
 18      text = raw_text.strip()
 19      for token in SPECIAL_TOKENS:
 20          text = text.replace(token, "")
 21      return text.strip()
 22  
 23  
 24  def _extract_marked_json(text: str) -> str | None:
 25      if START_MARKER not in text or END_MARKER not in text:
 26          return None
 27      _, tail = text.split(START_MARKER, 1)
 28      candidate, _ = tail.split(END_MARKER, 1)
 29      return candidate.strip()
 30  
 31  
 32  def _extract_balanced_json(text: str) -> str:
 33      start = text.find("{")
 34      if start == -1:
 35          raise ValueError("No JSON object start found in model output")
 36  
 37      depth = 0
 38      in_string = False
 39      escape = False
 40      for index, char in enumerate(text[start:], start=start):
 41          if escape:
 42              escape = False
 43              continue
 44          if char == "\\":
 45              escape = True
 46              continue
 47          if char == '"':
 48              in_string = not in_string
 49              continue
 50          if in_string:
 51              continue
 52          if char == "{":
 53              depth += 1
 54          elif char == "}":
 55              depth -= 1
 56              if depth == 0:
 57                  return text[start : index + 1]
 58      raise ValueError("No balanced JSON object end found in model output")
 59  
 60  
 61  def _light_repair(candidate: str) -> str:
 62      repaired = candidate.strip()
 63      repaired = re.sub(r"^```json\s*", "", repaired)
 64      repaired = re.sub(r"\s*```$", "", repaired)
 65      repaired = repaired.replace("\ufeff", "")
 66      repaired = re.sub(r",(\s*[}\]])", r"\1", repaired)
 67      return repaired.strip()
 68  
 69  
 70  def persist_failed_parse(raw_text: str) -> Path:
 71      OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
 72      timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
 73      path = OUTPUT_DIR / f"failed_parse_{timestamp}.txt"
 74      path.write_text(raw_text, encoding="utf-8")
 75      return path
 76  
 77  
 78  def parse_final_decision(raw_text: str, incident_id: str) -> FinalDecision:
 79      cleaned = _strip_transport_artifacts(raw_text)
 80      parse_mode = "direct"
 81  
 82      candidate = _extract_marked_json(cleaned)
 83      if candidate is not None:
 84          parse_mode = "marked"
 85      else:
 86          candidate = cleaned
 87  
 88      try:
 89          payload = json.loads(candidate)
 90      except json.JSONDecodeError:
 91          try:
 92              repaired = _light_repair(_extract_balanced_json(candidate))
 93              payload = json.loads(repaired)
 94              parse_mode = "repaired"
 95          except Exception:
 96              failed_path = persist_failed_parse(raw_text)
 97              raise ValueError(f"Unable to parse FinalDecision JSON. Raw output saved to {failed_path}")
 98  
 99      decision = FinalDecision.model_validate(payload)
100      if decision.incident_id != incident_id:
101          failed_path = persist_failed_parse(raw_text)
102          raise ValueError(
103              f"Incident ID mismatch. Expected {incident_id}, got {decision.incident_id}. "
104              f"Raw output saved to {failed_path}"
105          )
106  
107      machine_json_valid = parse_mode in {"direct", "marked"} and decision.machine_json_valid
108      return decision.model_copy(update={"machine_json_valid": machine_json_valid})
109  
110  
def save_final_decision(decision: FinalDecision, path: Path = FINAL_DECISION_PATH) -> Path:
    """Serialize *decision* as indented JSON to *path* and return *path*.

    Missing parent directories of *path* are created before writing, and the
    file is written as UTF-8.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = decision.model_dump_json(indent=2)
    path.write_text(serialized, encoding="utf-8")
    return path