# tools/patch_parser.py
  1  #!/usr/bin/env python3
  2  """
  3  V4A Patch Format Parser
  4  
  5  Parses the V4A patch format used by codex, cline, and other coding agents.
  6  
  7  V4A Format:
  8      *** Begin Patch
  9      *** Update File: path/to/file.py
 10      @@ optional context hint @@
 11       context line (space prefix)
 12      -removed line (minus prefix)
 13      +added line (plus prefix)
 14      *** Add File: path/to/new.py
 15      +new file content
 16      +line 2
 17      *** Delete File: path/to/old.py
 18      *** Move File: old/path.py -> new/path.py
 19      *** End Patch
 20  
 21  Usage:
 22      from tools.patch_parser import parse_v4a_patch, apply_v4a_operations
 23      
 24      operations, error = parse_v4a_patch(patch_content)
 25      if error:
 26          print(f"Parse error: {error}")
 27      else:
 28          result = apply_v4a_operations(operations, file_ops)
 29  """
 30  
 31  import difflib
 32  import re
 33  from dataclasses import dataclass, field
 34  from typing import List, Optional, Tuple, Any
 35  from enum import Enum
 36  
 37  
class OperationType(Enum):
    """The four kinds of file operations a V4A patch can contain."""
    ADD = "add"        # "*** Add File:" — create a new file from '+' lines
    UPDATE = "update"  # "*** Update File:" — edit an existing file via hunks
    DELETE = "delete"  # "*** Delete File:" — remove an existing file
    MOVE = "move"      # "*** Move File: old -> new" — rename/relocate a file
 43  
 44  
@dataclass
class HunkLine:
    """A single line in a patch hunk."""
    prefix: str  # ' ' (context), '-' (removed), or '+' (added)
    content: str  # the line text with the one-character prefix stripped
 50  
 51  
@dataclass
class Hunk:
    """A group of changes within a file.

    A hunk with only '+' lines is "addition-only"; the applier then uses
    ``context_hint`` (or end-of-file) to decide where to insert.
    """
    context_hint: Optional[str] = None  # text from the '@@ ... @@' marker, used to locate the hunk
    lines: List[HunkLine] = field(default_factory=list)  # ordered context/removed/added lines
 57  
 58  
@dataclass
class PatchOperation:
    """A single operation in a V4A patch."""
    operation: OperationType
    file_path: str  # source path (the only path for ADD/UPDATE/DELETE)
    new_path: Optional[str] = None  # For move operations
    hunks: List[Hunk] = field(default_factory=list)  # edit hunks (UPDATE) or collected '+' lines (ADD)
    content: Optional[str] = None  # For add file operations
    # NOTE(review): `content` is never populated by parse_v4a_patch; _apply_add
    # reconstructs new-file content from `hunks` instead — confirm before relying on it.
 67  
 68  
 69  def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[str]]:
 70      """
 71      Parse a V4A format patch.
 72      
 73      Args:
 74          patch_content: The patch text in V4A format
 75      
 76      Returns:
 77          Tuple of (operations, error_message)
 78          - If successful: (list_of_operations, None)
 79          - If failed: ([], error_description)
 80      """
 81      lines = patch_content.split('\n')
 82      operations: List[PatchOperation] = []
 83      
 84      # Find patch boundaries
 85      start_idx = None
 86      end_idx = None
 87      
 88      for i, line in enumerate(lines):
 89          if '*** Begin Patch' in line or '***Begin Patch' in line:
 90              start_idx = i
 91          elif '*** End Patch' in line or '***End Patch' in line:
 92              end_idx = i
 93              break
 94      
 95      if start_idx is None:
 96          # Try to parse without explicit begin marker
 97          start_idx = -1
 98      
 99      if end_idx is None:
