# breach_vip.py
import os
import json
import re
import time
import requests
from datetime import datetime
from PyQt6.QtWidgets import (QGroupBox, QVBoxLayout, QHBoxLayout,
                             QLabel, QLineEdit, QPushButton, QMessageBox, QFileDialog)
from PyQt6.QtCore import QThread, pyqtSignal


class BreachVIP(QGroupBox):
    """Group box that searches Breach.vip for one email or a file of emails.

    Progress and results are appended to ``parent.output_area``; reports are
    also written as text files under ``RESULTS_DIR``.
    """

    API_URL = "https://breach.vip/api/search"
    RESULTS_DIR = "results"
    # 4 seconds between requests keeps a batch under 15 requests per minute.
    BATCH_DELAY_SECONDS = 4

    _EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    # Record keys that are reported separately from the generic field dump.
    _META_FIELDS = ('source', 'categories')
    _STATUS_MESSAGES = {
        429: "Rate limited - please wait 1 minute",
        400: "Bad request",
        500: "Internal server error",
    }

    def __init__(self, parent=None):
        super().__init__("Breach.vip Search")
        # NOTE: this attribute shadows the inherited Qt ``parent()`` method on
        # this instance. It is kept because the rest of the class (and possibly
        # external callers) read ``self.parent`` as an attribute.
        self.parent = parent
        self.setup_ui()

    def setup_ui(self):
        """Build the email input and the search / batch / clear buttons."""
        layout = QVBoxLayout()

        # Create email input section
        email_label = QLabel("Email to Search:")
        self.hudson_email_input = QLineEdit()
        self.hudson_email_input.setPlaceholderText("Enter single email or select file for batch...")
        layout.addWidget(email_label)
        layout.addWidget(self.hudson_email_input)

        # Button layout for single and batch operations
        button_layout = QHBoxLayout()

        # Single search button
        self.breach_search_button = QPushButton("Search Single Email")
        self.breach_search_button.clicked.connect(self.search_single_email)
        button_layout.addWidget(self.breach_search_button)

        # Batch search button
        self.batch_search_button = QPushButton("Search Batch File")
        self.batch_search_button.clicked.connect(self.search_batch_file)
        button_layout.addWidget(self.batch_search_button)

        # Clear button
        self.clear_button = QPushButton("Clear")
        self.clear_button.clicked.connect(self.clear_inputs)
        button_layout.addWidget(self.clear_button)

        layout.addLayout(button_layout)

        self.setLayout(layout)

    def clear_inputs(self):
        """Clear all input fields"""
        self.hudson_email_input.clear()

    @classmethod
    def _is_valid_email(cls, email):
        """Return True when *email* matches the address pattern used by this widget."""
        return cls._EMAIL_PATTERN.match(email) is not None

    @staticmethod
    def _get_records(data):
        """Return the dict records in ``data['results']``, or ``[]`` if the shape is unexpected."""
        if not isinstance(data, dict):
            return []
        records = data.get('results')
        if not isinstance(records, list):
            return []
        return [record for record in records if isinstance(record, dict)]

    def search_single_email(self):
        """Validate the text in the input field and search for that email."""
        email = self.hudson_email_input.text().strip()

        if not email:
            QMessageBox.warning(self, "Input Error", "Please enter an email address to search.")
            return

        if not self._is_valid_email(email):
            QMessageBox.warning(self, "Input Error", "Please enter a valid email address.")
            return

        self.process_single_breach_email(email)

    def search_batch_file(self):
        """Ask the user for a text file and search every email it contains."""
        file_name, _ = QFileDialog.getOpenFileName(
            self,
            "Select Email File for Batch Search",
            "",
            "Text Files (*.txt);;All Files (*)"
        )

        if file_name:
            self.process_breach_email_file(file_name)

    def get_output_area(self):
        """Return the parent's ``output_area``, or None when it is unavailable."""
        # getattr covers both a missing parent and a parent without the attribute.
        return getattr(self.parent, 'output_area', None)

    def _format_batch_status(self, result):
        """Return the one-line progress summary for a single batch search result."""
        data = result.get('data') if result.get('success', False) else None
        if not data:
            return f" ❌ Search failed: {result.get('error', 'Unknown error')}"

        records = self._get_records(data)
        if not records:
            return " ✅ No breach records found"

        unique_breaches = len({record.get('source', '') for record in records})
        return f" 🚨 Found {len(records)} records across {unique_breaches} breaches"

    def process_breach_email_file(self, file_path):
        """Search every valid email listed (one per line) in *file_path*.

        Invalid lines are counted and skipped. When more than one valid email
        is found the user is asked to confirm. Each search is reported in the
        output area and the combined report is saved under ``RESULTS_DIR``.
        Does nothing when no output area is available.
        """
        output_area = self.get_output_area()
        if output_area is None:
            return

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                emails = [line.strip() for line in f if line.strip()]

            if not emails:
                QMessageBox.warning(self, "File Error", "The file is empty or contains no valid emails.")
                return

            valid_emails = [email for email in emails if self._is_valid_email(email)]
            invalid_count = len(emails) - len(valid_emails)

            if not valid_emails:
                QMessageBox.warning(self, "File Error", "No valid email addresses found in the file.")
                return

            total = len(valid_emails)

            # Show confirmation dialog for multiple emails
            if total > 1:
                reply = QMessageBox.question(
                    self,
                    "Multiple Emails Found",
                    f"Found {total} valid email(s) and {invalid_count} invalid entry(s).\n\n"
                    f"Do you want to search all {total} emails? This may take a while due to rate limits.",
                    QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
                )
                if reply != QMessageBox.StandardButton.Yes:
                    return

            output_area.append(f"📁 Processing {total} email(s) from file: {os.path.basename(file_path)}")
            if invalid_count:
                output_area.append(f"⚠️ Skipped {invalid_count} invalid entries")

            output_area.append("=" * 60)

            os.makedirs(self.RESULTS_DIR, exist_ok=True)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            batch_filepath = os.path.join(self.RESULTS_DIR, f"breach_vip_batch_{timestamp}.txt")

            all_results = []

            for index, email in enumerate(valid_emails, 1):
                output_area.append(f"\n🔍 [{index}/{total}] Searching: {email}")

                try:
                    result = self.search_single_breach_email_api(email)
                    status_line = self._format_batch_status(result)
                except Exception as e:
                    result = {'success': False, 'error': str(e)}
                    status_line = f" ❌ Error searching {email}: {e}"

                output_area.append(status_line)

                # Exactly one entry per email, whether the search worked or not.
                all_results.append({
                    'email': email,
                    'result': result,
                    'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                })

                # Respect the rate limit after every request (also after a
                # failure), but don't wait after the last one.
                # NOTE: this blocks the calling thread; when invoked from the
                # GUI thread the window is unresponsive for the whole batch.
                if index < total:
                    time.sleep(self.BATCH_DELAY_SECONDS)

            # Only announce the file when it was actually written; the saver
            # reports its own error otherwise.
            if self.save_breach_batch_results(all_results, batch_filepath):
                output_area.append(f"\n💾 Batch results saved to: {batch_filepath}")
            output_area.append("🎉 Batch search completed!")

        except Exception as e:
            output_area.append(f"❌ Error processing file: {e}")

    def process_single_breach_email(self, email):
        """Search Breach.vip for *email* and display (and save) the outcome."""
        output_area = self.get_output_area()
        if output_area is None:
            return

        output_area.append(f"🔍 Searching Breach.vip for: {email}")

        try:
            result = self.search_single_breach_email_api(email)

            if result.get('success', False):
                self.display_breach_results(result['data'], email)
            else:
                output_area.append(f"❌ Search failed: {result.get('error', 'Unknown error')}")

        except Exception as e:
            output_area.append(f"❌ Error searching {email}: {e}")

    def search_single_breach_email_api(self, email):
        """POST a search for *email* to the Breach.vip API.

        Never raises. Returns a dict with ``success`` (bool) and either
        ``data`` (the decoded JSON body) or ``error`` (a message);
        ``status_code`` is included whenever an HTTP response was received.
        """
        try:
            payload = {
                "term": email,
                "fields": ["email"],
                "categories": None,
                "wildcard": False,
                "case_sensitive": False
            }

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            }

            response = requests.post(self.API_URL, json=payload, headers=headers, timeout=30)
            status = response.status_code

            if status != 200:
                return {
                    'success': False,
                    'error': self._STATUS_MESSAGES.get(status, f"API returned status {status}"),
                    'status_code': status
                }

            # Decode separately so a malformed body is not reported as a
            # network problem (the JSON decode error is a ValueError).
            try:
                data = response.json()
            except ValueError as e:
                return {
                    'success': False,
                    'error': f"Invalid JSON in API response: {e}",
                    'status_code': status
                }

            return {
                'success': True,
                'data': data,
                'status_code': status
            }

        except requests.exceptions.RequestException as e:
            return {
                'success': False,
                'error': f"Network error: {e}"
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"Unexpected error: {e}"
            }

    def _write_batch_entry(self, f, entry):
        """Write the report section for one searched email to the open file *f*."""
        search_result = entry['result']

        f.write(f"EMAIL: {entry['email']}\n")
        f.write(f"SEARCH TIME: {entry['timestamp']}\n")
        f.write(f"STATUS: {'SUCCESS' if search_result.get('success') else 'FAILED'}\n")

        if search_result.get('success') and search_result.get('data'):
            records = self._get_records(search_result['data'])
            if records:
                f.write(f"RECORDS FOUND: {len(records)}\n")

                # Group by breach source
                breaches_by_source = {}
                for record in records:
                    source = record.get('source', 'Unknown Source')
                    breaches_by_source.setdefault(source, []).append(record)

                f.write("BREACHES:\n")
                for source, source_records in breaches_by_source.items():
                    f.write(f" - {source}: {len(source_records)} record(s)\n")

                # Show sample data from first record of each source
                f.write("SAMPLE DATA:\n")
                for source, source_records in breaches_by_source.items():
                    f.write(f" {source}:\n")
                    for key, value in source_records[0].items():
                        if key not in self._META_FIELDS and value:
                            f.write(f" {key}: {value}\n")
            else:
                f.write("RECORDS FOUND: 0\n")
                f.write("STATUS: No breach records found\n")
        else:
            f.write(f"ERROR: {search_result.get('error', 'Unknown error')}\n")

        f.write("-" * 40 + "\n\n")

    def save_breach_batch_results(self, all_results, filepath):
        """Write the batch report for *all_results* to *filepath*.

        Each item of *all_results* is a dict with ``email``, ``result`` (as
        returned by ``search_single_breach_email_api``) and ``timestamp``.
        Returns True when the file was written, False otherwise; a failure is
        also reported in the output area when one is available.
        """
        output_area = self.get_output_area()
        try:
            # Records per successful search; tolerant of a None / non-dict body.
            records_per_success = [
                self._get_records(entry['result'].get('data'))
                for entry in all_results
                if entry['result'].get('success')
            ]
            successful_searches = len(records_per_success)
            total_records = sum(len(records) for records in records_per_success)
            emails_with_breaches = sum(1 for records in records_per_success if records)

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write("BREACH.VIP BATCH SEARCH RESULTS\n")
                f.write("=" * 60 + "\n")
                f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Total Emails Searched: {len(all_results)}\n")
                f.write("=" * 60 + "\n\n")

                for entry in all_results:
                    self._write_batch_entry(f, entry)

                f.write("SUMMARY\n")
                f.write("=" * 60 + "\n")
                f.write(f"Successful searches: {successful_searches}/{len(all_results)}\n")
                f.write(f"Emails with breaches: {emails_with_breaches}\n")
                f.write(f"Total breach records found: {total_records}\n")
                f.write("=" * 60 + "\n")

            return True

        except Exception as e:
            if output_area is not None:
                output_area.append(f"❌ Error saving batch results: {e}")
            return False

    def display_breach_results(self, data, email):
        """Show the API response *data* for *email* and save it to a report file.

        *data* is expected to be a dict with a ``results`` list of record
        dicts; any other shape is reported as an unexpected response. Does
        nothing when no output area is available.
        """
        output_area = self.get_output_area()
        if output_area is None:
            return

        output_area.append(f"\n📊 BREACH.VIP RESULTS FOR: {email}")
        output_area.append("=" * 60)

        # Generate filename
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_email = re.sub(r'[^\w\-_.]', '_', email.split('@')[0])
        filepath = os.path.join(self.RESULTS_DIR, f"breach_vip_{safe_email}_{timestamp}.txt")

        now_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # Prepare content for both display and file. The report header is
        # written once here so the error path below cannot repeat it.
        display_lines = []
        file_lines = [
            f"BREACH.VIP RESULTS FOR: {email}",
            f"Search Date: {now_text}",
        ]

        try:
            if isinstance(data, dict) and isinstance(data.get('results'), list):
                results = data['results']

                if results:
                    display_lines.append(f"🚨 Found {len(results)} breach record(s)")
                    file_lines.append(f"Records Found: {len(results)}")
                    file_lines.append("=" * 60)
                    file_lines.append("")

                    for number, record in enumerate(results, 1):
                        display_lines.append(f"📦 Record #{number}:")
                        file_lines.append(f"RECORD #{number}:")

                        # Source (breach name)
                        source = record.get('source', 'Unknown Source')
                        display_lines.append(f" 📍 Breach: {source}")
                        file_lines.append(f"Breach: {source}")

                        # Categories
                        categories = record.get('categories')
                        if categories:
                            if isinstance(categories, list):
                                # str() each item so non-string categories cannot break the join.
                                joined = ', '.join(str(category) for category in categories)
                                display_lines.append(f" 🏷️ Categories: {joined}")
                                file_lines.append(f"Categories: {joined}")
                            else:
                                display_lines.append(f" 🏷️ Category: {categories}")
                                file_lines.append(f"Category: {categories}")

                        # Show all other non-empty fields
                        for field_name, field_value in record.items():
                            if field_name in self._META_FIELDS or not field_value:
                                continue

                            file_value = str(field_value)
                            # Truncate long values for display only
                            display_value = file_value
                            if len(display_value) > 100:
                                display_value = display_value[:100] + "..."

                            display_lines.append(f" 📝 {field_name}: {display_value}")
                            file_lines.append(f"{field_name}: {file_value}")

                        display_lines.append("")  # Empty line between records
                        file_lines.append("")  # Empty line between records

                    # Summary
                    unique_breaches = len({record.get('source', '') for record in results})
                    display_lines.append(f"📊 Summary: {len(results)} records across {unique_breaches} unique breaches")
                    file_lines.append(f"SUMMARY: {len(results)} records across {unique_breaches} unique breaches")

                else:
                    display_lines.append("✅ No breach records found for this email")
                    display_lines.append("💡 This email appears clean in Breach.vip database")

                    file_lines.append("RESULTS: No breach records found")
                    file_lines.append("STATUS: Email appears clean in Breach.vip database")
            else:
                display_lines.append("❌ Unexpected response format from API")
                file_lines.append("ERROR: Unexpected response format from API")
                file_lines.append(f"RAW_RESPONSE: {json.dumps(data, indent=2)}")

        except Exception as e:
            error_msg = f"❌ Error processing results: {e}"
            display_lines.append(error_msg)
            file_lines.append(f"ERROR: {error_msg}")
            file_lines.append(f"RAW_RESPONSE: {json.dumps(data, indent=2)}")

        # Add footer
        display_lines.append("=" * 60)
        display_lines.append("💡 Note: Rate limit is 15 requests per minute")

        file_lines.append("=" * 60)
        file_lines.append(f"Report generated: {now_text}")
        file_lines.append("Note: Breach.vip rate limit is 15 requests per minute")

        # Display results in GUI
        for line in display_lines:
            output_area.append(line)

        # Save to file. Creating the directory is part of this step so that a
        # filesystem problem cannot prevent the results from being displayed.
        try:
            os.makedirs(self.RESULTS_DIR, exist_ok=True)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write('\n'.join(file_lines))
            output_area.append(f"💾 Results saved to: {filepath}")
        except Exception as e:
            output_area.append(f"❌ Error saving results to file: {e}")