patch_parser.py
#!/usr/bin/env python3
"""
V4A Patch Format Parser

Parses the V4A patch format used by codex, cline, and other coding agents.

V4A Format:
    *** Begin Patch
    *** Update File: path/to/file.py
    @@ optional context hint @@
     context line (space prefix)
    -removed line (minus prefix)
    +added line (plus prefix)
    *** Add File: path/to/new.py
    +new file content
    +line 2
    *** Delete File: path/to/old.py
    *** Move File: old/path.py -> new/path.py
    *** End Patch

Usage:
    from tools.patch_parser import parse_v4a_patch, apply_v4a_operations

    operations, error = parse_v4a_patch(patch_content)
    if error:
        print(f"Parse error: {error}")
    else:
        result = apply_v4a_operations(operations, file_ops)
"""

import difflib
import re
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Any
from enum import Enum


class OperationType(Enum):
    """The four file-level operations a V4A patch can request."""
    ADD = "add"
    UPDATE = "update"
    DELETE = "delete"
    MOVE = "move"


@dataclass
class HunkLine:
    """A single line in a patch hunk."""
    prefix: str  # ' ' (context), '-' (removed), or '+' (added)
    content: str  # line text with the prefix character stripped


@dataclass
class Hunk:
    """A group of changes within a file."""
    # Text between the @@ markers, used to locate the hunk in the file.
    context_hint: Optional[str] = None
    lines: List[HunkLine] = field(default_factory=list)


@dataclass
class PatchOperation:
    """A single operation in a V4A patch."""
    operation: OperationType
    file_path: str
    new_path: Optional[str] = None  # For move operations
    hunks: List[Hunk] = field(default_factory=list)
    # NOTE(review): this field is never populated by parse_v4a_patch —
    # ADD content travels in `hunks` (see _apply_add). Kept for
    # backward compatibility with any external constructors.
    content: Optional[str] = None  # For add file operations


def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[str]]:
    """
    Parse a V4A format patch.

    Args:
        patch_content: The patch text in V4A format

    Returns:
        Tuple of (operations, error_message)
        - If successful: (list_of_operations, None)
        - If failed: ([], error_description)
    """
    lines = patch_content.split('\n')
    operations: List[PatchOperation] = []

    # Find patch boundaries. Both "*** Begin Patch" and the no-space
    # "***Begin Patch" variant are tolerated.
    start_idx = None
    end_idx = None

    for i, line in enumerate(lines):
        if '*** Begin Patch' in line or '***Begin Patch' in line:
            start_idx = i
        elif '*** End Patch' in line or '***End Patch' in line:
            end_idx = i
            break

    if start_idx is None:
        # Try to parse without explicit begin marker
        start_idx = -1

    if end_idx is None:
        end_idx = len(lines)

    # Parse operations between boundaries
    i = start_idx + 1
    current_op: Optional[PatchOperation] = None
    current_hunk: Optional[Hunk] = None

    while i < end_idx:
        line = lines[i]

        # Check for file operation markers
        update_match = re.match(r'\*\*\*\s*Update\s+File:\s*(.+)', line)
        add_match = re.match(r'\*\*\*\s*Add\s+File:\s*(.+)', line)
        delete_match = re.match(r'\*\*\*\s*Delete\s+File:\s*(.+)', line)
        move_match = re.match(r'\*\*\*\s*Move\s+File:\s*(.+?)\s*->\s*(.+)', line)

        if update_match:
            # Save previous operation (flushing its pending hunk first)
            if current_op:
                if current_hunk and current_hunk.lines:
                    current_op.hunks.append(current_hunk)
                operations.append(current_op)

            current_op = PatchOperation(
                operation=OperationType.UPDATE,
                file_path=update_match.group(1).strip()
            )
            current_hunk = None

        elif add_match:
            if current_op:
                if current_hunk and current_hunk.lines:
                    current_op.hunks.append(current_hunk)
                operations.append(current_op)

            current_op = PatchOperation(
                operation=OperationType.ADD,
                file_path=add_match.group(1).strip()
            )
            # ADD files have no @@ markers; collect their + lines directly.
            current_hunk = Hunk()

        elif delete_match:
            if current_op:
                if current_hunk and current_hunk.lines:
                    current_op.hunks.append(current_hunk)
                operations.append(current_op)

            current_op = PatchOperation(
                operation=OperationType.DELETE,
                file_path=delete_match.group(1).strip()
            )
            # DELETE carries no body, so it is complete immediately.
            operations.append(current_op)
            current_op = None
            current_hunk = None

        elif move_match:
            if current_op:
                if current_hunk and current_hunk.lines:
                    current_op.hunks.append(current_hunk)
                operations.append(current_op)

            current_op = PatchOperation(
                operation=OperationType.MOVE,
                file_path=move_match.group(1).strip(),
                new_path=move_match.group(2).strip()
            )
            # MOVE carries no body, so it is complete immediately.
            operations.append(current_op)
            current_op = None
            current_hunk = None

        elif line.startswith('@@'):
            # Context hint / hunk marker — starts a new hunk
            if current_op:
                if current_hunk and current_hunk.lines:
                    current_op.hunks.append(current_hunk)

                # Extract context hint (text between the @@ pair, if any)
                hint_match = re.match(r'@@\s*(.+?)\s*@@', line)
                hint = hint_match.group(1) if hint_match else None
                current_hunk = Hunk(context_hint=hint)

        elif current_op and line:
            # Parse hunk line.
            # NOTE(review): completely empty lines are skipped here, so a
            # blank context line emitted without its leading space is lost
            # from the hunk — downstream fuzzy matching must tolerate this.
            if current_hunk is None:
                current_hunk = Hunk()

            if line.startswith('+'):
                current_hunk.lines.append(HunkLine('+', line[1:]))
            elif line.startswith('-'):
                current_hunk.lines.append(HunkLine('-', line[1:]))
            elif line.startswith(' '):
                current_hunk.lines.append(HunkLine(' ', line[1:]))
            elif line.startswith('\\'):
                # "\ No newline at end of file" marker - skip
                pass
            else:
                # Treat as context line (implicit space prefix)
                current_hunk.lines.append(HunkLine(' ', line))

        i += 1

    # Don't forget the last operation
    if current_op:
        if current_hunk and current_hunk.lines:
            current_op.hunks.append(current_hunk)
        operations.append(current_op)

    # Validate the parsed result
    if not operations:
        # Empty patch is not an error — callers get [] and can decide
        return operations, None

    parse_errors: List[str] = []
    for op in operations:
        if not op.file_path:
            parse_errors.append("Operation with empty file path")
        if op.operation == OperationType.UPDATE and not op.hunks:
            parse_errors.append(f"UPDATE {op.file_path!r}: no hunks found")
        if op.operation == OperationType.MOVE and not op.new_path:
            parse_errors.append(f"MOVE {op.file_path!r}: missing destination path (expected 'src -> dst')")

    if parse_errors:
        return [], "Parse error: " + "; ".join(parse_errors)

    return operations, None