100          end_idx = len(lines)
101      
102      # Parse operations between boundaries
103      i = start_idx + 1
104      current_op: Optional[PatchOperation] = None
105      current_hunk: Optional[Hunk] = None
106      
107      while i < end_idx:
108          line = lines[i]
109          
110          # Check for file operation markers
111          update_match = re.match(r'\*\*\*\s*Update\s+File:\s*(.+)', line)
112          add_match = re.match(r'\*\*\*\s*Add\s+File:\s*(.+)', line)
113          delete_match = re.match(r'\*\*\*\s*Delete\s+File:\s*(.+)', line)
114          move_match = re.match(r'\*\*\*\s*Move\s+File:\s*(.+?)\s*->\s*(.+)', line)
115          
116          if update_match:
117              # Save previous operation
118              if current_op:
119                  if current_hunk and current_hunk.lines:
120                      current_op.hunks.append(current_hunk)
121                  operations.append(current_op)
122              
123              current_op = PatchOperation(
124                  operation=OperationType.UPDATE,
125                  file_path=update_match.group(1).strip()
126              )
127              current_hunk = None
128              
129          elif add_match:
130              if current_op:
131                  if current_hunk and current_hunk.lines:
132                      current_op.hunks.append(current_hunk)
133                  operations.append(current_op)
134              
135              current_op = PatchOperation(
136                  operation=OperationType.ADD,
137                  file_path=add_match.group(1).strip()
138              )
139              current_hunk = Hunk()
140              
141          elif delete_match:
142              if current_op:
143                  if current_hunk and current_hunk.lines:
144                      current_op.hunks.append(current_hunk)
145                  operations.append(current_op)
146              
147              current_op = PatchOperation(
148                  operation=OperationType.DELETE,
149                  file_path=delete_match.group(1).strip()
150              )
151              operations.append(current_op)
152              current_op = None
153              current_hunk = None
154              
155          elif move_match:
156              if current_op:
157                  if current_hunk and current_hunk.lines:
158                      current_op.hunks.append(current_hunk)
159                  operations.append(current_op)
160              
161              current_op = PatchOperation(
162                  operation=OperationType.MOVE,
163                  file_path=move_match.group(1).strip(),
164                  new_path=move_match.group(2).strip()
165              )
166              operations.append(current_op)
167              current_op = None
168              current_hunk = None
169              
170          elif line.startswith('@@'):
171              # Context hint / hunk marker
172              if current_op:
173                  if current_hunk and current_hunk.lines:
174                      current_op.hunks.append(current_hunk)
175                  
176                  # Extract context hint
177                  hint_match = re.match(r'@@\s*(.+?)\s*@@', line)
178                  hint = hint_match.group(1) if hint_match else None
179                  current_hunk = Hunk(context_hint=hint)
180                  
181          elif current_op and line:
182              # Parse hunk line
183              if current_hunk is None:
184                  current_hunk = Hunk()
185              
186              if line.startswith('+'):
187                  current_hunk.lines.append(HunkLine('+', line[1:]))
188              elif line.startswith('-'):
189                  current_hunk.lines.append(HunkLine('-', line[1:]))
190              elif line.startswith(' '):
191                  current_hunk.lines.append(HunkLine(' ', line[1:]))
192              elif line.startswith('\\'):
193                  # "\ No newline at end of file" marker - skip
194                  pass
195              else:
196                  # Treat as context line (implicit space prefix)
197                  current_hunk.lines.append(HunkLine(' ', line))
198          
199          i += 1
200      
201      # Don't forget the last operation
202      if current_op:
203          if current_hunk and current_hunk.lines:
204              current_op.hunks.append(current_hunk)
205          operations.append(current_op)
206  
207      # Validate the parsed result
208      if not operations:
209          # Empty patch is not an error — callers get [] and can decide
210          return operations, None
211  
212      parse_errors: List[str] = []
213      for op in operations:
214          if not op.file_path:
215              parse_errors.append("Operation with empty file path")
216          if op.operation == OperationType.UPDATE and not op.hunks:
217              parse_errors.append(f"UPDATE {op.file_path!r}: no hunks found")
218          if op.operation == OperationType.MOVE and not op.new_path:
219              parse_errors.append(f"MOVE {op.file_path!r}: missing destination path (expected 'src -> dst')")
220  
221      if parse_errors:
222          return [], "Parse error: " + "; ".join(parse_errors)
223  
224      return operations, None
225  
226  
227  def _count_occurrences(text: str, pattern: str) -> int:
228      """Count non-overlapping occurrences of *pattern* in *text*."""
229      count = 0
230      start = 0
231      while True:
232          pos = text.find(pattern, start)
233          if pos == -1:
234              break
235          count += 1
236          start = pos + 1
237      return count
238  
239  
def _validate_operations(
    operations: List[PatchOperation],
    file_ops: Any,
) -> List[str]:
    """Validate all operations without writing any files.

    Returns a list of error strings; an empty list means all operations
    are valid and the apply phase can proceed safely.

    For UPDATE operations, hunks are simulated in order so that later
    hunks validate against post-earlier-hunk content (matching apply order).

    Args:
        operations: Parsed operations from parse_v4a_patch.
        file_ops: Object providing read_file_raw(path) -> result with
            .content / .error attributes.

    Returns:
        List of human-readable error strings (empty when everything is valid).
    """
    # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
    from tools.fuzzy_match import fuzzy_find_and_replace

    errors: List[str] = []

    for op in operations:
        if op.operation == OperationType.UPDATE:
            # File must be readable to validate (and later apply) hunks.
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: {read_result.error}")
                continue

            simulated = read_result.content
            for hunk in op.hunks:
                # Search pattern = context (' ') + removed ('-') lines.
                search_lines = [l.content for l in hunk.lines if l.prefix in (' ', '-')]
                if not search_lines:
                    # Addition-only hunk: validate context hint uniqueness
                    if hunk.context_hint:
                        occurrences = _count_occurrences(simulated, hunk.context_hint)
                        if occurrences == 0:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' not found"
                            )
                        elif occurrences > 1:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' is ambiguous "
                                f"({occurrences} occurrences)"
                            )
                    continue

                search_pattern = '\n'.join(search_lines)
                # Replacement = context (' ') + added ('+') lines.
                replace_lines = [l.content for l in hunk.lines if l.prefix in (' ', '+')]
                replacement = '\n'.join(replace_lines)

                new_simulated, count, _strategy, match_error = fuzzy_find_and_replace(
                    simulated, search_pattern, replacement, replace_all=False
                )
                if count == 0:
                    label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
                    msg = (
                        f"{op.file_path}: hunk {label} not found"
                        + (f" — {match_error}" if match_error else "")
                    )
                    # Best-effort extra diagnostics; never let the hint
                    # formatter itself break validation.
                    try:
                        from tools.fuzzy_match import format_no_match_hint
                        msg += format_no_match_hint(match_error, count, search_pattern, simulated)
                    except Exception:
                        pass
                    errors.append(msg)
                else:
                    # Advance simulation so subsequent hunks validate correctly.
                    # Reuse the result from the call above — no second fuzzy run.
                    simulated = new_simulated

        elif op.operation == OperationType.DELETE:
            # Only existence is checked; content is irrelevant for deletion.
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: file not found for deletion")

        elif op.operation == OperationType.MOVE:
            if not op.new_path:
                errors.append(f"{op.file_path}: MOVE operation missing destination path")
                continue
            # Source must exist; destination must NOT exist (no silent overwrite).
            src_result = file_ops.read_file_raw(op.file_path)
            if src_result.error:
                errors.append(f"{op.file_path}: source file not found for move")
            dst_result = file_ops.read_file_raw(op.new_path)
            if not dst_result.error:
                errors.append(
                    f"{op.new_path}: destination already exists — move would overwrite"
                )

        # ADD: parent directory creation handled by write_file; no pre-check needed.

    return errors
