sensitive_data_scanner.py
1 """ 2 Sensitive Data Scanner for Ag3ntum. 3 4 Detects and redacts sensitive information like API keys, tokens, and passwords 5 using detect-secrets library and custom patterns. 6 7 Key features: 8 - Same-length replacement to preserve formatting 9 - Integration with detect-secrets library 10 - Custom regex patterns for additional detection 11 - Allowlist support for known false positives 12 - Session file scanning with configurable limits 13 14 Usage: 15 from src.security import get_scanner, scan_and_redact 16 17 # Quick usage 18 result = scan_and_redact(text) 19 if result.has_secrets: 20 print(f"Found {len(result.secrets)} secrets") 21 print(result.redacted_text) 22 23 # With scanner instance 24 scanner = get_scanner() 25 result = scanner.scan(text) 26 """ 27 28 from __future__ import annotations 29 30 import hashlib 31 import logging 32 import re 33 from dataclasses import dataclass, field 34 from fnmatch import fnmatch 35 from pathlib import Path 36 from typing import Any, Optional 37 38 from detect_secrets.core.scan import scan_line 39 from detect_secrets.settings import transient_settings 40 41 logger = logging.getLogger(__name__) 42 43 # Global scanner instance (lazy-loaded) 44 _scanner_instance: Optional["SensitiveDataScanner"] = None 45 46 47 @dataclass 48 class DetectedSecret: 49 """Represents a detected secret in text.""" 50 51 secret_type: str 52 secret_value: str 53 line_number: int 54 start_index: int # Index within the line 55 end_index: int # Index within the line 56 replacement: str = "" 57 58 @property 59 def length(self) -> int: 60 """Length of the secret value.""" 61 return len(self.secret_value) 62 63 64 @dataclass 65 class ScanResult: 66 """Result of scanning text for secrets.""" 67 68 original_text: str 69 redacted_text: str 70 secrets: list[DetectedSecret] = field(default_factory=list) 71 secret_types: set[str] = field(default_factory=set) 72 73 @property 74 def has_secrets(self) -> bool: 75 """Whether any secrets were found.""" 76 return len(self.secrets) > 0 77 78 @property 79 def secret_count(self) -> int: 80 """Number of secrets found.""" 81 return len(self.secrets) 82 83 84 class SensitiveDataScanner: 85 """ 86 Detect and redact sensitive information in text. 87 88 Supports: 89 - detect-secrets library plugins 90 - Custom regex patterns 91 - Same-length replacement (preserves formatting) 92 - Allowlist for false positives 93 """ 94 95 # Default custom patterns (used if config not loaded) 96 DEFAULT_CUSTOM_PATTERNS: dict[str, list[str]] = { 97 "generic_api_key": [ 98 r'(?i)(?:api[_-]?key|apikey)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?', 99 r'(?i)(?:access[_-]?token|accesstoken)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?', 100 r'(?i)(?:auth[_-]?token|authtoken)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?', 101 r'(?i)(?:secret[_-]?key|secretkey)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?', 102 ], 103 "bearer_token": [ 104 r"(?i)bearer\s+([a-zA-Z0-9_\-\.]{20,})", 105 ], 106 "password": [ 107 r'(?i)(?:password|passwd|pwd)["\s:=]+["\']?([^\s"\']{8,})["\']?', 108 ], 109 "connection_string": [ 110 r"(?i)(?:mongodb|postgres|mysql|redis|amqp):\/\/[^\s]+", 111 r"(?i)Server=[^;]+;.*(?:Password|Pwd)=[^;]+", 112 ], 113 "private_key": [ 114 r"-----BEGIN (?:RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----", 115 ], 116 "anthropic_key": [ 117 r"sk-ant-[a-zA-Z0-9_-]{40,}", 118 ], 119 "openai_key": [ 120 r"sk-[a-zA-Z0-9]{32,}", 121 ], 122 "gcp_key": [ 123 r"AIza[0-9A-Za-z_-]{35}", 124 ], 125 } 126 127 def __init__( 128 self, 129 replacement_format: str = "asterisk", 130 custom_patterns: Optional[dict[str, list[str]]] = None, 131 detect_secrets_plugins: Optional[list[str]] = None, 132 entropy_base64_limit: float = 4.5, 133 entropy_hex_limit: float = 3.0, 134 false_positive_strings: Optional[list[str]] = None, 135 false_positive_patterns: Optional[list[str]] = None, 136 ): 137 """ 138 Initialize the scanner. 139 140 Args: 141 replacement_format: How to replace secrets: 142 - "asterisk": Replace with asterisks (same length) 143 - "redact_typed": Replace with [REDACTED:TYPE] (variable length) 144 - "hash": Replace with hash prefix (same length) 145 custom_patterns: Dict of pattern_name -> list of regex patterns 146 detect_secrets_plugins: List of detect-secrets plugin names to enable 147 entropy_base64_limit: Threshold for Base64 high-entropy detection 148 entropy_hex_limit: Threshold for hex high-entropy detection 149 false_positive_strings: Known false positive strings to skip 150 false_positive_patterns: Regex patterns for false positives to skip 151 """ 152 self.replacement_format = replacement_format 153 self.custom_patterns = custom_patterns or self.DEFAULT_CUSTOM_PATTERNS 154 self.detect_secrets_plugins = detect_secrets_plugins or [] 155 self.entropy_base64_limit = entropy_base64_limit 156 self.entropy_hex_limit = entropy_hex_limit 157 self.false_positive_strings = set(false_positive_strings or []) 158 self.false_positive_patterns = [ 159 re.compile(p) for p in (false_positive_patterns or []) 160 ] 161 162 # Compile custom patterns 163 self._compiled_patterns: dict[str, list[re.Pattern]] = {} 164 for name, patterns in self.custom_patterns.items(): 165 self._compiled_patterns[name] = [re.compile(p) for p in patterns] 166 167 # detect-secrets settings (lazy-loaded) 168 self._detect_secrets_settings: Optional[dict] = None 169 170 def _get_detect_secrets_settings(self) -> dict: 171 """Get detect-secrets settings, building if necessary.""" 172 if self._detect_secrets_settings is not None: 173 return self._detect_secrets_settings 174 175 plugins = [] 176 for plugin_name in self.detect_secrets_plugins: 177 if plugin_name == "Base64HighEntropyString": 178 plugins.append({"name": plugin_name, "limit": self.entropy_base64_limit}) 179 elif plugin_name == "HexHighEntropyString": 180 plugins.append({"name": plugin_name, "limit": self.entropy_hex_limit}) 181 else: 182 plugins.append({"name": plugin_name}) 183 184 self._detect_secrets_settings = { 185 "plugins_used": plugins, 186 "filters_used": [ 187 {"path": "detect_secrets.filters.allowlist.is_line_allowlisted"}, 188 ], 189 } 190 return self._detect_secrets_settings 191 192 def _generate_replacement(self, secret_value: str, secret_type: str) -> str: 193 """ 194 Generate replacement string with SAME LENGTH as original. 195 196 This is critical for preserving formatting in files. 197 """ 198 length = len(secret_value) 199 200 if self.replacement_format == "asterisk": 201 # Same length asterisks 202 return "*" * length 203 204 elif self.replacement_format == "hash": 205 # Hash-based replacement, same length 206 hash_val = hashlib.sha256(secret_value.encode()).hexdigest() 207 # Format: [HASH:xxxxx...] - adjust hash length to match 208 prefix = "[HASH:" 209 suffix = "]" 210 overhead = len(prefix) + len(suffix) 211 if length <= overhead: 212 return "*" * length 213 hash_chars = hash_val[: length - overhead] 214 return f"{prefix}{hash_chars}{suffix}" 215 216 elif self.replacement_format == "redact_typed": 217 # This format does NOT preserve length (use only if formatting doesn't matter) 218 type_label = secret_type.upper().replace(" ", "_") 219 return f"[REDACTED:{type_label}]" 220 221 else: 222 # Default to asterisks 223 return "*" * length 224 225 def _is_false_positive(self, secret_value: str) -> bool: 226 """Check if a detected secret is a known false positive.""" 227 # Check exact strings 228 if secret_value in self.false_positive_strings: 229 return True 230 231 # Check patterns 232 for pattern in self.false_positive_patterns: 233 if pattern.search(secret_value): 234 return True 235 236 return False 237 238 def _detect_with_detect_secrets(self, text: str) -> list[DetectedSecret]: 239 """Use detect-secrets library to find secrets.""" 240 if not self.detect_secrets_plugins: 241 return [] 242 243 detected: list[DetectedSecret] = [] 244 lines = text.split("\n") 245 246 settings = self._get_detect_secrets_settings() 247 248 with transient_settings(settings): 249 for line_num, line in enumerate(lines, 1): 250 for secret in scan_line(line): 251 if secret.secret_value: 252 # Skip false positives 253 if self._is_false_positive(secret.secret_value): 254 continue 255 256 # Find position in line 257 start_idx = line.find(secret.secret_value) 258 if start_idx != -1: 259 detected.append( 260 DetectedSecret( 261 secret_type=secret.type, 262 secret_value=secret.secret_value, 263 line_number=line_num, 264 start_index=start_idx, 265 end_index=start_idx + len(secret.secret_value), 266 ) 267 ) 268 269 return detected 270 271 def _detect_with_custom_patterns(self, text: str) -> list[DetectedSecret]: 272 """Use custom regex patterns to find secrets.""" 273 detected: list[DetectedSecret] = [] 274 lines = text.split("\n") 275 276 for secret_type, patterns in self._compiled_patterns.items(): 277 for pattern in patterns: 278 for line_num, line in enumerate(lines, 1): 279 for match in pattern.finditer(line): 280 # Get the captured group if exists, otherwise full match 281 if match.groups(): 282 secret_value = match.group(1) 283 start_idx = match.start(1) 284 end_idx = match.end(1) 285 else: 286 secret_value = match.group(0) 287 start_idx = match.start() 288 end_idx = match.end() 289 290 # Skip false positives 291 if self._is_false_positive(secret_value): 292 continue 293 294 # Skip very short matches (likely false positives) 295 if len(secret_value) < 8: 296 continue 297 298 detected.append( 299 DetectedSecret( 300 secret_type=secret_type, 301 secret_value=secret_value, 302 line_number=line_num, 303 start_index=start_idx, 304 end_index=end_idx, 305 ) 306 ) 307 308 return detected 309 310 def _deduplicate_secrets( 311 self, secrets: list[DetectedSecret] 312 ) -> list[DetectedSecret]: 313 """Remove duplicate detections (same value on same line).""" 314 seen: set[tuple[str, int]] = set() 315 unique: list[DetectedSecret] = [] 316 317 for secret in secrets: 318 key = (secret.secret_value, secret.line_number) 319 if key not in seen: 320 seen.add(key) 321 unique.append(secret) 322 323 return unique 324 325 def scan(self, text: str) -> ScanResult: 326 """ 327 Scan text for secrets and generate redacted version. 328 329 Args: 330 text: Input text to scan 331 332 Returns: 333 ScanResult with original text, redacted text, and list of secrets 334 """ 335 if not text: 336 return ScanResult(original_text=text, redacted_text=text) 337 338 all_secrets: list[DetectedSecret] = [] 339 340 # Detect with detect-secrets library 341 all_secrets.extend(self._detect_with_detect_secrets(text)) 342 343 # Detect with custom patterns 344 all_secrets.extend(self._detect_with_custom_patterns(text)) 345 346 # Deduplicate 347 all_secrets = self._deduplicate_secrets(all_secrets) 348 349 # Generate replacements 350 for secret in all_secrets: 351 secret.replacement = self._generate_replacement( 352 secret.secret_value, secret.secret_type 353 ) 354 355 # Sort by length (longest first) to avoid partial replacement issues 356 all_secrets.sort(key=lambda s: len(s.secret_value), reverse=True) 357 358 # Replace secrets in text 359 redacted_text = text 360 for secret in all_secrets: 361 redacted_text = redacted_text.replace( 362 secret.secret_value, secret.replacement 363 ) 364 365 # Collect unique types 366 secret_types = {s.secret_type for s in all_secrets} 367 368 return ScanResult( 369 original_text=text, 370 redacted_text=redacted_text, 371 secrets=all_secrets, 372 secret_types=secret_types, 373 ) 374 375 def scan_file( 376 self, 377 filepath: Path | str, 378 write_redacted: bool = False, 379 ) -> ScanResult: 380 """ 381 Scan a file for secrets. 382 383 Args: 384 filepath: Path to the file 385 write_redacted: If True, overwrite file with redacted content 386 387 Returns: 388 ScanResult with scan details 389 """ 390 filepath = Path(filepath) 391 392 try: 393 content = filepath.read_text(encoding="utf-8", errors="replace") 394 except Exception as e: 395 logger.warning(f"Failed to read file {filepath}: {e}") 396 return ScanResult(original_text="", redacted_text="") 397 398 result = self.scan(content) 399 400 if write_redacted and result.has_secrets: 401 try: 402 filepath.write_text(result.redacted_text, encoding="utf-8") 403 logger.info( 404 f"Redacted {result.secret_count} secrets in {filepath}" 405 ) 406 except Exception as e: 407 logger.error(f"Failed to write redacted file {filepath}: {e}") 408 409 return result 410 411 412 def get_scanner() -> SensitiveDataScanner: 413 """ 414 Get the global scanner instance (lazy-loaded with config). 415 416 Returns: 417 Configured SensitiveDataScanner instance 418 """ 419 global _scanner_instance 420 421 if _scanner_instance is None: 422 # Try to load config 423 try: 424 from .scanner_config import get_scanner_config 425 426 config = get_scanner_config() 427 _scanner_instance = SensitiveDataScanner( 428 replacement_format=config.replacement_format, 429 custom_patterns=config.custom_patterns, 430 detect_secrets_plugins=config.detect_secrets_plugins, 431 entropy_base64_limit=config.entropy_base64_limit, 432 entropy_hex_limit=config.entropy_hex_limit, 433 false_positive_strings=config.false_positive_strings, 434 false_positive_patterns=config.false_positive_patterns, 435 ) 436 except Exception as e: 437 logger.warning(f"Failed to load scanner config: {e}. Using defaults.") 438 _scanner_instance = SensitiveDataScanner() 439 440 return _scanner_instance 441 442 443 def reset_scanner() -> None: 444 """Reset the global scanner instance (for testing or config reload).""" 445 global _scanner_instance 446 _scanner_instance = None 447 448 449 def scan_text(text: str) -> ScanResult: 450 """ 451 Quick function to scan text for secrets. 452 453 Args: 454 text: Input text 455 456 Returns: 457 ScanResult with scan details 458 """ 459 return get_scanner().scan(text) 460 461 462 def scan_and_redact(text: str) -> ScanResult: 463 """ 464 Scan text and return redacted version. 465 466 This is the primary function to use for filtering content. 467 468 Args: 469 text: Input text 470 471 Returns: 472 ScanResult containing redacted_text and details about found secrets 473 """ 474 return get_scanner().scan(text)