def _count_occurrences(text: str, pattern: str) -> int:
    """Count non-overlapping occurrences of *pattern* in *text*.

    BUGFIX: the previous implementation advanced the search cursor by one
    character per hit, which counted *overlapping* matches (e.g. 'aa' in
    'aaaa' counted 3) and made unambiguous context hints look ambiguous.
    str.count counts non-overlapping occurrences, matching the docstring.
    """
    return text.count(pattern)


def _validate_operations(
    operations: List[PatchOperation],
    file_ops: Any,
) -> List[str]:
    """Validate all operations without writing any files.

    Returns a list of error strings; an empty list means all operations
    are valid and the apply phase can proceed safely.

    For UPDATE operations, hunks are simulated in order so that later
    hunks validate against post-earlier-hunk content (matching apply order).
    """
    # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
    from tools.fuzzy_match import fuzzy_find_and_replace

    errors: List[str] = []

    for op in operations:
        if op.operation == OperationType.UPDATE:
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: {read_result.error}")
                continue

            simulated = read_result.content
            for hunk in op.hunks:
                search_lines = [l.content for l in hunk.lines if l.prefix in (' ', '-')]
                if not search_lines:
                    # Addition-only hunk: validate context hint uniqueness
                    if hunk.context_hint:
                        occurrences = _count_occurrences(simulated, hunk.context_hint)
                        if occurrences == 0:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' not found"
                            )
                        elif occurrences > 1:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' is ambiguous "
                                f"({occurrences} occurrences)"
                            )
                    continue

                search_pattern = '\n'.join(search_lines)
                replace_lines = [l.content for l in hunk.lines if l.prefix in (' ', '+')]
                replacement = '\n'.join(replace_lines)

                new_simulated, count, _strategy, match_error = fuzzy_find_and_replace(
                    simulated, search_pattern, replacement, replace_all=False
                )
                if count == 0:
                    label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
                    msg = (
                        f"{op.file_path}: hunk {label} not found"
                        + (f" — {match_error}" if match_error else "")
                    )
                    try:
                        from tools.fuzzy_match import format_no_match_hint
                        msg += format_no_match_hint(match_error, count, search_pattern, simulated)
                    except Exception:
                        # Hint formatting is best-effort; never let it mask
                        # the underlying validation error.
                        pass
                    errors.append(msg)
                else:
                    # Advance simulation so subsequent hunks validate correctly.
                    # Reuse the result from the call above — no second fuzzy run.
                    simulated = new_simulated

        elif op.operation == OperationType.DELETE:
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: file not found for deletion")

        elif op.operation == OperationType.MOVE:
            if not op.new_path:
                errors.append(f"{op.file_path}: MOVE operation missing destination path")
                continue
            src_result = file_ops.read_file_raw(op.file_path)
            if src_result.error:
                errors.append(f"{op.file_path}: source file not found for move")
            dst_result = file_ops.read_file_raw(op.new_path)
            if not dst_result.error:
                errors.append(
                    f"{op.new_path}: destination already exists — move would overwrite"
                )

        # ADD: parent directory creation handled by write_file; no pre-check needed.

    return errors