329  
330  
def apply_v4a_operations(operations: List[PatchOperation], file_ops: Any) -> 'PatchResult':
    """Apply V4A patch operations using a file operations interface.

    Works in two phases:

    1. Validate — every operation is checked against the current file
       contents without writing anything; any problem aborts with zero
       filesystem changes.
    2. Apply — operations are executed in order; a failure here (e.g. a
       race between validation and apply) is reported with a note to run
       ``git diff`` to assess the state.

    Args:
        operations: List of PatchOperation from parse_v4a_patch
        file_ops: Object with read_file_raw, write_file methods

    Returns:
        PatchResult with results of all operations
    """
    # Import here to avoid circular imports
    from tools.file_operations import PatchResult

    # ---- Phase 1: validate ----
    problems = _validate_operations(operations, file_ops)
    if problems:
        bullet_list = "\n".join(f"  • {e}" for e in problems)
        return PatchResult(
            success=False,
            error="Patch validation failed (no files were modified):\n" + bullet_list,
        )

    # ---- Phase 2: apply ----
    modified: List[str] = []
    created: List[str] = []
    deleted: List[str] = []
    diffs: List[str] = []
    failures: List[str] = []

    # Dispatch table: handler, verb for error messages, and which result
    # bucket a successful application lands in.
    dispatch = {
        OperationType.ADD: (_apply_add, "add", created),
        OperationType.DELETE: (_apply_delete, "delete", deleted),
        OperationType.MOVE: (_apply_move, "move", modified),
        OperationType.UPDATE: (_apply_update, "update", modified),
    }

    for op in operations:
        entry = dispatch.get(op.operation)
        if entry is None:
            continue
        handler, verb, bucket = entry
        try:
            ok, payload = handler(op, file_ops)
            if ok:
                if op.operation == OperationType.MOVE:
                    bucket.append(f"{op.file_path} -> {op.new_path}")
                else:
                    bucket.append(op.file_path)
                diffs.append(payload)
            else:
                failures.append(f"Failed to {verb} {op.file_path}: {payload}")
        except Exception as e:
            failures.append(f"Error processing {op.file_path}: {str(e)}")

    # Run lint on every file we touched (created or modified), when supported.
    lint_results = {}
    for path in modified + created:
        if hasattr(file_ops, '_check_lint'):
            lint_results[path] = file_ops._check_lint(path).to_dict()

    combined_diff = '\n'.join(diffs)

    if failures:
        return PatchResult(
            success=False,
            diff=combined_diff,
            files_modified=modified,
            files_created=created,
            files_deleted=deleted,
            lint=lint_results if lint_results else None,
            error="Apply phase failed (state may be inconsistent — run `git diff` to assess):\n"
                  + "\n".join(f"  • {e}" for e in failures),
        )

    return PatchResult(
        success=True,
        diff=combined_diff,
        files_modified=modified,
        files_created=created,
        files_deleted=deleted,
        lint=lint_results if lint_results else None,
    )
