# src/preprocessing/rapidfuzz.py
"""RapidFuzz-based preprocessor implementation."""

from __future__ import annotations
  4  
  5  import logging
  6  import re
  7  from collections import defaultdict
  8  from typing import Any
  9  
 10  from rapidfuzz import fuzz
 11  
 12  from .protocol import Preprocessor
 13  
 14  
 15  class RapidFuzzPreprocessor:
 16      """Preprocessor using RapidFuzz for fuzzy matching to remove repeated content.
 17  
 18      This is a concrete implementation of the Preprocessor protocol using
 19      RapidFuzz to detect and remove similar lines that appear multiple times,
 20      such as headers, footers, and page numbers.
 21      """
 22  
 23      def __init__(
 24          self,
 25          min_repetitions: int = 3,
 26          similarity_threshold: float = 0.85,
 27      ):
 28          """Initialize the preprocessor.
 29  
 30          Parameters
 31          ----------
 32          min_repetitions
 33              Minimum number of times a line must appear (or be similar) to be
 34              considered repeated. Default is 3. This prevents removing legitimate
 35              content that appears a few times.
 36          similarity_threshold
 37              Minimum similarity ratio (0.0 to 1.0) for considering lines as similar.
 38              Uses rapidfuzz for fuzzy matching. Default is 0.85 (85% similarity).
 39          """
 40          self.logger = logging.getLogger(__name__)
 41          self.min_repetitions = min_repetitions
 42          self.similarity_threshold = similarity_threshold
 43  
 44      def preprocess(self, text: str) -> str:
 45          """Preprocess text to remove repeated content.
 46  
 47          Parameters
 48          ----------
 49          text
 50              The text to preprocess.
 51  
 52          Returns
 53          -------
 54          Preprocessed text with repeated content removed.
 55          """
 56          if not text or not text.strip():
 57              return text
 58  
 59          lines = text.split("\n")
 60          if len(lines) < self.min_repetitions:
 61              return text
 62  
 63          lines_to_remove = self._detect_repeated_lines(lines)
 64  
 65          removed_count = len(lines_to_remove)
 66          if removed_count > 0:
 67              self.logger.info(
 68                  f"Removed {removed_count} lines of repeated content "
 69                  f"({removed_count / len(lines) * 100:.1f}% of total lines)"
 70              )
 71              for line_idx in sorted(lines_to_remove):
 72                  line_content = lines[line_idx].strip()[:100]  # Limit to 100 chars for readability
 73                  self.logger.info(f"Deleted line {line_idx + 1}: {line_content}")
 74  
 75          cleaned_lines = [
 76              line for i, line in enumerate(lines) if i not in lines_to_remove
 77          ]
 78  
 79          return "\n".join(cleaned_lines)
 80  
 81      def _detect_repeated_lines(self, lines: list[str]) -> set[int]:
 82          """Detect lines that appear multiple times using fuzzy matching.
 83  
 84          Uses a two-phase optimization:
 85          1. Pre-filter by line characteristics (length, prefix) to group candidates
 86          2. Fuzzy matching only within candidate groups
 87  
 88          This reduces complexity from O(n²) to O(n*k) where k is typically much
 89          smaller than n, even in worst-case scenarios.
 90  
 91          Parameters
 92          ----------
 93          lines
 94              List of text lines.
 95  
 96          Returns
 97          -------
 98          Set of line indices to remove.
 99          """
