rapidfuzz.py
"""RapidFuzz-based preprocessor implementation."""

from __future__ import annotations

import logging
import re
from collections import defaultdict
from typing import Any

from rapidfuzz import fuzz

from .protocol import Preprocessor


class RapidFuzzPreprocessor:
    """Preprocessor using RapidFuzz fuzzy matching to remove repeated content.

    This is a concrete implementation of the Preprocessor protocol. It uses
    RapidFuzz to detect and remove similar lines that appear multiple times,
    such as headers, footers, and page numbers.
    """

    def __init__(
        self,
        min_repetitions: int = 3,
        similarity_threshold: float = 0.85,
    ):
        """Initialize the preprocessor.

        Parameters
        ----------
        min_repetitions
            Minimum number of times a line must appear (exactly or as a near
            match) to be considered repeated. Default is 3. This prevents
            removing legitimate content that appears only a few times.
        similarity_threshold
            Minimum similarity ratio (0.0 to 1.0) for considering two lines
            similar. Uses RapidFuzz for fuzzy matching. Default is 0.85
            (85% similarity).
        """
        self.logger = logging.getLogger(__name__)
        self.min_repetitions = min_repetitions
        self.similarity_threshold = similarity_threshold

    def preprocess(self, text: str) -> str:
        """Preprocess text to remove repeated content.

        Parameters
        ----------
        text
            The text to preprocess.

        Returns
        -------
        Preprocessed text with repeated content removed.
        """
        if not text or not text.strip():
            return text

        lines = text.split("\n")
        if len(lines) < self.min_repetitions:
            return text

        lines_to_remove = self._detect_repeated_lines(lines)

        removed_count = len(lines_to_remove)
        if removed_count > 0:
            self.logger.info(
                f"Removed {removed_count} lines of repeated content "
                f"({removed_count / len(lines) * 100:.1f}% of total lines)"
            )
            for line_idx in sorted(lines_to_remove):
                line_content = lines[line_idx].strip()[:100]  # Limit to 100 chars for readability
                self.logger.debug(f"Deleted line {line_idx + 1}: {line_content}")

        cleaned_lines = [
            line for i, line in enumerate(lines) if i not in lines_to_remove
        ]

        return "\n".join(cleaned_lines)

    def _detect_repeated_lines(self, lines: list[str]) -> set[int]:
        """Detect lines that appear multiple times using fuzzy matching.

        Uses a two-phase optimization:

        1. Pre-filter by line characteristics (length, prefix) to group
           candidates.
        2. Run fuzzy matching only within each candidate group.

        This reduces complexity from O(n²) to O(n*k), where k (the size of a
        candidate group) is typically much smaller than n.

        Parameters
        ----------
        lines
            List of text lines.

        Returns
        -------
        Set of line indices to remove.
        """
        normalized_lines = [
            self._normalize_line(line) if line.strip() else None
            for line in lines
        ]

        non_empty_count = sum(1 for n in normalized_lines if n is not None)
        if non_empty_count < self.min_repetitions:
            return set()

        candidate_groups = self._group_by_characteristics(normalized_lines)
        indices_to_remove: set[int] = set()

        numeric_line_indices = self._group_numeric_lines(normalized_lines)
        if len(numeric_line_indices) >= self.min_repetitions:
            indices_to_remove.update(numeric_line_indices)

        for candidate_group_indices in candidate_groups.values():
            if len(candidate_group_indices) < self.min_repetitions:
                continue

            # Greedy single-pass clustering: each cluster is represented by
            # its first member; a line joins the first cluster whose
            # representative it matches, otherwise it starts a new cluster.
            clusters: list[tuple[str, list[int]]] = []
            for i in candidate_group_indices:
                if i in indices_to_remove:
                    continue

                line = normalized_lines[i]
                matched_cluster_idx = None
                for cluster_idx, cluster in enumerate(clusters):
                    representative = cluster[0]
                    # score_cutoff lets RapidFuzz short-circuit comparisons
                    # that cannot reach the threshold.
                    similarity = fuzz.ratio(
                        line,
                        representative,
                        score_cutoff=self.similarity_threshold * 100,
                    )
                    if similarity >= self.similarity_threshold * 100:
                        matched_cluster_idx = cluster_idx
                        break

                if matched_cluster_idx is not None:
                    clusters[matched_cluster_idx][1].append(i)
                else:
                    clusters.append((line, [i]))

            for cluster in clusters:
                indices = cluster[1]
                if len(indices) >= self.min_repetitions:
                    indices_to_remove.update(indices)

        return indices_to_remove

    def _group_numeric_lines(self, normalized_lines: list[str | None]) -> set[int]:
        """Collect lines that contain only digits.

        This handles cases like page numbers "1", "2", "3" that should be
        removed as repetitive content even though they have low pairwise
        similarity.

        Parameters
        ----------
        normalized_lines
            List of normalized lines (may contain None).

        Returns
        -------
        Set of line indices that are numeric-only and should be removed,
        or an empty set if there are too few of them.
        """
        numeric_indices = [
            i
            for i, line in enumerate(normalized_lines)
            if line is not None and re.fullmatch(r"\d+", line)
        ]

        if len(numeric_indices) >= self.min_repetitions:
            return set(numeric_indices)
        return set()

    def _group_by_characteristics(
        self, normalized_lines: list[str | None]
    ) -> dict[tuple[int, str], list[int]]:
        """Group lines by coarse characteristics to reduce fuzzy comparisons.

        Groups lines by (length_category, prefix) where:

        - length_category: line length bucketed in steps of 20, so lines of
          similar length share a group
        - prefix: the first few characters, for quick filtering

        Only indices are stored, to minimize memory usage; lines are read
        from normalized_lines when needed.

        Parameters
        ----------
        normalized_lines
            List of normalized lines (may contain None).

        Returns
        -------
        Dictionary mapping (length_category, prefix) to lists of line indices.
        """
        groups: dict[tuple[int, str], list[int]] = defaultdict(list)
        prefix_len = 10

        for i, line in enumerate(normalized_lines):
            if line is None:
                continue

            length_category = len(line) // 20
            prefix = line[:prefix_len]  # Slicing already handles short lines.

            groups[(length_category, prefix)].append(i)

        return groups

    @staticmethod
    def _normalize_line(line: str) -> str:
        """Normalize a line for comparison.

        Strips leading/trailing whitespace and collapses internal runs of
        whitespace into single spaces.

        Parameters
        ----------
        line
            The line to normalize.

        Returns
        -------
        Normalized line string.
        """
        normalized = line.strip()
        return re.sub(r"\s+", " ", normalized)


def create_rapidfuzz_preprocessor(config: dict[str, Any]) -> Preprocessor:
    """Create a RapidFuzz preprocessor from configuration.

    Parameters
    ----------
    config
        Configuration dictionary. Expected keys:

        - min_repetitions (int, optional): Minimum repetitions. Default: 3
        - similarity_threshold (float, optional): Similarity threshold.
          Default: 0.85

    Returns
    -------
    Preprocessor instance.
    """
    min_repetitions = config.get("min_repetitions", 3)
    similarity_threshold = config.get("similarity_threshold", 0.85)

    return RapidFuzzPreprocessor(
        min_repetitions=min_repetitions,
        similarity_threshold=similarity_threshold,
    )