434  
435  
def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Create a new file from the '+' lines of an ADD operation.

    Returns (True, pseudo-diff) on success, (False, error message) otherwise.
    """
    # New-file content is every added line across all hunks, in order.
    added = [
        hl.content
        for hunk in op.hunks
        for hl in hunk.lines
        if hl.prefix == '+'
    ]

    write_result = file_ops.write_file(op.file_path, '\n'.join(added))
    if write_result.error:
        return False, write_result.error

    # Synthesize a unified-diff-style view of the creation.
    header = f"--- /dev/null\n+++ b/{op.file_path}\n"
    body = '\n'.join(f"+{text}" for text in added)
    return True, header + body
455  
456  
def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Delete a file, returning a real unified diff of the removal.

    The content is snapshotted before deletion so the diff can show the
    removed lines. Validation already confirmed the file exists; the
    re-check here guards against races between validate and apply.
    """
    snapshot = file_ops.read_file_raw(op.file_path)
    if snapshot.error:
        return False, f"Cannot delete {op.file_path}: file not found"

    delete_result = file_ops.delete_file(op.file_path)
    if delete_result.error:
        return False, delete_result.error

    old_lines = snapshot.content.splitlines(keepends=True)
    removal_diff = ''.join(difflib.unified_diff(
        old_lines, [],
        fromfile=f"a/{op.file_path}",
        tofile="/dev/null",
    ))
    # An empty file yields an empty diff; fall back to a marker comment.
    if not removal_diff:
        removal_diff = f"# Deleted: {op.file_path}"
    return True, removal_diff