def apply_v4a_operations(operations: List[PatchOperation],
                         file_ops: Any) -> 'PatchResult':
    """Apply V4A patch operations using a file operations interface.

    Uses a two-phase validate-then-apply approach:
    - Phase 1: validate all operations against current file contents without
      writing anything. If any validation error is found, return immediately
      with no filesystem changes.
    - Phase 2: apply all operations. A failure here (e.g. a race between
      validation and apply) is reported with a note to run ``git diff``.

    Args:
        operations: List of PatchOperation from parse_v4a_patch
        file_ops: Object with read_file_raw, write_file methods

    Returns:
        PatchResult with results of all operations
    """
    # Import here to avoid circular imports
    from tools.file_operations import PatchResult

    # ---- Phase 1: validate ----
    validation_errors = _validate_operations(operations, file_ops)
    if validation_errors:
        return PatchResult(
            success=False,
            error="Patch validation failed (no files were modified):\n"
                  + "\n".join(f" • {e}" for e in validation_errors),
        )

    # ---- Phase 2: apply ----
    files_modified = []
    files_created = []
    files_deleted = []
    all_diffs = []
    errors = []

    for op in operations:
        try:
            if op.operation == OperationType.ADD:
                result = _apply_add(op, file_ops)
                if result[0]:
                    files_created.append(op.file_path)
                    all_diffs.append(result[1])
                else:
                    errors.append(f"Failed to add {op.file_path}: {result[1]}")

            elif op.operation == OperationType.DELETE:
                result = _apply_delete(op, file_ops)
                if result[0]:
                    files_deleted.append(op.file_path)
                    all_diffs.append(result[1])
                else:
                    errors.append(f"Failed to delete {op.file_path}: {result[1]}")

            elif op.operation == OperationType.MOVE:
                result = _apply_move(op, file_ops)
                if result[0]:
                    files_modified.append(f"{op.file_path} -> {op.new_path}")
                    all_diffs.append(result[1])
                else:
                    errors.append(f"Failed to move {op.file_path}: {result[1]}")

            elif op.operation == OperationType.UPDATE:
                result = _apply_update(op, file_ops)
                if result[0]:
                    files_modified.append(op.file_path)
                    all_diffs.append(result[1])
                else:
                    errors.append(f"Failed to update {op.file_path}: {result[1]}")

        except Exception as e:
            # Per-operation isolation: one failing op must not abort the rest.
            errors.append(f"Error processing {op.file_path}: {str(e)}")

    # Run lint on all modified/created files.
    # NOTE(review): MOVE operations record "src -> dst" strings in
    # files_modified, which are not real paths — confirm _check_lint
    # tolerates them.
    lint_results = {}
    for f in files_modified + files_created:
        if hasattr(file_ops, '_check_lint'):
            lint_result = file_ops._check_lint(f)
            lint_results[f] = lint_result.to_dict()

    combined_diff = '\n'.join(all_diffs)

    if errors:
        return PatchResult(
            success=False,
            diff=combined_diff,
            files_modified=files_modified,
            files_created=files_created,
            files_deleted=files_deleted,
            lint=lint_results if lint_results else None,
            error="Apply phase failed (state may be inconsistent — run `git diff` to assess):\n"
                  + "\n".join(f" • {e}" for e in errors),
        )

    return PatchResult(
        success=True,
        diff=combined_diff,
        files_modified=files_modified,
        files_created=files_created,
        files_deleted=files_deleted,
        lint=lint_results if lint_results else None,
    )


