/ src / security / sensitive_data_scanner.py
sensitive_data_scanner.py
  1  """
  2  Sensitive Data Scanner for Ag3ntum.
  3  
  4  Detects and redacts sensitive information like API keys, tokens, and passwords
  5  using detect-secrets library and custom patterns.
  6  
  7  Key features:
  8  - Same-length replacement to preserve formatting
  9  - Integration with detect-secrets library
 10  - Custom regex patterns for additional detection
 11  - Allowlist support for known false positives
 12  - Session file scanning with configurable limits
 13  
 14  Usage:
 15      from src.security import get_scanner, scan_and_redact
 16  
 17      # Quick usage
 18      result = scan_and_redact(text)
 19      if result.has_secrets:
 20          print(f"Found {len(result.secrets)} secrets")
 21          print(result.redacted_text)
 22  
 23      # With scanner instance
 24      scanner = get_scanner()
 25      result = scanner.scan(text)
 26  """
 27  
 28  from __future__ import annotations
 29  
 30  import hashlib
 31  import logging
 32  import re
 33  from dataclasses import dataclass, field
 34  from fnmatch import fnmatch
 35  from pathlib import Path
 36  from typing import Any, Optional
 37  
 38  from detect_secrets.core.scan import scan_line
 39  from detect_secrets.settings import transient_settings
 40  
logger = logging.getLogger(__name__)

# Process-wide scanner singleton. It stays None until get_scanner() builds it
# on first use, and reset_scanner() sets it back to None.
_scanner_instance: Optional["SensitiveDataScanner"] = None
 45  
 46  
 47  @dataclass
 48  class DetectedSecret:
 49      """Represents a detected secret in text."""
 50  
 51      secret_type: str
 52      secret_value: str
 53      line_number: int
 54      start_index: int  # Index within the line
 55      end_index: int  # Index within the line
 56      replacement: str = ""
 57  
 58      @property
 59      def length(self) -> int:
 60          """Length of the secret value."""
 61          return len(self.secret_value)
 62  
 63  
 64  @dataclass
 65  class ScanResult:
 66      """Result of scanning text for secrets."""
 67  
 68      original_text: str
 69      redacted_text: str
 70      secrets: list[DetectedSecret] = field(default_factory=list)
 71      secret_types: set[str] = field(default_factory=set)
 72  
 73      @property
 74      def has_secrets(self) -> bool:
 75          """Whether any secrets were found."""
 76          return len(self.secrets) > 0
 77  
 78      @property
 79      def secret_count(self) -> int:
 80          """Number of secrets found."""
 81          return len(self.secrets)
 82  
 83  
 84  class SensitiveDataScanner:
 85      """
 86      Detect and redact sensitive information in text.
 87  
 88      Supports:
 89      - detect-secrets library plugins
 90      - Custom regex patterns
 91      - Same-length replacement (preserves formatting)
 92      - Allowlist for false positives
 93      """
 94  
    # Built-in regex patterns, keyed by the secret type reported for a match.
    # __init__ falls back to this table whenever no (or an empty)
    # custom_patterns mapping is supplied. When a pattern defines a capture
    # group, group 1 is taken as the secret value; otherwise the whole match is.
    DEFAULT_CUSTOM_PATTERNS: dict[str, list[str]] = {
        # A key/token keyword, then separators (quotes, whitespace, ':' or '='),
        # then a value of 20+ characters; only the value is captured.
        "generic_api_key": [
            r'(?i)(?:api[_-]?key|apikey)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?',
            r'(?i)(?:access[_-]?token|accesstoken)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?',
            r'(?i)(?:auth[_-]?token|authtoken)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?',
            r'(?i)(?:secret[_-]?key|secretkey)["\s:=]+["\']?([a-zA-Z0-9_\-]{20,})["\']?',
        ],
        # "Bearer <token>"; the token (20+ chars, dots allowed) is captured.
        "bearer_token": [
            r"(?i)bearer\s+([a-zA-Z0-9_\-\.]{20,})",
        ],
        # password/passwd/pwd followed by a value of 8+ non-space, non-quote chars.
        "password": [
            r'(?i)(?:password|passwd|pwd)["\s:=]+["\']?([^\s"\']{8,})["\']?',
        ],
        # No capture group here: the entire URL / connection string is the secret.
        "connection_string": [
            r"(?i)(?:mongodb|postgres|mysql|redis|amqp):\/\/[^\s]+",
            r"(?i)Server=[^;]+;.*(?:Password|Pwd)=[^;]+",
        ],
        # Matches only the PEM "BEGIN ... PRIVATE KEY" header line itself; the
        # key body that follows is not covered by this pattern.
        "private_key": [
            r"-----BEGIN (?:RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----",
        ],
        # Provider-specific key shapes, matched by literal prefix plus length.
        "anthropic_key": [
            r"sk-ant-[a-zA-Z0-9_-]{40,}",
        ],
        "openai_key": [
            r"sk-[a-zA-Z0-9]{32,}",
        ],
        "gcp_key": [
            r"AIza[0-9A-Za-z_-]{35}",
        ],
    }
