# parser.py
1 from __future__ import annotations 2 3 import json 4 import re 5 from datetime import UTC, datetime 6 from pathlib import Path 7 8 from app.schemas import FinalDecision 9 10 OUTPUT_DIR = Path(__file__).resolve().parent.parent / "outputs" 11 FINAL_DECISION_PATH = OUTPUT_DIR / "final_decision.json" 12 START_MARKER = "BEGIN_FINAL_DECISION_JSON" 13 END_MARKER = "END_FINAL_DECISION_JSON" 14 SPECIAL_TOKENS = ("<|im_end|>", "<|endoftext|>", "</s>") 15 16 17 def _strip_transport_artifacts(raw_text: str) -> str: 18 text = raw_text.strip() 19 for token in SPECIAL_TOKENS: 20 text = text.replace(token, "") 21 return text.strip() 22 23 24 def _extract_marked_json(text: str) -> str | None: 25 if START_MARKER not in text or END_MARKER not in text: 26 return None 27 _, tail = text.split(START_MARKER, 1) 28 candidate, _ = tail.split(END_MARKER, 1) 29 return candidate.strip() 30 31 32 def _extract_balanced_json(text: str) -> str: 33 start = text.find("{") 34 if start == -1: 35 raise ValueError("No JSON object start found in model output") 36 37 depth = 0 38 in_string = False 39 escape = False 40 for index, char in enumerate(text[start:], start=start): 41 if escape: 42 escape = False 43 continue 44 if char == "\\": 45 escape = True 46 continue 47 if char == '"': 48 in_string = not in_string 49 continue 50 if in_string: 51 continue 52 if char == "{": 53 depth += 1 54 elif char == "}": 55 depth -= 1 56 if depth == 0: 57 return text[start : index + 1] 58 raise ValueError("No balanced JSON object end found in model output") 59 60 61 def _light_repair(candidate: str) -> str: 62 repaired = candidate.strip() 63 repaired = re.sub(r"^```json\s*", "", repaired) 64 repaired = re.sub(r"\s*```$", "", repaired) 65 repaired = repaired.replace("\ufeff", "") 66 repaired = re.sub(r",(\s*[}\]])", r"\1", repaired) 67 return repaired.strip() 68 69 70 def persist_failed_parse(raw_text: str) -> Path: 71 OUTPUT_DIR.mkdir(parents=True, exist_ok=True) 72 timestamp = 
datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") 73 path = OUTPUT_DIR / f"failed_parse_{timestamp}.txt" 74 path.write_text(raw_text, encoding="utf-8") 75 return path 76 77 78 def parse_final_decision(raw_text: str, incident_id: str) -> FinalDecision: 79 cleaned = _strip_transport_artifacts(raw_text) 80 parse_mode = "direct" 81 82 candidate = _extract_marked_json(cleaned) 83 if candidate is not None: 84 parse_mode = "marked" 85 else: 86 candidate = cleaned 87 88 try: 89 payload = json.loads(candidate) 90 except json.JSONDecodeError: 91 try: 92 repaired = _light_repair(_extract_balanced_json(candidate)) 93 payload = json.loads(repaired) 94 parse_mode = "repaired" 95 except Exception: 96 failed_path = persist_failed_parse(raw_text) 97 raise ValueError(f"Unable to parse FinalDecision JSON. Raw output saved to {failed_path}") 98 99 decision = FinalDecision.model_validate(payload) 100 if decision.incident_id != incident_id: 101 failed_path = persist_failed_parse(raw_text) 102 raise ValueError( 103 f"Incident ID mismatch. Expected {incident_id}, got {decision.incident_id}. " 104 f"Raw output saved to {failed_path}" 105 ) 106 107 machine_json_valid = parse_mode in {"direct", "marked"} and decision.machine_json_valid 108 return decision.model_copy(update={"machine_json_valid": machine_json_valid}) 109 110 111 def save_final_decision(decision: FinalDecision, path: Path = FINAL_DECISION_PATH) -> Path: 112 path.parent.mkdir(parents=True, exist_ok=True) 113 path.write_text(decision.model_dump_json(indent=2), encoding="utf-8") 114 return path