def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply an add file operation.

    Returns (ok, diff_or_error): on success the second element is a
    pseudo-unified-diff of the new file; on failure it is the error text.
    """
    # Extract content from hunks (all + lines)
    content_lines = []
    for hunk in op.hunks:
        for line in hunk.lines:
            if line.prefix == '+':
                content_lines.append(line.content)

    content = '\n'.join(content_lines)

    result = file_ops.write_file(op.file_path, content)
    if result.error:
        return False, result.error

    diff = f"--- /dev/null\n+++ b/{op.file_path}\n"
    diff += '\n'.join(f"+{line}" for line in content_lines)

    return True, diff


def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply a delete file operation.

    Returns (ok, diff_or_error).
    """
    # Read before deleting so we can produce a real unified diff.
    # Validation already confirmed existence; this guards against races.
    read_result = file_ops.read_file_raw(op.file_path)
    if read_result.error:
        return False, f"Cannot delete {op.file_path}: file not found"

    result = file_ops.delete_file(op.file_path)
    if result.error:
        return False, result.error

    removed_lines = read_result.content.splitlines(keepends=True)
    diff = ''.join(difflib.unified_diff(
        removed_lines, [],
        fromfile=f"a/{op.file_path}",
        tofile="/dev/null",
    ))
    # Empty file produces an empty diff; emit a marker line instead.
    return True, diff or f"# Deleted: {op.file_path}"


def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply a move file operation.

    Returns (ok, marker_or_error) — moves produce a marker, not a diff.
    """
    result = file_ops.move_file(op.file_path, op.new_path)
    if result.error:
        return False, result.error

    diff = f"# Moved: {op.file_path} -> {op.new_path}"
    return True, diff


def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply an update file operation.

    Applies hunks in order via fuzzy search-and-replace, with a
    windowed retry around the context hint when the global search fails.
    Returns (ok, unified_diff_or_error).
    """
    # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
    from tools.fuzzy_match import fuzzy_find_and_replace

    # Read current content — raw so no line-number prefixes or per-line truncation
    read_result = file_ops.read_file_raw(op.file_path)

    if read_result.error:
        return False, f"Cannot read file: {read_result.error}"

    current_content = read_result.content

    # Apply each hunk
    new_content = current_content

    for hunk in op.hunks:
        # Build search pattern from context and removed lines
        search_lines = []
        replace_lines = []

        for line in hunk.lines:
            if line.prefix == ' ':
                search_lines.append(line.content)
                replace_lines.append(line.content)
            elif line.prefix == '-':
                search_lines.append(line.content)
            elif line.prefix == '+':
                replace_lines.append(line.content)

        if search_lines:
            search_pattern = '\n'.join(search_lines)
            replacement = '\n'.join(replace_lines)

            new_content, count, _strategy, error = fuzzy_find_and_replace(
                new_content, search_pattern, replacement, replace_all=False
            )

            if error and count == 0:
                # Try with context hint if available
                if hunk.context_hint:
                    # Find the context hint location and search nearby
                    hint_pos = new_content.find(hunk.context_hint)
                    if hint_pos != -1:
                        # Search in a window around the hint
                        window_start = max(0, hint_pos - 500)
                        window_end = min(len(new_content), hint_pos + 2000)
                        window = new_content[window_start:window_end]

                        window_new, count, _strategy, error = fuzzy_find_and_replace(
                            window, search_pattern, replacement, replace_all=False
                        )

                        if count > 0:
                            # Splice the patched window back into the file
                            new_content = new_content[:window_start] + window_new + new_content[window_end:]
                            error = None

                if error:
                    err_msg = f"Could not apply hunk: {error}"
                    try:
                        from tools.fuzzy_match import format_no_match_hint
                        err_msg += format_no_match_hint(error, 0, search_pattern, new_content)
                    except Exception:
                        # Hint formatting is best-effort only.
                        pass
                    return False, err_msg
        else:
            # Addition-only hunk (no context or removed lines).
            # Insert at the location indicated by the context hint, or at end of file.
            insert_text = '\n'.join(replace_lines)
            if hunk.context_hint:
                occurrences = _count_occurrences(new_content, hunk.context_hint)
                if occurrences == 0:
                    # Hint not found — append at end as a safe fallback
                    new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'
                elif occurrences > 1:
                    return False, (
                        f"Addition-only hunk: context hint '{hunk.context_hint}' is ambiguous "
                        f"({occurrences} occurrences) — provide a more unique hint"
                    )
                else:
                    hint_pos = new_content.find(hunk.context_hint)
                    # Insert after the line containing the context hint
                    eol = new_content.find('\n', hint_pos)
                    if eol != -1:
                        new_content = new_content[:eol + 1] + insert_text + '\n' + new_content[eol + 1:]
                    else:
                        new_content = new_content + '\n' + insert_text
            else:
                new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'

    # Write new content
    write_result = file_ops.write_file(op.file_path, new_content)
    if write_result.error:
        return False, write_result.error

    # Generate diff
    diff_lines = difflib.unified_diff(
        current_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile=f"a/{op.file_path}",
        tofile=f"b/{op.file_path}"
    )
    diff = ''.join(diff_lines)

    return True, diff