126  
127      def __init__(
128          self,
129          replacement_format: str = "asterisk",
130          custom_patterns: Optional[dict[str, list[str]]] = None,
131          detect_secrets_plugins: Optional[list[str]] = None,
132          entropy_base64_limit: float = 4.5,
133          entropy_hex_limit: float = 3.0,
134          false_positive_strings: Optional[list[str]] = None,
135          false_positive_patterns: Optional[list[str]] = None,
136      ):
137          """
138          Initialize the scanner.
139  
140          Args:
141              replacement_format: How to replace secrets:
142                  - "asterisk": Replace with asterisks (same length)
143                  - "redact_typed": Replace with [REDACTED:TYPE] (variable length)
144                  - "hash": Replace with hash prefix (same length)
145              custom_patterns: Dict of pattern_name -> list of regex patterns
146              detect_secrets_plugins: List of detect-secrets plugin names to enable
147              entropy_base64_limit: Threshold for Base64 high-entropy detection
148              entropy_hex_limit: Threshold for hex high-entropy detection
149              false_positive_strings: Known false positive strings to skip
150              false_positive_patterns: Regex patterns for false positives to skip
151          """
152          self.replacement_format = replacement_format
153          self.custom_patterns = custom_patterns or self.DEFAULT_CUSTOM_PATTERNS
154          self.detect_secrets_plugins = detect_secrets_plugins or []
155          self.entropy_base64_limit = entropy_base64_limit
156          self.entropy_hex_limit = entropy_hex_limit
157          self.false_positive_strings = set(false_positive_strings or [])
158          self.false_positive_patterns = [
159              re.compile(p) for p in (false_positive_patterns or [])
160          ]
161  
162          # Compile custom patterns
163          self._compiled_patterns: dict[str, list[re.Pattern]] = {}
164          for name, patterns in self.custom_patterns.items():
165              self._compiled_patterns[name] = [re.compile(p) for p in patterns]
166  
167          # detect-secrets settings (lazy-loaded)
168          self._detect_secrets_settings: Optional[dict] = None
169  
170      def _get_detect_secrets_settings(self) -> dict:
171          """Get detect-secrets settings, building if necessary."""
172          if self._detect_secrets_settings is not None:
173              return self._detect_secrets_settings
174  
175          plugins = []
176          for plugin_name in self.detect_secrets_plugins:
177              if plugin_name == "Base64HighEntropyString":
178                  plugins.append({"name": plugin_name, "limit": self.entropy_base64_limit})
179              elif plugin_name == "HexHighEntropyString":
180                  plugins.append({"name": plugin_name, "limit": self.entropy_hex_limit})
181              else:
182                  plugins.append({"name": plugin_name})
183  
184          self._detect_secrets_settings = {
185              "plugins_used": plugins,
186              "filters_used": [
187                  {"path": "detect_secrets.filters.allowlist.is_line_allowlisted"},
188              ],
189          }
190          return self._detect_secrets_settings
191  
192      def _generate_replacement(self, secret_value: str, secret_type: str) -> str:
193          """
194          Generate replacement string with SAME LENGTH as original.
195  
196          This is critical for preserving formatting in files.
197          """
198          length = len(secret_value)
199  
200          if self.replacement_format == "asterisk":
201              # Same length asterisks
202              return "*" * length
203  
204          elif self.replacement_format == "hash":
205              # Hash-based replacement, same length
206              hash_val = hashlib.sha256(secret_value.encode()).hexdigest()
207              # Format: [HASH:xxxxx...] - adjust hash length to match
208              prefix = "[HASH:"
209              suffix = "]"
210              overhead = len(prefix) + len(suffix)
211              if length <= overhead:
212                  return "*" * length
213              hash_chars = hash_val[: length - overhead]
214              return f"{prefix}{hash_chars}{suffix}"
215  
216          elif self.replacement_format == "redact_typed":
217              # This format does NOT preserve length (use only if formatting doesn't matter)
218              type_label = secret_type.upper().replace(" ", "_")
219              return f"[REDACTED:{type_label}]"
220  
221          else:
222              # Default to asterisks
223              return "*" * length
224  
225      def _is_false_positive(self, secret_value: str) -> bool:
226          """Check if a detected secret is a known false positive."""
227          # Check exact strings
228          if secret_value in self.false_positive_strings:
229              return True
230  
231          # Check patterns
232          for pattern in self.false_positive_patterns:
233              if pattern.search(secret_value):
234                  return True
235  
236          return False
237  
238      def _detect_with_detect_secrets(self, text: str) -> list[DetectedSecret]:
239          """Use detect-secrets library to find secrets."""
240          if not self.detect_secrets_plugins:
241              return []
242  
243          detected: list[DetectedSecret] = []
244          lines = text.split("\n")
245  
246          settings = self._get_detect_secrets_settings()
247  
248          with transient_settings(settings):
249              for line_num, line in enumerate(lines, 1):
250                  for secret in scan_line(line):
251                      if secret.secret_value:
252                          # Skip false positives
253                          if self._is_false_positive(secret.secret_value):
254                              continue
255  
256                          # Find position in line
257                          start_idx = line.find(secret.secret_value)
258                          if start_idx != -1:
259                              detected.append(
260                                  DetectedSecret(
261                                      secret_type=secret.type,
262                                      secret_value=secret.secret_value,
263                                      line_number=line_num,
264                                      start_index=start_idx,
265                                      end_index=start_idx + len(secret.secret_value),
266                                  )
267                              )
268  
269          return detected
270  
271      def _detect_with_custom_patterns(self, text: str) -> list[DetectedSecret]:
272          """Use custom regex patterns to find secrets."""
273          detected: list[DetectedSecret] = []
274          lines = text.split("\n")
275  
276          for secret_type, patterns in self._compiled_patterns.items():
277              for pattern in patterns:
278                  for line_num, line in enumerate(lines, 1):
279                      for match in pattern.finditer(line):
280                          # Get the captured group if exists, otherwise full match
281                          if match.groups():
282                              secret_value = match.group(1)
283                              start_idx = match.start(1)
284                              end_idx = match.end(1)
285                          else:
286                              secret_value = match.group(0)
287                              start_idx = match.start()
288                              end_idx = match.end()
289  
290                          # Skip false positives
291                          if self._is_false_positive(secret_value):
292                              continue
293  
294                          # Skip very short matches (likely false positives)
295                          if len(secret_value) < 8:
296                              continue
297  
298                          detected.append(
299                              DetectedSecret(
300                                  secret_type=secret_type,
301                                  secret_value=secret_value,
302                                  line_number=line_num,
303                                  start_index=start_idx,
304                                  end_index=end_idx,
305                              )
306                          )
307  
308          return detected
309  
310      def _deduplicate_secrets(
311          self, secrets: list[DetectedSecret]
312      ) -> list[DetectedSecret]:
313          """Remove duplicate detections (same value on same line)."""
314          seen: set[tuple[str, int]] = set()
315          unique: list[DetectedSecret] = []
316  
317          for secret in secrets:
318              key = (secret.secret_value, secret.line_number)
319              if key not in seen:
320                  seen.add(key)
321                  unique.append(secret)
322  
323          return unique
324  
325      def scan(self, text: str) -> ScanResult:
326          """
327          Scan text for secrets and generate redacted version.
328  
329          Args:
330              text: Input text to scan
331  
332          Returns:
333              ScanResult with original text, redacted text, and list of secrets
334          """
335          if not text:
336              return ScanResult(original_text=text, redacted_text=text)
337  
338          all_secrets: list[DetectedSecret] = []
339  
340          # Detect with detect-secrets library
341          all_secrets.extend(self._detect_with_detect_secrets(text))
342  
343          # Detect with custom patterns
344          all_secrets.extend(self._detect_with_custom_patterns(text))
345  
346          # Deduplicate
347          all_secrets = self._deduplicate_secrets(all_secrets)
348  
349          # Generate replacements
350          for secret in all_secrets:
351              secret.replacement = self._generate_replacement(
352                  secret.secret_value, secret.secret_type
353              )
354  
355          # Sort by length (longest first) to avoid partial replacement issues
356          all_secrets.sort(key=lambda s: len(s.secret_value), reverse=True)
357  
358          # Replace secrets in text
359          redacted_text = text
360          for secret in all_secrets:
361              redacted_text = redacted_text.replace(
362                  secret.secret_value, secret.replacement
363              )
364  
365          # Collect unique types
366          secret_types = {s.secret_type for s in all_secrets}
367  
368          return ScanResult(
369              original_text=text,
370              redacted_text=redacted_text,
371              secrets=all_secrets,
372              secret_types=secret_types,
373          )
374  
375      def scan_file(
376          self,
377          filepath: Path | str,
378          write_redacted: bool = False,
379      ) -> ScanResult:
380          """
381          Scan a file for secrets.
382  
383          Args:
384              filepath: Path to the file
385              write_redacted: If True, overwrite file with redacted content
386  
387          Returns:
388              ScanResult with scan details
389          """
390          filepath = Path(filepath)
391  
392          try:
393              content = filepath.read_text(encoding="utf-8", errors="replace")
394          except Exception as e:
395              logger.warning(f"Failed to read file {filepath}: {e}")
396              return ScanResult(original_text="", redacted_text="")
397  
398          result = self.scan(content)
399  
400          if write_redacted and result.has_secrets:
401              try:
402                  filepath.write_text(result.redacted_text, encoding="utf-8")
403                  logger.info(
404                      f"Redacted {result.secret_count} secrets in {filepath}"
405                  )
406              except Exception as e:
407                  logger.error(f"Failed to write redacted file {filepath}: {e}")
408  
409          return result
410  
411  
def get_scanner() -> SensitiveDataScanner:
    """
    Return the shared scanner, creating it on first call.

    The scanner is built from the scanner config when that loads; if anything
    in loading or construction fails, a warning is logged and a scanner with
    default settings is used instead.

    Returns:
        The module-wide SensitiveDataScanner instance
    """
    global _scanner_instance

    if _scanner_instance is not None:
        return _scanner_instance

    try:
        from .scanner_config import get_scanner_config

        config = get_scanner_config()
        settings = {
            "replacement_format": config.replacement_format,
            "custom_patterns": config.custom_patterns,
            "detect_secrets_plugins": config.detect_secrets_plugins,
            "entropy_base64_limit": config.entropy_base64_limit,
            "entropy_hex_limit": config.entropy_hex_limit,
            "false_positive_strings": config.false_positive_strings,
            "false_positive_patterns": config.false_positive_patterns,
        }
        _scanner_instance = SensitiveDataScanner(**settings)
    except Exception as e:
        logger.warning(f"Failed to load scanner config: {e}. Using defaults.")
        _scanner_instance = SensitiveDataScanner()

    return _scanner_instance
441  
442  
def reset_scanner() -> None:
    """Drop the shared scanner so the next get_scanner() call rebuilds it.

    Intended for tests and for picking up a changed configuration.
    """
    global _scanner_instance
    _scanner_instance = None
447  
448  
def scan_text(text: str) -> ScanResult:
    """
    Scan text for secrets with the shared scanner.

    Args:
        text: Input text

    Returns:
        ScanResult with scan details
    """
    scanner = get_scanner()
    return scanner.scan(text)
460  
461  
def scan_and_redact(text: str) -> ScanResult:
    """
    Scan text with the shared scanner and return the result, redaction included.

    This is the primary entry point for filtering content.

    Args:
        text: Input text

    Returns:
        ScanResult containing redacted_text and details about found secrets
    """
    scanner = get_scanner()
    return scanner.scan(text)