476  
477  
def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Rename/relocate a file via the file-ops interface.

    Returns (True, marker comment) on success — there is no content
    change, so no unified diff is produced — or (False, error message).
    """
    outcome = file_ops.move_file(op.file_path, op.new_path)
    if outcome.error:
        return False, outcome.error
    return True, f"# Moved: {op.file_path} -> {op.new_path}"
486  
487  
def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply an update file operation.

    Each hunk is converted into a (search, replace) pair: context (' ')
    lines appear in both, '-' lines only in the search pattern, '+' lines
    only in the replacement. Hunks are applied in order against the
    evolving content, then the file is rewritten once at the end.

    Args:
        op: UPDATE operation whose hunks describe the edits.
        file_ops: Object providing read_file_raw and write_file.

    Returns:
        (True, unified diff) on success, (False, error message) on failure.
    """
    # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
    from tools.fuzzy_match import fuzzy_find_and_replace

    # Read current content — raw so no line-number prefixes or per-line truncation
    read_result = file_ops.read_file_raw(op.file_path)

    if read_result.error:
        return False, f"Cannot read file: {read_result.error}"

    current_content = read_result.content

    # Apply each hunk
    new_content = current_content

    for hunk in op.hunks:
        # Build search pattern from context and removed lines
        search_lines = []
        replace_lines = []

        for line in hunk.lines:
            if line.prefix == ' ':
                search_lines.append(line.content)
                replace_lines.append(line.content)
            elif line.prefix == '-':
                search_lines.append(line.content)
            elif line.prefix == '+':
                replace_lines.append(line.content)

        if search_lines:
            search_pattern = '\n'.join(search_lines)
            replacement = '\n'.join(replace_lines)

            # First attempt: whole-file fuzzy search-and-replace.
            new_content, count, _strategy, error = fuzzy_find_and_replace(
                new_content, search_pattern, replacement, replace_all=False
            )

            if error and count == 0:
                # Try with context hint if available
                if hunk.context_hint:
                    # Find the context hint location and search nearby
                    hint_pos = new_content.find(hunk.context_hint)
                    if hint_pos != -1:
                        # Search in a window around the hint.
                        # NOTE(review): the window is measured in characters
                        # (500 before / 2000 after) and may start or end
                        # mid-line — presumably fuzzy_find_and_replace
                        # tolerates partial boundary lines; confirm.
                        window_start = max(0, hint_pos - 500)
                        window_end = min(len(new_content), hint_pos + 2000)
                        window = new_content[window_start:window_end]

                        window_new, count, _strategy, error = fuzzy_find_and_replace(
                            window, search_pattern, replacement, replace_all=False
                        )
                        
                        if count > 0:
                            # Splice the rewritten window back into the file.
                            new_content = new_content[:window_start] + window_new + new_content[window_end:]
                            error = None
                
                if error:
                    err_msg = f"Could not apply hunk: {error}"
                    # Best-effort diagnostics; never let the hint formatter
                    # itself turn into the failure.
                    try:
                        from tools.fuzzy_match import format_no_match_hint
                        err_msg += format_no_match_hint(error, 0, search_pattern, new_content)
                    except Exception:
                        pass
                    return False, err_msg
        else:
            # Addition-only hunk (no context or removed lines).
            # Insert at the location indicated by the context hint, or at end of file.
            insert_text = '\n'.join(replace_lines)
            if hunk.context_hint:
                occurrences = _count_occurrences(new_content, hunk.context_hint)
                if occurrences == 0:
                    # Hint not found — append at end as a safe fallback.
                    # NOTE(review): _validate_operations treats a missing hint
                    # as an error, so this branch should only be reachable if
                    # the file changed between validation and apply.
                    new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'
                elif occurrences > 1:
                    return False, (
                        f"Addition-only hunk: context hint '{hunk.context_hint}' is ambiguous "
                        f"({occurrences} occurrences) — provide a more unique hint"
                    )
                else:
                    hint_pos = new_content.find(hunk.context_hint)
                    # Insert after the line containing the context hint
                    eol = new_content.find('\n', hint_pos)
                    if eol != -1:
                        new_content = new_content[:eol + 1] + insert_text + '\n' + new_content[eol + 1:]
                    else:
                        # Hint is on the last line with no trailing newline
                        new_content = new_content + '\n' + insert_text
            else:
                # No hint at all: append at end of file.
                new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'
    
    # Write new content
    write_result = file_ops.write_file(op.file_path, new_content)
    if write_result.error:
        return False, write_result.error
    
    # Generate diff
    diff_lines = difflib.unified_diff(
        current_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile=f"a/{op.file_path}",
        tofile=f"b/{op.file_path}"
    )
    diff = ''.join(diff_lines)
    
    return True, diff