100          normalized_lines = [
101              self._normalize_line(line) if line.strip() else None
102              for line in lines
103          ]
104  
105          non_empty_count = sum(1 for n in normalized_lines if n is not None)
106          if non_empty_count < self.min_repetitions:
107              return set()
108  
109          candidate_groups = self._group_by_characteristics(normalized_lines)
110          indices_to_remove = set()
111  
112          numeric_lines_indices = self._group_numeric_lines(normalized_lines)
113          if len(numeric_lines_indices) >= self.min_repetitions:
114              indices_to_remove.update(numeric_lines_indices)
115  
116          for candidate_group_indices in candidate_groups.values():
117              if len(candidate_group_indices) < self.min_repetitions:
118                  continue
119  
120              clusters: list[tuple[str, list[int]]] = []
121              for i in candidate_group_indices:
122                  if i in indices_to_remove:
123                      continue
124  
125                  line = normalized_lines[i]
126                  matched_cluster_idx = None
127                  for cluster_idx, cluster in enumerate(clusters):
128                      representative = cluster[0]
129                      similarity = fuzz.ratio(
130                          line, representative,
131                          score_cutoff=int(self.similarity_threshold * 100)
132                      )
133                      if similarity >= self.similarity_threshold * 100:
134                          matched_cluster_idx = cluster_idx
135                          break
136  
137                  if matched_cluster_idx is not None:
138                      clusters[matched_cluster_idx][1].append(i)
139                  else:
140                      clusters.append((line, [i]))
141  
142              for cluster in clusters:
143                  indices = cluster[1]
144                  if len(indices) >= self.min_repetitions:
145                      indices_to_remove.update(indices)
146  
147          return indices_to_remove
148  
149      def _group_numeric_lines(self, normalized_lines: list[str | None]) -> set[int]:
150          """Group lines that contain only numbers.
151  
152          This handles cases like page numbers "1", "2", "3" that should be
153          removed as repetitive content even though they have low similarity.
154  
155          Parameters
156          ----------
157          normalized_lines
158              List of normalized lines (may contain None).
159  
160          Returns
161          -------
162          Set of line indices that are numeric-only and should be removed.
163          """
164          numeric_indices = []
165          for i, line in enumerate(normalized_lines):
166              if line is None:
167                  continue
168              if re.match(r"^\d+$", line.strip()):
169                  numeric_indices.append(i)
170  
171          if len(numeric_indices) >= self.min_repetitions:
172              return set(numeric_indices)
173          return set()
174  
175      def _group_by_characteristics(self, normalized_lines: list[str | None]) -> dict[tuple, list[int]]:
176          """Group lines by characteristics to reduce fuzzy matching comparisons.
177  
178          Groups lines by (length_category, prefix) where:
179          - length_category: rounded length to reduce groups
180          - prefix: first few characters for quick filtering
181  
182          Only stores indices to minimize memory usage. Lines are accessed from
183          normalized_lines when needed.
184  
185          Parameters
186          ----------
187          normalized_lines
188              List of normalized lines (may contain None).
189  
190          Returns
191          -------
192          Dictionary mapping (length_category, prefix) to list of line indices.
193          """
194          groups = defaultdict(list)
195          prefix_len = 10
196  
197          for i, line in enumerate(normalized_lines):
198              if line is None:
199                  continue
200  
201              length = len(line)
202              length_category = length // 20
203              prefix = line[:prefix_len] if length >= prefix_len else line
204  
205              groups[(length_category, prefix)].append(i)
206  
207          return groups
208  
209      @staticmethod
210      def _normalize_line(line: str) -> str:
211          """Normalize a line for comparison.
212  
213          Parameters
214          ----------
215          line
216              The line to normalize.
217  
218          Returns
219          -------
220          Normalized line string.
221          """
222          normalized = line.strip()
223          normalized = re.sub(r"\s+", " ", normalized)
224          return normalized
225  
226  
def create_rapidfuzz_preprocessor(config: dict[str, Any]) -> Preprocessor:
    """Build a RapidFuzz preprocessor from a configuration mapping.

    Parameters
    ----------
    config
        Configuration dictionary. Recognized keys (both optional):
        - min_repetitions (int): minimum repetitions; defaults to 3.
        - similarity_threshold (float): similarity threshold; defaults to 0.85.

    Returns
    -------
    Preprocessor instance.
    """
    return RapidFuzzPreprocessor(
        min_repetitions=config.get("min_repetitions", 3),
        similarity_threshold=config.get("similarity_threshold", 0.85),
    )
248