1 """Core: file discovery, persisted batch state, Venice API, markdown report.""" 2 from __future__ import annotations 3 4 import fnmatch 5 import hashlib 6 import json 7 import logging 8 import os 9 import random 10 import re 11 import subprocess 12 import threading 13 import time 14 from collections.abc import Callable 15 from concurrent.futures import ThreadPoolExecutor 16 from concurrent.futures import TimeoutError as FuturesTimeout 17 from dataclasses import dataclass, field 18 from datetime import datetime 19 from pathlib import Path 20 21 from openai import APIError, APITimeoutError, OpenAI, RateLimitError 22 from pydantic import ValidationError 23 24 from autoreview.schemas import ReviewPayload 25 26 logger = logging.getLogger(__name__) 27 28 # OpenAI client expects base URL without trailing slash (paths like /chat/completions are appended). 29 VENICE_BASE_URL = os.environ.get("VENICE_BASE_URL", "https://api.venice.ai/api/v1").rstrip("/") 30 DEFAULT_MODEL = os.environ.get("VENICE_MODEL", "kimi-k2-5") 31 32 # Curated for the GUI from Venice text models (see https://docs.venice.ai/models/overview ). 33 # Use env VENICE_MODEL for any id not listed here (GUI shows it under “Custom (env)”). 34 VENICE_MODEL_GROUPS: tuple[tuple[str, tuple[str, ...]], ...] = ( 35 ("Recommended", ("kimi-k2-5", "zai-org-glm-5", "deepseek-v3.2")), 36 ("Fast coding / agents", ("z-ai-glm-5-turbo", "kimi-k2-thinking")), 37 ( 38 "Dedicated coders", 39 ( 40 "qwen3-coder-480b-a35b-instruct", 41 "qwen3-coder-480b-a35b-instruct-turbo", 42 "openai-gpt-53-codex", 43 "openai-gpt-52-codex", 44 ), 45 ), 46 ("Long context", ("claude-sonnet-4-6", "claude-opus-4-7", "grok-4-20")), 47 ("Reasoning-heavy", ("arcee-trinity-large-thinking", "kimi-k2")), 48 ( 49 "General instruct", 50 ( 51 "qwen3-5-397b-a17b", 52 "qwen3-5-35b-a3b", 53 "qwen-3-6-plus", 54 "mistral-small-2603", 55 "minimax-m27", 56 ), 57 ), 58 ) 59 60 61 def venice_model_categories() -> tuple[str, ...]: 62 return tuple(c for c, _ in VENICE_MODEL_GROUPS) 63 64 65 def venice_models_for_category(category: str) -> tuple[str, ...]: 66 for c, models in VENICE_MODEL_GROUPS: 67 if c == category: 68 return models 69 return () 70 71 72 def venice_category_for_model(model_id: str) -> str | None: 73 for c, models in VENICE_MODEL_GROUPS: 74 if model_id in models: 75 return c 76 return None 77 78 79 VENICE_MODEL_CHOICES: tuple[str, ...] = tuple(m for _, models in VENICE_MODEL_GROUPS for m in models) 80 81 # Persist state every N completed files (skips count). Set AUTOREVIEW_STATE_SAVE_EVERY=1 for old behavior. 82 def _state_save_interval() -> int: 83 raw = os.environ.get("AUTOREVIEW_STATE_SAVE_EVERY", "5").strip() 84 try: 85 n = int(raw) 86 except ValueError: 87 return 5 88 return max(1, min(n, 10_000)) 89 90 # batch_size == 0 means “process every pending file in this run” (entire remaining folder). 91 BATCH_SIZE_ALL = 0 92 # Per-file read cap (avoids accidental huge reads into memory). 93 MIN_FILE_BYTES = 1024 94 MAX_FILE_BYTES_CAP = 50 * 1024 * 1024 95 STATE_VERSION = 1 96 STATE_DIRNAME = ".venice_review" 97 DEFAULT_OUTPUT_NAME = "VENICE_CODE_REVIEW.md" 98 99 # Directory name segments to prune during walk and to drop from git-discovered paths. 100 # Covers caches, venvs, build output, IDE, and package manager noise (saves API tokens). 

# batch_size == 0 means “process every pending file in this run” (entire remaining folder).
BATCH_SIZE_ALL = 0
# Per-file read cap (avoids accidental huge reads into memory).
MIN_FILE_BYTES = 1024
MAX_FILE_BYTES_CAP = 50 * 1024 * 1024
STATE_VERSION = 1
STATE_DIRNAME = ".venice_review"
DEFAULT_OUTPUT_NAME = "VENICE_CODE_REVIEW.md"

# Directory name segments to prune during walk and to drop from git-discovered paths.
# Covers caches, venvs, build output, IDE, and package manager noise (saves API tokens).
_TOOL_AND_CACHE_DIR_NAMES = frozenset(
    {
        ".git",
        ".venice_review",
        "node_modules",
        ".venv",
        "venv",
        "__pycache__",
        ".pytest_cache",
        ".mypy_cache",
        ".ruff_cache",
        ".hypothesis",
        ".nox",
        ".cache",
        ".cursor",
        ".idea",
        ".vscode",
        ".turbo",
        ".next",
        ".nuxt",
        ".output",
        ".parcel-cache",
        "dist",
        "build",
        ".eggs",
        ".tox",
        "vendor",
        ".cargo",
        "target",
    }
)


def _should_skip_noise_path(rel: str) -> bool:
    """Skip paths under tool/cache dirs or known non-source files (applies to git + walk)."""
    parts = rel.replace("\\", "/").split("/")
    for part in parts:
        if part in _TOOL_AND_CACHE_DIR_NAMES:
            return True
        if part.endswith(".egg-info"):
            return True
    base = parts[-1] if parts else ""
    if base == "CACHEDIR.TAG":
        return True
    return False


# Typical documentation trees (path segment match, any depth).
_DOC_TREE_DIR_NAMES = frozenset({"doc", "docs", "documentation"})


def _matches_default_doc_exclude(rel: str) -> bool:
    """True if path looks like prose/docs rather than source (when default doc filter is on)."""
    parts = rel.replace("\\", "/").split("/")
    for part in parts:
        if part.lower() in _DOC_TREE_DIR_NAMES:
            return True
    name = Path(rel).name.lower()
    return name.endswith((".md", ".mdx", ".mdown", ".markdown", ".rst"))


ProgressCallback = Callable[[str, dict], None] | None


def _sha256_text(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()


def normalize_root(path: str | Path) -> Path:
    return Path(path).expanduser().resolve()


def state_dir(root: Path) -> Path:
    return root / STATE_DIRNAME


def state_path(root: Path) -> Path:
    return state_dir(root) / "state.json"


def default_report_path(root: Path) -> Path:
    return root / DEFAULT_OUTPUT_NAME
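
# Path-layout sketch for the persisted state ("~/project" is a hypothetical root):
#
#   >>> root = normalize_root("~/project")
#   >>> state_dir(root).name
#   '.venice_review'
#   >>> state_path(root).name
#   'state.json'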

@dataclass
class ReviewState:
    version: int = STATE_VERSION
    root_abs: str = ""
    fingerprint: str = ""
    completed_paths: list[str] = field(default_factory=list)
    output_name: str = DEFAULT_OUTPUT_NAME
    # Per Venice model id: tokens accumulated for this project (all autoreview runs).
    usage_by_model: dict[str, dict[str, int]] = field(default_factory=dict)

    def to_json(self) -> dict:
        return {
            "version": self.version,
            "root_abs": self.root_abs,
            "fingerprint": self.fingerprint,
            "completed_paths": sorted(set(self.completed_paths)),
            "output_name": self.output_name,
            "usage_by_model": {
                k: {"prompt": int(v["prompt"]), "completion": int(v["completion"])}
                for k, v in sorted(self.usage_by_model.items())
            },
        }

    @classmethod
    def from_json(cls, data: dict) -> ReviewState:
        ubm: dict[str, dict[str, int]] = {}
        raw = data.get("usage_by_model")
        if isinstance(raw, dict):
            for mid, u in raw.items():
                if isinstance(u, dict):
                    ubm[str(mid)] = {
                        "prompt": int(u.get("prompt", 0)),
                        "completion": int(u.get("completion", 0)),
                    }
        return cls(
            version=int(data.get("version", STATE_VERSION)),
            root_abs=str(data.get("root_abs", "")),
            fingerprint=str(data.get("fingerprint", "")),
            completed_paths=list(data.get("completed_paths", [])),
            output_name=str(data.get("output_name", DEFAULT_OUTPUT_NAME)),
            usage_by_model=ubm,
        )


def _resolve_report_path(root: Path, output_path: Path | None, state: ReviewState | None) -> Path:
    """Report file to write to. Explicit ``output_path`` wins; else reuse ``state.output_name`` from a prior run."""
    if output_path is not None:
        return output_path.resolve()
    if state is not None:
        name = (state.output_name or "").strip()
        if name:
            return (root / Path(name).name).resolve()
    return default_report_path(root).resolve()


def compute_fingerprint(rel_paths: list[str], include: tuple[str, ...], exclude: tuple[str, ...]) -> str:
    payload = "\n".join(sorted(rel_paths)) + "\n---\n" + "|".join(include) + "\n" + "|".join(exclude)
    return _sha256_text(payload)


def load_state(root: Path) -> ReviewState | None:
    p = state_path(root)
    if not p.is_file():
        return None
    try:
        data = json.loads(p.read_text(encoding="utf-8"))
        return ReviewState.from_json(data)
    except (OSError, json.JSONDecodeError, TypeError, KeyError):
        return None


def save_state(root: Path, state: ReviewState) -> None:
    d = state_dir(root)
    d.mkdir(parents=True, exist_ok=True)
    tmp = state_path(root).with_suffix(".tmp")
    tmp.write_text(json.dumps(state.to_json(), indent=2), encoding="utf-8")
    tmp.replace(state_path(root))


def discover_via_git(root: Path) -> list[str] | None:
    git_dir = root / ".git"
    if not git_dir.exists():
        return None
    try:
        proc = subprocess.run(
            ["git", "-C", str(root), "ls-files", "-z", "--cached", "--exclude-standard"],
            capture_output=True,
            timeout=120,
            check=False,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if proc.returncode != 0:
        return None
    raw = proc.stdout or b""
    names = [x.decode("utf-8", errors="replace") for x in raw.split(b"\0") if x]
    out: list[str] = []
    for name in names:
        p = root / name
        if p.is_file():
            out.append(name.replace("\\", "/"))
    return sorted(out)


def _should_skip_dir(name: str) -> bool:
    if name in _TOOL_AND_CACHE_DIR_NAMES:
        return True
    if name.endswith(".egg-info"):
        return True
    return False
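
# Fingerprint sketch: order-insensitive over paths, sensitive to filters
# (illustrative; actual digests elided):
#
#   >>> a = compute_fingerprint(["b.py", "a.py"], (), ())
#   >>> a == compute_fingerprint(["a.py", "b.py"], (), ())
#   True
#   >>> a == compute_fingerprint(["a.py", "b.py"], ("*.py",), ())
#   False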

def discover_via_walk(root: Path) -> list[str]:
    out: list[str] = []
    for dirpath, dirnames, filenames in os.walk(root, followlinks=False):
        # Prune noisy directories in place so os.walk never descends into them.
        dirnames[:] = [d for d in sorted(dirnames) if not _should_skip_dir(d)]
        for fn in sorted(filenames):
            full = Path(dirpath) / fn
            if not full.is_file():
                continue
            try:
                rel = full.relative_to(root)
            except ValueError:
                continue
            out.append(str(rel).replace("\\", "/"))
    return sorted(out)


def filter_paths(
    paths: list[str],
    root: Path,
    include: tuple[str, ...],
    exclude: tuple[str, ...],
) -> list[str]:
    def inc_ok(rel: str) -> bool:
        if not include:
            return True
        return any(fnmatch.fnmatch(rel, p) or fnmatch.fnmatch(Path(rel).name, p) for p in include)

    def excluded(rel: str) -> bool:
        if not exclude:
            return False
        return any(fnmatch.fnmatch(rel, p) or fnmatch.fnmatch(Path(rel).name, p) for p in exclude)

    result: list[str] = []
    for rel in paths:
        if not inc_ok(rel):
            continue
        if excluded(rel):
            continue
        p = root / rel
        if p.is_file():
            result.append(rel)
    return sorted(result)


# Basenames to skip in every review (meta; secrets; not app source).
_SKIP_REVIEW_BASENAMES = frozenset(
    {
        ".env",
        DEFAULT_OUTPUT_NAME,  # do not send the generated report back through the API
    }
)


def discover_files(
    root: Path,
    include: tuple[str, ...] = (),
    exclude: tuple[str, ...] = (),
    *,
    apply_default_doc_excludes: bool = True,
) -> list[str]:
    """List files to review. By default, skips markdown/rst and paths under doc/docs/documentation.

    Set ``apply_default_doc_excludes=False`` (CLI ``--review-markdown``, env ``AUTOREVIEW_REVIEW_MARKDOWN``)
    to review those like any other file.
    """
    paths = discover_via_git(root)
    if paths is None:
        paths = discover_via_walk(root)
    paths = filter_paths(paths, root, include, exclude)
    paths = [
        p
        for p in paths
        if Path(p).name not in _SKIP_REVIEW_BASENAMES
        and not _should_skip_noise_path(p)
        and not (apply_default_doc_excludes and _matches_default_doc_exclude(p))
    ]
    return sorted(paths)


def looks_binary(sample: bytes) -> bool:
    return b"\0" in sample[:8192]


def _read_file_bytes_capped(path: Path, max_bytes: int) -> bytes:
    with path.open("rb") as f:
        return f.read(max_bytes + 1)


def read_file_limited(path: Path, max_bytes: int) -> tuple[str | None, str | None]:
    """Return (text, None) or (None, skip_reason). Reads at most ``max_bytes + 1`` bytes from disk."""
    try:
        timeout = float(os.environ.get("AUTOREVIEW_READ_TIMEOUT_SEC", "120").strip() or "120")
    except ValueError:
        timeout = 120.0
    timeout = max(1.0, min(timeout, 3600.0))
    # Do not use the executor as a context manager: its __exit__ joins the worker
    # thread, which would block past the timeout on a stuck read.
    ex = ThreadPoolExecutor(max_workers=1)
    try:
        fut = ex.submit(_read_file_bytes_capped, path, max_bytes)
        raw = fut.result(timeout=timeout)
    except FuturesTimeout:
        return None, "skipped (read timed out)"
    except OSError as e:
        return None, f"unreadable: {e}"
    finally:
        ex.shutdown(wait=False)
    if len(raw) > max_bytes:
        return None, f"skipped (> {max_bytes} bytes)"
    chunk = raw[:8192]
    if looks_binary(chunk):
        return None, "skipped (binary)"
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        # errors="replace" cannot raise, so no further fallback is needed.
        text = raw.decode("utf-8", errors="replace")
    return text, None
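
# Outcome sketch for read_file_limited (skip reasons as emitted above; the
# 512000-byte cap is just an example value):
#
#   - UTF-8 text under the cap     -> ("<file contents>", None)
#   - file larger than max_bytes   -> (None, "skipped (> 512000 bytes)")
#   - NUL byte in the first 8 KiB  -> (None, "skipped (binary)")
#   - OSError (permissions, ...)   -> (None, "unreadable: ...")
#   - read exceeding the timeout   -> (None, "skipped (read timed out)")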

_REVIEW_DIMENSION_KEYS: tuple[str, ...] = (
    "security",
    "code_quality",
    "structure",
    "performance",
    "testing_observability",
)

# Short, generic dismissals (after normalize) — not shown in reports.
_GENERIC_DISMISSAL_PHRASES: frozenset[str] = frozenset(
    {
        "-",
        "—",
        "n/a",
        "n/a.",
        "na",
        "nil",
        "no",
        "no.",
        "none",
        "none.",
        "nope",
        "not applicable",
        "noted",
        "ok",
        "ok.",
        "okay",
        "okay.",
        "no concerns",
        "no issues",
        "no issues.",
        "no problem",
        "no problems",
        "none noted",
        "nothing to note",
        "nothing to report",
        "looks fine",
        "looks good",
        "looks okay",
        "all good",
        "fine",
        "good",
    }
)


def _is_substantive_review_text(text: str) -> bool:
    """True if text is worth showing (filters empty lines and generic one-line dismissals)."""
    if not isinstance(text, str):
        return False
    s = text.strip()
    if not s:
        return False
    if len(s) > 160:
        return True
    low = " ".join(s.lower().split())
    low = low.rstrip(".!…")
    if low in _GENERIC_DISMISSAL_PHRASES:
        return False
    return True


# Enforced after scrubbing; prompt asks the model to stay within this cap and anchor each item.
MAX_SUGGESTIONS_PER_FILE = 5
_SUGGESTION_SEVERITY_RANK: dict[str, int] = {"high": 0, "medium": 1, "low": 2}


def _cap_suggestions_by_severity(suggestions: list[dict]) -> list[dict]:
    """Keep the strongest items when the model returns too many."""
    if len(suggestions) <= MAX_SUGGESTIONS_PER_FILE:
        return suggestions
    ranked = sorted(
        suggestions,
        key=lambda x: _SUGGESTION_SEVERITY_RANK.get(str(x.get("severity", "medium")).lower(), 1),
    )
    return ranked[:MAX_SUGGESTIONS_PER_FILE]


def _scrub_review_dict(data: dict) -> dict:
    """Drop filler dimension text and suggestions that are empty or generic dismissals."""
    out: dict = {}
    for key in _REVIEW_DIMENSION_KEYS:
        v = data.get(key, "")
        s = v if isinstance(v, str) else str(v)
        s = s.strip()
        out[key] = s if _is_substantive_review_text(s) else ""
    cleaned: list[dict] = []
    sug = data.get("suggestions") or []
    if isinstance(sug, list):
        for item in sug:
            if not isinstance(item, dict):
                continue
            sev = str(item.get("severity", "medium")).lower()
            if sev not in ("high", "medium", "low"):
                sev = "medium"
            det = item.get("detail", "")
            det = det if isinstance(det, str) else str(det)
            det = det.strip()
            if not _is_substantive_review_text(det):
                continue
            cleaned.append({"severity": sev, "detail": det})
    out["suggestions"] = _cap_suggestions_by_severity(cleaned)
    return out
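
# Scrubbing sketch (hypothetical payload): filler is blanked, unknown
# severities normalize to "medium", and substantive suggestions survive:
#
#   >>> out = _scrub_review_dict({"security": "No issues.", "suggestions": [
#   ...     {"severity": "weird", "detail": "Rename parse() to parse_strict()"}]})
#   >>> out["security"]
#   ''
#   >>> out["suggestions"]
#   [{'severity': 'medium', 'detail': 'Rename parse() to parse_strict()'}]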

def _review_context_hint(relative_path: str) -> str:
    """Optional, path-specific focus line for the user message (not repeated in JSON keys)."""
    norm = relative_path.replace("\\", "/").lower()
    name = Path(relative_path).name.lower()
    suf = Path(relative_path).suffix.lower()

    if ".github/workflows/" in norm and suf in (".yml", ".yaml", ""):
        return (
            "Prioritize workflow correctness, use of secrets, pinning third-party actions, "
            "and least-privilege permissions."
        )
    if name in ("dockerfile",) or norm.endswith("/dockerfile"):
        return (
            "Prioritize image security (base image, packages), non-root users, "
            "and avoiding leaked secrets in layers."
        )
    if name == "makefile" or name.startswith("makefile."):
        return "Prioritize build safety, reproducibility, and avoiding destructive or surprising commands."

    if suf in (".json", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".properties"):
        return (
            "Prioritize schema/correctness, validation, and avoiding accidental exposure of secrets or credentials."
        )
    if suf == ".sql":
        return "Prioritize SQL correctness, injection risks, and migration safety."

    if suf in (".py", ".ts", ".tsx", ".js", ".jsx", ".go", ".rs", ".java", ".kt", ".swift"):
        in_test_area = (
            "/tests/" in norm
            or "/test/" in norm
            or "/__tests__/" in norm
            or name.startswith("test_")
            or name.endswith("_test.py")
            or name.endswith(".test.ts")
            or name.endswith(".test.js")
            or name.endswith(".test.tsx")
            or name.endswith("_test.go")
            or name == "conftest.py"
        )
        if in_test_area:
            return (
                "Prioritize test correctness, meaningful assertions, fixtures, isolation, "
                "and flaky or slow patterns."
            )

    return ""
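
# Hint-selection sketch (paths hypothetical):
#
#   >>> _review_context_hint("db/migrations/001_init.sql")
#   'Prioritize SQL correctness, injection risks, and migration safety.'
#   >>> _review_context_hint("src/app/main.py")
#   ''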

SYSTEM_PROMPT = """You are an expert code reviewer. Analyze ONLY the file provided by the user.

Respond with a single JSON object (no markdown fences, no commentary) using exactly these keys:
{
  "security": string,
  "code_quality": string,
  "structure": string,
  "performance": string,
  "testing_observability": string,
  "suggestions": [ {"severity": "high"|"medium"|"low", "detail": string} ]
}

Rules (strict):
- Use the empty string "" for any dimension that does not need a specific observation for THIS file. Do not pad with filler such as "no issues", "looks fine", "N/A", "none", or "OK".
- If you have nothing actionable or specific to say in a dimension, leave it as "".
- When you do write a dimension, keep it to at most two sentences unless you are describing a critical problem that needs more detail.
- Put concrete, actionable items in "suggestions" with an appropriate severity. Use an empty array [] if there is nothing actionable.
- Do not repeat the same point across multiple dimensions; mention it once in the most fitting field or as a single suggestion.
- Tailor content to this file's path and source; avoid generic boilerplate that could apply to any file.

Product judgment (avoid autoreviewer noise):
- Do not suggest refactors or API churn (e.g. sync vs promises, minor style) for rare paths, one-off startup, or code that is clearly cold unless there is a concrete bug or security risk. If the cost to the team outweighs the benefit, say nothing.
- Respect common, acceptable tradeoffs: test ergonomics (e.g. double-underscore helpers in tests), naming that reads well to authors, and patterns that are normal in this ecosystem. Pedantry about style or naming is not a "finding" unless it causes real confusion, bugs, or conflicts with project rules.
- Cryptography, timing guarantees, constant-time comparisons, and subtle security claims: do not assert a defect without reasoning from this file. Prefer: what to verify against the full call chain, what to document (threat model, guarantees), or what might still leak—phrased as worth confirming, not as certainty unless the code is plainly wrong.
- Before claiming async/await races, missing await, or timing bugs: check the actual signature and call style in this file (e.g. sync vs async functions). Do not template findings from filenames; omit the suggestion if the code cannot exhibit the alleged bug.
- Severity "high" is for issues that are realistically exploitable or broken on normal call paths. If a concern is defense-in-depth (e.g. duplicate check elsewhere in the route layer) or only matters for hypothetical direct callers / future misuse, prefer severity "medium" and say so—do not describe current production URLs as immediately exploitable when guards higher in the stack already enforce invariants.

Suggestions list (anchor + cap):
- Include at most 5 entries in "suggestions" (fewer is fine). If you have more than 5 ideas, keep only the highest-impact ones.
- Every "detail" must point at something concrete in THIS file: name a function, class, variable, import, or describe specific control-flow or observable line behavior readers can find in the snippet. Do not add generic framework or "best practice" advice unless it is clearly tied to named code here.
- Vague middleware/architecture tips with no symbol or line anchor should be omitted."""


def build_user_message(relative_path: str, content: str) -> str:
    hint = _review_context_hint(relative_path)
    extra = ""
    if hint:
        extra = f"\nContext: {hint}\n"
    return (
        f"File path: `{relative_path}`\n"
        f"{extra}\n"
        "Source:\n```\n"
        + content
        + "\n```\n"
    )


def _strip_json_fence(text: str) -> str:
    text = text.strip()
    m = re.match(r"^```(?:json)?\s*([\s\S]*?)\s*```$", text)
    if m:
        return m.group(1).strip()
    return text


def parse_review_json(content: str) -> dict:
    raw = _strip_json_fence(content)
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        m = re.search(r"\{[\s\S]*\}", raw)
        if m:
            return json.loads(m.group(0))
        raise


def validate_review_dict(data: dict) -> dict:
    """Ensure expected keys, normalized suggestions, and scrub non-substantive filler."""
    out: dict = {}
    for key in _REVIEW_DIMENSION_KEYS:
        v = data.get(key, "")
        out[key] = v if isinstance(v, str) else str(v)
    sug = data.get("suggestions") or []
    if not isinstance(sug, list):
        sug = []
    cleaned: list[dict] = []
    for item in sug:
        if not isinstance(item, dict):
            continue
        sev = str(item.get("severity", "medium")).lower()
        if sev not in ("high", "medium", "low"):
            sev = "medium"
        det = item.get("detail", "")
        det = det if isinstance(det, str) else str(det)
        cleaned.append({"severity": sev, "detail": det.strip()})
    out["suggestions"] = cleaned
    return _scrub_review_dict(out)


def validate_review_payload(data: dict) -> dict:
    """Validate and normalize LLM JSON with Pydantic; fall back to :func:`validate_review_dict` if invalid."""
    try:
        d = ReviewPayload.model_validate(data).to_report_dict()
        return _scrub_review_dict(d)
    except ValidationError as e:
        logger.warning("Review JSON failed schema validation; using legacy normalizer: %s", e)
        return validate_review_dict(data)
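
# Parsing sketch: fenced or chatty replies still yield the embedded object
# (reply strings hypothetical):
#
#   >>> parse_review_json('```json\n{"security": ""}\n```')
#   {'security': ''}
#   >>> parse_review_json('Sure: {"security": ""} Hope this helps.')
#   {'security': ''}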

def json_to_markdown(relative_path: str, data: dict) -> str:
    data = _scrub_review_dict(dict(data))
    lines = [f"## `{relative_path}`\n"]
    any_block = False
    for key, title in [
        ("security", "Security"),
        ("code_quality", "Code quality"),
        ("structure", "Structure / architecture"),
        ("performance", "Performance"),
        ("testing_observability", "Testing / observability"),
    ]:
        val = data.get(key, "")
        if isinstance(val, str) and _is_substantive_review_text(val):
            lines.append(f"### {title}\n\n{val.strip()}\n")
            any_block = True
    sug = data.get("suggestions") or []
    if isinstance(sug, list) and sug:
        lines.append("### Suggestions\n\n")
        for item in sug:
            if not isinstance(item, dict):
                continue
            sev = str(item.get("severity", "medium")).lower()
            det = str(item.get("detail", "")).strip()
            if not _is_substantive_review_text(det):
                continue
            lines.append(f"- **{sev}**: {det}\n")
            any_block = True
        lines.append("")
    if not any_block:
        lines.append("_No substantive feedback for this file._\n")
    lines.append("\n---\n")
    return "\n".join(lines)


def make_openai_client(api_key: str) -> OpenAI:
    return OpenAI(
        api_key=api_key,
        base_url=VENICE_BASE_URL,
        timeout=120.0,
        max_retries=0,
    )


def completion_usage_from_response(resp: object) -> dict[str, int]:
    """Prompt/completion token counts from a chat completion response (OpenAI-compatible)."""
    u = getattr(resp, "usage", None)
    if u is None:
        return {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    return {
        "prompt_tokens": int(getattr(u, "prompt_tokens", None) or 0),
        "completion_tokens": int(getattr(u, "completion_tokens", None) or 0),
        "total_tokens": int(getattr(u, "total_tokens", None) or 0),
    }


def _extract_llm_pricing_usd_per_million(model_obj: object) -> tuple[float, float] | None:
    """Return (input_usd_per_million, output_usd_per_million) from a Venice /models list item."""
    dump = model_obj.model_dump() if hasattr(model_obj, "model_dump") else {}
    if not isinstance(dump, dict):
        return None
    if dump.get("type") != "text":
        return None
    spec = dump.get("model_spec")
    if not isinstance(spec, dict):
        mspec = getattr(model_obj, "model_spec", None)
        if hasattr(mspec, "model_dump"):
            spec = mspec.model_dump()
        elif isinstance(mspec, dict):
            spec = mspec
        else:
            spec = {}
    pr = spec.get("pricing") if isinstance(spec, dict) else None
    if not isinstance(pr, dict):
        pr = dump.get("pricing")
    if not isinstance(pr, dict):
        return None
    inp = pr.get("input") or {}
    out = pr.get("output") or {}
    if not isinstance(inp, dict) or not isinstance(out, dict):
        return None
    iu = inp.get("usd")
    ou = out.get("usd")
    if iu is None or ou is None:
        return None
    return float(iu), float(ou)


def fetch_text_models_pricing_usd(client: OpenAI) -> dict[str, tuple[float, float]]:
    """Map model id → (USD per 1M input tokens, USD per 1M output tokens)."""
    out: dict[str, tuple[float, float]] = {}
    try:
        page = client.models.list()
        for m in page.data:
            p = _extract_llm_pricing_usd_per_million(m)
            if p:
                out[m.id] = p
    except Exception as e:
        logger.warning("Could not list models for pricing: %s", e)
    return out


def merge_completion_usage(state: ReviewState, model: str, usage: dict[str, int]) -> None:
    pt = int(usage.get("prompt_tokens") or 0)
    ct = int(usage.get("completion_tokens") or 0)
    if pt == 0 and ct == 0:
        return
    b = state.usage_by_model.setdefault(model, {"prompt": 0, "completion": 0})
    b["prompt"] += pt
    b["completion"] += ct


def total_usage_tokens(state: ReviewState) -> tuple[int, int]:
    pt = sum(u["prompt"] for u in state.usage_by_model.values())
    ct = sum(u["completion"] for u in state.usage_by_model.values())
    return pt, ct
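
# Accounting sketch (in-memory only; model id hypothetical):
#
#   >>> st = ReviewState()
#   >>> merge_completion_usage(st, "some-model", {"prompt_tokens": 100, "completion_tokens": 7})
#   >>> total_usage_tokens(st)
#   (100, 7)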

def estimate_project_spend_usd(
    pricing_map: dict[str, tuple[float, float]],
    state: ReviewState,
) -> float | None:
    """Estimated USD using ``/models`` pricing; None if no usage or no pricing."""
    if not state.usage_by_model:
        return None
    total = 0.0
    found = False
    for mid, u in state.usage_by_model.items():
        p = pricing_map.get(mid)
        if not p:
            continue
        found = True
        inp_m, out_m = p
        total += (u["prompt"] / 1_000_000.0) * inp_m + (u["completion"] / 1_000_000.0) * out_m
    if not found:
        return None
    return total


def project_usage_display_text(root: Path, api_key: str | None) -> str:
    """One-line summary for GUI: tokens + optional USD estimate (requires API key)."""
    st = load_state(root)
    if not st or not st.usage_by_model:
        return "This project: no API usage recorded yet."
    pt, ct = total_usage_tokens(st)
    if not api_key or not api_key.strip():
        return f"This project: {pt:,} prompt + {ct:,} completion tokens (set API key to estimate USD)."
    try:
        client = make_openai_client(api_key.strip())
        pm = fetch_text_models_pricing_usd(client)
        usd = estimate_project_spend_usd(pm, st)
    except Exception as e:
        logger.warning("Usage display failed: %s", e)
        return f"This project: {pt:,} prompt + {ct:,} completion tokens (could not estimate USD)."
    if usd is None:
        return f"This project: {pt:,} prompt + {ct:,} completion tokens (pricing unavailable for some models)."
    return f"This project: est. ~${usd:.4f} USD · {pt:,} prompt + {ct:,} completion tokens"


def completion_with_retry(
    client: OpenAI,
    model: str,
    messages: list[dict],
    *,
    max_attempts: int = 5,
    base_delay: float = 1.0,
) -> tuple[str, dict[str, int]]:
    """Non-streaming completion; retries on 429 and transient 5xx. Returns (text, token_usage)."""
    last_err: Exception | None = None
    for attempt in range(max_attempts):
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0.2,
            )
            choice = resp.choices[0]
            content = choice.message.content
            usage = completion_usage_from_response(resp)
            if content is None:
                return "", usage
            return content, usage
        except RateLimitError as e:
            last_err = e
            logger.warning(
                "Venice API rate limited (attempt %s/%s): %s",
                attempt + 1,
                max_attempts,
                e,
            )
        except APIError as e:
            code = getattr(e, "status_code", None) or getattr(e, "code", None)
            if code in (429, 500, 502, 503, 504):
                last_err = e
                logger.warning(
                    "Venice API error (attempt %s/%s) status=%s: %s",
                    attempt + 1,
                    max_attempts,
                    code,
                    e,
                )
            else:
                raise
        except APITimeoutError as e:
            last_err = e
            logger.warning(
                "Venice API timeout (attempt %s/%s): %s",
                attempt + 1,
                max_attempts,
                e,
            )
        if attempt + 1 < max_attempts:
            # Exponential backoff with jitter; no pointless sleep after the final attempt.
            delay = base_delay * (2**attempt) + random.uniform(0, 0.5)
            time.sleep(min(delay, 60.0))
    if last_err:
        raise last_err
    raise RuntimeError("completion_with_retry: exhausted")
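
# Backoff sketch with the defaults (base_delay=1.0, max_attempts=5): sleeps of
# roughly 1.0-1.5 s, 2.0-2.5 s, 4.0-4.5 s, and 8.0-8.5 s separate the five
# attempts (jitter up to 0.5 s, capped at 60 s); the final failure re-raises
# without sleeping.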

@dataclass
class RunResult:
    processed: int = 0
    remaining: int = 0
    complete: bool = False
    cancelled: bool = False
    output_path: Path | None = None
    log_lines: list[str] = field(default_factory=list)
    fingerprint_warning: str | None = None
    # Token usage (this run vs project lifetime in .venice_review/state.json).
    usage_prompt_tokens_run: int = 0
    usage_completion_tokens_run: int = 0
    usage_prompt_tokens_total: int = 0
    usage_completion_tokens_total: int = 0
    usage_estimated_usd_project: float | None = None


def _append_report(path: Path, text: str, header_if_new: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():
        path.write_text(header_if_new + "\n\n" + text, encoding="utf-8")
    else:
        with path.open("a", encoding="utf-8") as f:
            f.write("\n" + text)


def _format_run_banner(model: str, batch_note: str, rel_paths: list[str], *, max_list: int = 20) -> str:
    """Markdown block prepended once per batch run so appended reports stay separated by time and file set."""
    ts = datetime.now().astimezone().replace(microsecond=0).isoformat()
    n = len(rel_paths)
    show = rel_paths[:max_list]
    extra = n - len(show)
    if n == 0:
        paths_line = "(none)"
    else:
        paths_line = ", ".join(f"`{r}`" for r in show)
        if extra > 0:
            paths_line += f" … (+{extra} more)"
    return (
        f"## Autoreview run — {ts}\n\n"
        f"- **Model:** `{model}`\n"
        f"- **Batch:** {batch_note}\n"
        f"- **Files this run ({n}):** {paths_line}\n"
    )


def effective_batch_size(batch_size: int, pending_count: int) -> int:
    """Map request to a concrete slice length. ``batch_size == 0`` → all pending."""
    if batch_size < 0:
        raise ValueError("batch_size must be >= 0 (use 0 for all pending files in one run)")
    if pending_count < 0:
        raise ValueError("pending_count invalid")
    if batch_size == BATCH_SIZE_ALL:
        return pending_count
    return min(batch_size, pending_count)
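
# Slice-length sketch:
#
#   >>> effective_batch_size(10, pending_count=3)
#   3
#   >>> effective_batch_size(BATCH_SIZE_ALL, pending_count=42)
#   42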

def run_review_batch(
    root: Path,
    api_key: str,
    *,
    batch_size: int = 10,
    model: str = DEFAULT_MODEL,
    include: tuple[str, ...] = (),
    exclude: tuple[str, ...] = (),
    max_file_bytes: int = 512_000,
    delay_ms: int = 0,
    dry_run: bool = False,
    reset_progress: bool = False,
    output_path: Path | None = None,
    progress: ProgressCallback = None,
    cancel_event: threading.Event | None = None,
    review_markdown: bool = False,
) -> RunResult:
    """
    Process up to ``batch_size`` pending files; persist state under .venice_review/.
    Use ``batch_size=0`` to process every pending file in this run (full remaining folder).

    By default, markdown/rst and paths under ``doc`` / ``docs`` / ``documentation`` are skipped;
    pass ``review_markdown=True`` (or CLI ``--review-markdown``) to include them.
    """
    result = RunResult()
    root = normalize_root(root)
    if not root.is_dir():
        raise FileNotFoundError(f"Not a directory: {root}")

    max_file_bytes = max(MIN_FILE_BYTES, min(max_file_bytes, MAX_FILE_BYTES_CAP))

    discovered = discover_files(
        root,
        include=include,
        exclude=exclude,
        apply_default_doc_excludes=not review_markdown,
    )
    fp = compute_fingerprint(discovered, include, exclude)

    state = load_state(root)
    out = _resolve_report_path(root, output_path, state)
    result.output_path = out
    if state is None or reset_progress:
        state = ReviewState(root_abs=str(root), fingerprint=fp, completed_paths=[], output_name=out.name)
    else:
        if state.fingerprint != fp:
            result.fingerprint_warning = (
                "Discovery fingerprint changed (new files or filters). "
                "Merging: keeping completed paths that still exist; new paths become pending."
            )
            if progress:
                progress("warning", {"message": result.fingerprint_warning})
            # merge completed with still-valid paths
            valid = set(discovered)
            state.completed_paths = [p for p in state.completed_paths if p in valid]
        state.fingerprint = fp
        state.root_abs = str(root)
        state.output_name = out.name

    pending = [p for p in discovered if p not in set(state.completed_paths)]
    result.remaining = len(pending)

    n_this_run = effective_batch_size(batch_size, len(pending))
    batch_note = "all pending files" if batch_size == BATCH_SIZE_ALL else str(batch_size)

    if dry_run:
        est = n_this_run
        result.log_lines.append(f"[dry-run] Would review {est} file(s) this batch; {len(pending)} pending total.")
        result.log_lines.append(f"[dry-run] Output: {out}")
        for rel in pending[:n_this_run]:
            p = root / rel
            sz = p.stat().st_size if p.is_file() else 0
            result.log_lines.append(f"  {rel} ({sz} bytes)")
        rest = len(pending) - est
        if rest > 0:
            result.log_lines.append(f"  ... {rest} more file(s) pending in later batches")
        pt, ct = total_usage_tokens(state)
        result.usage_prompt_tokens_total = pt
        result.usage_completion_tokens_total = ct
        return result

    if not pending:
        result.complete = True
        result.log_lines.append(
            "Nothing pending; review complete for this project. "
            f"The report file was not modified: `{out}` "
            "(every discovered file is already marked reviewed in .venice_review/state.json). "
            "To re-review from scratch, use --reset-progress."
        )
        pt, ct = total_usage_tokens(state)
        result.usage_prompt_tokens_total = pt
        result.usage_completion_tokens_total = ct
        try:
            _c = make_openai_client(api_key)
            _pm = fetch_text_models_pricing_usd(_c)
            result.usage_estimated_usd_project = estimate_project_spend_usd(_pm, state)
        except Exception:
            result.usage_estimated_usd_project = None
        save_state(root, state)
        return result

    # Fresh full re-run: avoid duplicating sections by appending to an old report.
    if reset_progress and out.exists():
        out.unlink()
        result.log_lines.append(f"Removed previous report (fresh start with --reset-progress): {out}")

    to_process = pending[:n_this_run]
    client = make_openai_client(api_key)
    pricing_map = fetch_text_models_pricing_usd(client)
    run_prompt = 0
    run_completion = 0
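    # Report-layout sketch: a brand-new report gets one global header, then each
    # run appends a banner plus per-file sections (timestamp hypothetical):
    #
    #   # Venice code review
    #   ## Autoreview run — 2024-01-01T12:00:00+00:00
    #   ## `src/module.py`
    #   ...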
    header_if_new = "\n".join(
        [
            "# Venice code review",
            "",
            f"- Root: `{root}`",
            f"- Model: `{model}`",
            f"- Batch size this run: {batch_note}",
            "",
            "> Generated by autoreview. Feed this file to your editor AI to triage changes.",
            "",
        ]
    )

    save_every = _state_save_interval()
    since_save = 0
    run_banner_appended = False

    def flush_state() -> None:
        nonlocal since_save
        save_state(root, state)
        since_save = 0

    for rel in to_process:
        if cancel_event is not None and cancel_event.is_set():
            if since_save > 0:
                flush_state()
            result.cancelled = True
            result.log_lines.append("Stopped by user; progress saved.")
            if progress:
                progress("cancelled", {})
            break

        path = root / rel
        text, skip = read_file_limited(path, max_file_bytes)
        if text is None:
            msg = f"Skip `{rel}`: {skip}"
            result.log_lines.append(msg)
            if progress:
                progress("skip", {"path": rel, "reason": skip or "unknown"})
            state.completed_paths.append(rel)
            since_save += 1
            if since_save >= save_every:
                flush_state()
            continue

        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": build_user_message(rel, text)},
        ]
        if progress:
            progress("file_start", {"path": rel})
        try:
            raw_reply, comp_usage = completion_with_retry(client, model, messages)
            merge_completion_usage(state, model, comp_usage)
            run_prompt += int(comp_usage.get("prompt_tokens") or 0)
            run_completion += int(comp_usage.get("completion_tokens") or 0)
            data = validate_review_payload(parse_review_json(raw_reply))
        except json.JSONDecodeError as e:
            result.log_lines.append(f"JSON parse error for `{rel}`: {e}; storing raw response in report.")
            data = validate_review_payload(
                {
                    "security": "",
                    "code_quality": "",
                    "structure": "",
                    "performance": "",
                    "testing_observability": f"(parse error) {e}",
                    "suggestions": [{"severity": "medium", "detail": raw_reply[:2000]}],
                }
            )
        except Exception as e:
            result.log_lines.append(f"API error for `{rel}`: {e}")
            if progress:
                progress("error", {"path": rel, "error": str(e)})
            raise

        section = json_to_markdown(rel, data)
        if not run_banner_appended:
            chunk = _format_run_banner(model, batch_note, to_process) + "\n" + section
            _append_report(out, chunk, header_if_new)
            run_banner_appended = True
        else:
            _append_report(out, section, "")
        state.completed_paths.append(rel)
        since_save += 1
        if since_save >= save_every:
            flush_state()
        result.processed += 1
        if progress:
            progress("file_done", {"path": rel})
            pt, ct = total_usage_tokens(state)
            usd = estimate_project_spend_usd(pricing_map, state)
            if usd is None:
                msg = (
                    f"This project: {pt:,} prompt + {ct:,} completion tokens "
                    f"(this run +{run_prompt:,} / +{run_completion:,})."
                )
            else:
                msg = (
                    f"This project: est. ~${usd:.4f} USD · {pt:,} prompt + {ct:,} completion · "
                    f"this run +{run_prompt:,} / +{run_completion:,}"
                )
            progress("usage", {"message": msg})

        if delay_ms > 0:
            time.sleep(delay_ms / 1000.0)

    if since_save > 0:
        flush_state()
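    # Progress-event sketch: when a callback is supplied it receives
    # (event, payload) pairs, e.g. ("file_start", {"path": rel}),
    # ("skip", {"path": ..., "reason": ...}), ("file_done", {"path": rel}),
    # ("usage", {"message": ...}), ("warning", {...}), and ("cancelled", {}).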
    if to_process and result.processed == 0 and not result.cancelled:
        result.log_lines.append(
            "No new sections were written to the report this run: every path in this batch was skipped "
            "(binary, over the size limit, read timeout, or encoding). Those paths are still marked complete."
        )

    still = [p for p in discovered if p not in set(state.completed_paths)]
    result.remaining = len(still)
    result.complete = result.remaining == 0 and not result.cancelled
    result.log_lines.append(f"Processed {result.processed} file(s); {result.remaining} remaining.")
    pt, ct = total_usage_tokens(state)
    result.usage_prompt_tokens_total = pt
    result.usage_completion_tokens_total = ct
    result.usage_prompt_tokens_run = run_prompt
    result.usage_completion_tokens_run = run_completion
    result.usage_estimated_usd_project = estimate_project_spend_usd(pricing_map, state)
    if result.usage_estimated_usd_project is not None:
        result.log_lines.append(
            f"Estimated project spend: ~${result.usage_estimated_usd_project:.4f} USD "
            f"({pt:,} prompt + {ct:,} completion tokens)."
        )
    elif pt or ct:
        result.log_lines.append(f"Token usage (this project): {pt:,} prompt + {ct:,} completion.")
    return result