/ hexstrike_mcp.py
hexstrike_mcp.py
1 #!/usr/bin/env python3 2 """ 3 HexStrike AI MCP Client - Enhanced AI Agent Communication Interface 4 5 Enhanced with AI-Powered Intelligence & Automation 6 ๐ Bug Bounty | CTF | Red Team | Security Research 7 8 RECENT ENHANCEMENTS (v6.0): 9 โ Complete color consistency with reddish hacker theme 10 โ Enhanced visual output with consistent styling 11 โ Improved error handling and recovery systems 12 โ FastMCP integration for seamless AI communication 13 โ 100+ security tools with intelligent parameter optimization 14 โ Advanced logging with colored output and emojis 15 16 Architecture: MCP Client for AI agent communication with HexStrike server 17 Framework: FastMCP integration for tool orchestration 18 """ 19 20 import sys 21 import os 22 import argparse 23 import logging 24 from typing import Dict, Any, Optional 25 import requests 26 import time 27 from datetime import datetime 28 29 from mcp.server.fastmcp import FastMCP 30 31 class HexStrikeColors: 32 """Enhanced color palette matching the server's ModernVisualEngine.COLORS""" 33 34 # Basic colors (for backward compatibility) 35 RED = '\033[91m' 36 GREEN = '\033[92m' 37 YELLOW = '\033[93m' 38 BLUE = '\033[94m' 39 MAGENTA = '\033[95m' 40 CYAN = '\033[96m' 41 WHITE = '\033[97m' 42 43 # Core enhanced colors 44 MATRIX_GREEN = '\033[38;5;46m' 45 NEON_BLUE = '\033[38;5;51m' 46 ELECTRIC_PURPLE = '\033[38;5;129m' 47 CYBER_ORANGE = '\033[38;5;208m' 48 HACKER_RED = '\033[38;5;196m' 49 TERMINAL_GRAY = '\033[38;5;240m' 50 BRIGHT_WHITE = '\033[97m' 51 RESET = '\033[0m' 52 BOLD = '\033[1m' 53 DIM = '\033[2m' 54 55 # Enhanced reddish tones and highlighting colors 56 BLOOD_RED = '\033[38;5;124m' 57 CRIMSON = '\033[38;5;160m' 58 DARK_RED = '\033[38;5;88m' 59 FIRE_RED = '\033[38;5;202m' 60 ROSE_RED = '\033[38;5;167m' 61 BURGUNDY = '\033[38;5;52m' 62 SCARLET = '\033[38;5;197m' 63 RUBY = '\033[38;5;161m' 64 65 # Highlighting colors 66 HIGHLIGHT_RED = '\033[48;5;196m\033[38;5;15m' # Red background, white text 67 HIGHLIGHT_YELLOW = '\033[48;5;226m\033[38;5;16m' # Yellow background, black text 68 HIGHLIGHT_GREEN = '\033[48;5;46m\033[38;5;16m' # Green background, black text 69 HIGHLIGHT_BLUE = '\033[48;5;51m\033[38;5;16m' # Blue background, black text 70 HIGHLIGHT_PURPLE = '\033[48;5;129m\033[38;5;15m' # Purple background, white text 71 72 # Status colors with reddish tones 73 SUCCESS = '\033[38;5;46m' # Bright green 74 WARNING = '\033[38;5;208m' # Orange 75 ERROR = '\033[38;5;196m' # Bright red 76 CRITICAL = '\033[48;5;196m\033[38;5;15m\033[1m' # Red background, white bold text 77 INFO = '\033[38;5;51m' # Cyan 78 DEBUG = '\033[38;5;240m' # Gray 79 80 # Vulnerability severity colors 81 VULN_CRITICAL = '\033[48;5;124m\033[38;5;15m\033[1m' # Dark red background 82 VULN_HIGH = '\033[38;5;196m\033[1m' # Bright red bold 83 VULN_MEDIUM = '\033[38;5;208m\033[1m' # Orange bold 84 VULN_LOW = '\033[38;5;226m' # Yellow 85 VULN_INFO = '\033[38;5;51m' # Cyan 86 87 # Tool status colors 88 TOOL_RUNNING = '\033[38;5;46m\033[5m' # Blinking green 89 TOOL_SUCCESS = '\033[38;5;46m\033[1m' # Bold green 90 TOOL_FAILED = '\033[38;5;196m\033[1m' # Bold red 91 TOOL_TIMEOUT = '\033[38;5;208m\033[1m' # Bold orange 92 TOOL_RECOVERY = '\033[38;5;129m\033[1m' # Bold purple 93 94 # Backward compatibility alias 95 Colors = HexStrikeColors 96 97 class ColoredFormatter(logging.Formatter): 98 """Enhanced formatter with colors and emojis for MCP client - matches server styling""" 99 100 COLORS = { 101 'DEBUG': HexStrikeColors.DEBUG, 102 'INFO': HexStrikeColors.SUCCESS, 103 'WARNING': HexStrikeColors.WARNING, 104 'ERROR': HexStrikeColors.ERROR, 105 'CRITICAL': HexStrikeColors.CRITICAL 106 } 107 108 EMOJIS = { 109 'DEBUG': '๐', 110 'INFO': 'โ ', 111 'WARNING': 'โ ๏ธ', 112 'ERROR': 'โ', 113 'CRITICAL': '๐ฅ' 114 } 115 116 def format(self, record): 117 emoji = self.EMOJIS.get(record.levelname, '๐') 118 color = self.COLORS.get(record.levelname, HexStrikeColors.BRIGHT_WHITE) 119 120 # Add color and emoji to the message 121 record.msg = f"{color}{emoji} {record.msg}{HexStrikeColors.RESET}" 122 return super().format(record) 123 124 # Setup logging 125 logging.basicConfig( 126 level=logging.INFO, 127 format="[๐ฅ HexStrike MCP] %(asctime)s [%(levelname)s] %(message)s", 128 handlers=[ 129 logging.StreamHandler(sys.stderr) 130 ] 131 ) 132 133 # Apply colored formatter 134 for handler in logging.getLogger().handlers: 135 handler.setFormatter(ColoredFormatter( 136 "[๐ฅ HexStrike MCP] %(asctime)s [%(levelname)s] %(message)s", 137 datefmt="%Y-%m-%d %H:%M:%S" 138 )) 139 140 logger = logging.getLogger(__name__) 141 142 # Default configuration 143 DEFAULT_HEXSTRIKE_SERVER = "http://127.0.0.1:8888" # Default HexStrike server URL 144 DEFAULT_REQUEST_TIMEOUT = 300 # 5 minutes default timeout for API requests 145 MAX_RETRIES = 3 # Maximum number of retries for connection attempts 146 147 class HexStrikeClient: 148 """Enhanced client for communicating with the HexStrike AI API Server""" 149 150 def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT): 151 """ 152 Initialize the HexStrike AI Client 153 154 Args: 155 server_url: URL of the HexStrike AI API Server 156 timeout: Request timeout in seconds 157 """ 158 self.server_url = server_url.rstrip("/") 159 self.timeout = timeout 160 self.session = requests.Session() 161 162 # Try to connect to server with retries 163 connected = False 164 for i in range(MAX_RETRIES): 165 try: 166 logger.info(f"๐ Attempting to connect to HexStrike AI API at {server_url} (attempt {i+1}/{MAX_RETRIES})") 167 # First try a direct connection test before using the health endpoint 168 try: 169 test_response = self.session.get(f"{self.server_url}/health", timeout=5) 170 test_response.raise_for_status() 171 health_check = test_response.json() 172 connected = True 173 logger.info(f"๐ฏ Successfully connected to HexStrike AI API Server at {server_url}") 174 logger.info(f"๐ฅ Server health status: {health_check.get('status', 'unknown')}") 175 logger.info(f"๐ Server version: {health_check.get('version', 'unknown')}") 176 break 177 except requests.exceptions.ConnectionError: 178 logger.warning(f"๐ Connection refused to {server_url}. Make sure the HexStrike AI server is running.") 179 time.sleep(2) # Wait before retrying 180 except Exception as e: 181 logger.warning(f"โ ๏ธ Connection test failed: {str(e)}") 182 time.sleep(2) # Wait before retrying 183 except Exception as e: 184 logger.warning(f"โ Connection attempt {i+1} failed: {str(e)}") 185 time.sleep(2) # Wait before retrying 186 187 if not connected: 188 error_msg = f"Failed to establish connection to HexStrike AI API Server at {server_url} after {MAX_RETRIES} attempts" 189 logger.error(error_msg) 190 # We'll continue anyway to allow the MCP server to start, but tools will likely fail 191 192 def safe_get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 193 """ 194 Perform a GET request with optional query parameters. 195 196 Args: 197 endpoint: API endpoint path (without leading slash) 198 params: Optional query parameters 199 200 Returns: 201 Response data as dictionary 202 """ 203 if params is None: 204 params = {} 205 206 url = f"{self.server_url}/{endpoint}" 207 208 try: 209 logger.debug(f"๐ก GET {url} with params: {params}") 210 response = self.session.get(url, params=params, timeout=self.timeout) 211 response.raise_for_status() 212 return response.json() 213 except requests.exceptions.RequestException as e: 214 logger.error(f"๐ซ Request failed: {str(e)}") 215 return {"error": f"Request failed: {str(e)}", "success": False} 216 except Exception as e: 217 logger.error(f"๐ฅ Unexpected error: {str(e)}") 218 return {"error": f"Unexpected error: {str(e)}", "success": False} 219 220 def safe_post(self, endpoint: str, json_data: Dict[str, Any]) -> Dict[str, Any]: 221 """ 222 Perform a POST request with JSON data. 223 224 Args: 225 endpoint: API endpoint path (without leading slash) 226 json_data: JSON data to send 227 228 Returns: 229 Response data as dictionary 230 """ 231 url = f"{self.server_url}/{endpoint}" 232 233 try: 234 logger.debug(f"๐ก POST {url} with data: {json_data}") 235 response = self.session.post(url, json=json_data, timeout=self.timeout) 236 response.raise_for_status() 237 return response.json() 238 except requests.exceptions.RequestException as e: 239 logger.error(f"๐ซ Request failed: {str(e)}") 240 return {"error": f"Request failed: {str(e)}", "success": False} 241 except Exception as e: 242 logger.error(f"๐ฅ Unexpected error: {str(e)}") 243 return {"error": f"Unexpected error: {str(e)}", "success": False} 244 245 def execute_command(self, command: str, use_cache: bool = True) -> Dict[str, Any]: 246 """ 247 Execute a generic command on the HexStrike server 248 249 Args: 250 command: Command to execute 251 use_cache: Whether to use caching for this command 252 253 Returns: 254 Command execution results 255 """ 256 return self.safe_post("api/command", {"command": command, "use_cache": use_cache}) 257 258 def check_health(self) -> Dict[str, Any]: 259 """ 260 Check the health of the HexStrike AI API Server 261 262 Returns: 263 Health status information 264 """ 265 return self.safe_get("health") 266 267 def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: 268 """ 269 Set up the MCP server with all enhanced tool functions 270 271 Args: 272 hexstrike_client: Initialized HexStrikeClient 273 274 Returns: 275 Configured FastMCP instance 276 """ 277 mcp = FastMCP("hexstrike-ai-mcp") 278 279 # ============================================================================ 280 # CORE NETWORK SCANNING TOOLS 281 # ============================================================================ 282 283 @mcp.tool() 284 def nmap_scan(target: str, scan_type: str = "-sV", ports: str = "", additional_args: str = "") -> Dict[str, Any]: 285 """ 286 Execute an enhanced Nmap scan against a target with real-time logging. 287 288 Args: 289 target: The IP address or hostname to scan 290 scan_type: Scan type (e.g., -sV for version detection, -sC for scripts) 291 ports: Comma-separated list of ports or port ranges 292 additional_args: Additional Nmap arguments 293 294 Returns: 295 Scan results with enhanced telemetry 296 """ 297 data = { 298 "target": target, 299 "scan_type": scan_type, 300 "ports": ports, 301 "additional_args": additional_args 302 } 303 logger.info(f"{HexStrikeColors.FIRE_RED}๐ Initiating Nmap scan: {target}{HexStrikeColors.RESET}") 304 305 # Use enhanced error handling by default 306 data["use_recovery"] = True 307 result = hexstrike_client.safe_post("api/tools/nmap", data) 308 309 if result.get("success"): 310 logger.info(f"{HexStrikeColors.SUCCESS}โ Nmap scan completed successfully for {target}{HexStrikeColors.RESET}") 311 312 # Check for recovery information 313 if result.get("recovery_info", {}).get("recovery_applied"): 314 recovery_info = result["recovery_info"] 315 attempts = recovery_info.get("attempts_made", 1) 316 logger.info(f"{HexStrikeColors.HIGHLIGHT_YELLOW} Recovery applied: {attempts} attempts made {HexStrikeColors.RESET}") 317 else: 318 logger.error(f"{HexStrikeColors.ERROR}โ Nmap scan failed for {target}{HexStrikeColors.RESET}") 319 320 # Check for human escalation 321 if result.get("human_escalation"): 322 logger.error(f"{HexStrikeColors.CRITICAL} HUMAN ESCALATION REQUIRED {HexStrikeColors.RESET}") 323 324 return result 325 326 @mcp.tool() 327 def gobuster_scan(url: str, mode: str = "dir", wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]: 328 """ 329 Execute Gobuster to find directories, DNS subdomains, or virtual hosts with enhanced logging. 330 331 Args: 332 url: The target URL 333 mode: Scan mode (dir, dns, fuzz, vhost) 334 wordlist: Path to wordlist file 335 additional_args: Additional Gobuster arguments 336 337 Returns: 338 Scan results with enhanced telemetry 339 """ 340 data = { 341 "url": url, 342 "mode": mode, 343 "wordlist": wordlist, 344 "additional_args": additional_args 345 } 346 logger.info(f"{HexStrikeColors.CRIMSON}๐ Starting Gobuster {mode} scan: {url}{HexStrikeColors.RESET}") 347 348 # Use enhanced error handling by default 349 data["use_recovery"] = True 350 result = hexstrike_client.safe_post("api/tools/gobuster", data) 351 352 if result.get("success"): 353 logger.info(f"{HexStrikeColors.SUCCESS}โ Gobuster scan completed for {url}{HexStrikeColors.RESET}") 354 355 # Check for recovery information 356 if result.get("recovery_info", {}).get("recovery_applied"): 357 recovery_info = result["recovery_info"] 358 attempts = recovery_info.get("attempts_made", 1) 359 logger.info(f"{HexStrikeColors.HIGHLIGHT_YELLOW} Recovery applied: {attempts} attempts made {HexStrikeColors.RESET}") 360 else: 361 logger.error(f"{HexStrikeColors.ERROR}โ Gobuster scan failed for {url}{HexStrikeColors.RESET}") 362 363 # Check for alternative tool suggestion 364 if result.get("alternative_tool_suggested"): 365 alt_tool = result["alternative_tool_suggested"] 366 logger.info(f"{HexStrikeColors.HIGHLIGHT_BLUE} Alternative tool suggested: {alt_tool} {HexStrikeColors.RESET}") 367 368 return result 369 370 @mcp.tool() 371 def nuclei_scan(target: str, severity: str = "", tags: str = "", template: str = "", additional_args: str = "") -> Dict[str, Any]: 372 """ 373 Execute Nuclei vulnerability scanner with enhanced logging and real-time progress. 374 375 Args: 376 target: The target URL or IP 377 severity: Filter by severity (critical,high,medium,low,info) 378 tags: Filter by tags (e.g. cve,rce,lfi) 379 template: Custom template path 380 additional_args: Additional Nuclei arguments 381 382 Returns: 383 Scan results with discovered vulnerabilities and telemetry 384 """ 385 data = { 386 "target": target, 387 "severity": severity, 388 "tags": tags, 389 "template": template, 390 "additional_args": additional_args 391 } 392 logger.info(f"{HexStrikeColors.BLOOD_RED}๐ฌ Starting Nuclei vulnerability scan: {target}{HexStrikeColors.RESET}") 393 394 # Use enhanced error handling by default 395 data["use_recovery"] = True 396 result = hexstrike_client.safe_post("api/tools/nuclei", data) 397 398 if result.get("success"): 399 logger.info(f"{HexStrikeColors.SUCCESS}โ Nuclei scan completed for {target}{HexStrikeColors.RESET}") 400 401 # Enhanced vulnerability reporting 402 if result.get("stdout") and "CRITICAL" in result["stdout"]: 403 logger.warning(f"{HexStrikeColors.CRITICAL} CRITICAL vulnerabilities detected! {HexStrikeColors.RESET}") 404 elif result.get("stdout") and "HIGH" in result["stdout"]: 405 logger.warning(f"{HexStrikeColors.FIRE_RED} HIGH severity vulnerabilities found! {HexStrikeColors.RESET}") 406 407 # Check for recovery information 408 if result.get("recovery_info", {}).get("recovery_applied"): 409 recovery_info = result["recovery_info"] 410 attempts = recovery_info.get("attempts_made", 1) 411 logger.info(f"{HexStrikeColors.HIGHLIGHT_YELLOW} Recovery applied: {attempts} attempts made {HexStrikeColors.RESET}") 412 else: 413 logger.error(f"{HexStrikeColors.ERROR}โ Nuclei scan failed for {target}{HexStrikeColors.RESET}") 414 415 return result 416 417 # ============================================================================ 418 # CLOUD SECURITY TOOLS 419 # ============================================================================ 420 421 @mcp.tool() 422 def prowler_scan(provider: str = "aws", profile: str = "default", region: str = "", checks: str = "", output_dir: str = "/tmp/prowler_output", output_format: str = "json", additional_args: str = "") -> Dict[str, Any]: 423 """ 424 Execute Prowler for comprehensive cloud security assessment. 425 426 Args: 427 provider: Cloud provider (aws, azure, gcp) 428 profile: AWS profile to use 429 region: Specific region to scan 430 checks: Specific checks to run 431 output_dir: Directory to save results 432 output_format: Output format (json, csv, html) 433 additional_args: Additional Prowler arguments 434 435 Returns: 436 Cloud security assessment results 437 """ 438 data = { 439 "provider": provider, 440 "profile": profile, 441 "region": region, 442 "checks": checks, 443 "output_dir": output_dir, 444 "output_format": output_format, 445 "additional_args": additional_args 446 } 447 logger.info(f"โ๏ธ Starting Prowler {provider} security assessment") 448 result = hexstrike_client.safe_post("api/tools/prowler", data) 449 if result.get("success"): 450 logger.info(f"โ Prowler assessment completed") 451 else: 452 logger.error(f"โ Prowler assessment failed") 453 return result 454 455 @mcp.tool() 456 def trivy_scan(scan_type: str = "image", target: str = "", output_format: str = "json", severity: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: 457 """ 458 Execute Trivy for container and filesystem vulnerability scanning. 459 460 Args: 461 scan_type: Type of scan (image, fs, repo, config) 462 target: Target to scan (image name, directory, repository) 463 output_format: Output format (json, table, sarif) 464 severity: Severity filter (UNKNOWN,LOW,MEDIUM,HIGH,CRITICAL) 465 output_file: File to save results 466 additional_args: Additional Trivy arguments 467 468 Returns: 469 Vulnerability scan results 470 """ 471 data = { 472 "scan_type": scan_type, 473 "target": target, 474 "output_format": output_format, 475 "severity": severity, 476 "output_file": output_file, 477 "additional_args": additional_args 478 } 479 logger.info(f"๐ Starting Trivy {scan_type} scan: {target}") 480 result = hexstrike_client.safe_post("api/tools/trivy", data) 481 if result.get("success"): 482 logger.info(f"โ Trivy scan completed for {target}") 483 else: 484 logger.error(f"โ Trivy scan failed for {target}") 485 return result 486 487 # ============================================================================ 488 # ENHANCED CLOUD AND CONTAINER SECURITY TOOLS (v6.0) 489 # ============================================================================ 490 491 @mcp.tool() 492 def scout_suite_assessment(provider: str = "aws", profile: str = "default", 493 report_dir: str = "/tmp/scout-suite", services: str = "", 494 exceptions: str = "", additional_args: str = "") -> Dict[str, Any]: 495 """ 496 Execute Scout Suite for multi-cloud security assessment. 497 498 Args: 499 provider: Cloud provider (aws, azure, gcp, aliyun, oci) 500 profile: AWS profile to use 501 report_dir: Directory to save reports 502 services: Specific services to assess 503 exceptions: Exceptions file path 504 additional_args: Additional Scout Suite arguments 505 506 Returns: 507 Multi-cloud security assessment results 508 """ 509 data = { 510 "provider": provider, 511 "profile": profile, 512 "report_dir": report_dir, 513 "services": services, 514 "exceptions": exceptions, 515 "additional_args": additional_args 516 } 517 logger.info(f"โ๏ธ Starting Scout Suite {provider} assessment") 518 result = hexstrike_client.safe_post("api/tools/scout-suite", data) 519 if result.get("success"): 520 logger.info(f"โ Scout Suite assessment completed") 521 else: 522 logger.error(f"โ Scout Suite assessment failed") 523 return result 524 525 @mcp.tool() 526 def cloudmapper_analysis(action: str = "collect", account: str = "", 527 config: str = "config.json", additional_args: str = "") -> Dict[str, Any]: 528 """ 529 Execute CloudMapper for AWS network visualization and security analysis. 530 531 Args: 532 action: Action to perform (collect, prepare, webserver, find_admins, etc.) 533 account: AWS account to analyze 534 config: Configuration file path 535 additional_args: Additional CloudMapper arguments 536 537 Returns: 538 AWS network visualization and security analysis results 539 """ 540 data = { 541 "action": action, 542 "account": account, 543 "config": config, 544 "additional_args": additional_args 545 } 546 logger.info(f"โ๏ธ Starting CloudMapper {action}") 547 result = hexstrike_client.safe_post("api/tools/cloudmapper", data) 548 if result.get("success"): 549 logger.info(f"โ CloudMapper {action} completed") 550 else: 551 logger.error(f"โ CloudMapper {action} failed") 552 return result 553 554 @mcp.tool() 555 def pacu_exploitation(session_name: str = "hexstrike_session", modules: str = "", 556 data_services: str = "", regions: str = "", 557 additional_args: str = "") -> Dict[str, Any]: 558 """ 559 Execute Pacu for AWS exploitation framework. 560 561 Args: 562 session_name: Pacu session name 563 modules: Comma-separated list of modules to run 564 data_services: Data services to enumerate 565 regions: AWS regions to target 566 additional_args: Additional Pacu arguments 567 568 Returns: 569 AWS exploitation framework results 570 """ 571 data = { 572 "session_name": session_name, 573 "modules": modules, 574 "data_services": data_services, 575 "regions": regions, 576 "additional_args": additional_args 577 } 578 logger.info(f"โ๏ธ Starting Pacu AWS exploitation") 579 result = hexstrike_client.safe_post("api/tools/pacu", data) 580 if result.get("success"): 581 logger.info(f"โ Pacu exploitation completed") 582 else: 583 logger.error(f"โ Pacu exploitation failed") 584 return result 585 586 @mcp.tool() 587 def kube_hunter_scan(target: str = "", remote: str = "", cidr: str = "", 588 interface: str = "", active: bool = False, report: str = "json", 589 additional_args: str = "") -> Dict[str, Any]: 590 """ 591 Execute kube-hunter for Kubernetes penetration testing. 592 593 Args: 594 target: Specific target to scan 595 remote: Remote target to scan 596 cidr: CIDR range to scan 597 interface: Network interface to scan 598 active: Enable active hunting (potentially harmful) 599 report: Report format (json, yaml) 600 additional_args: Additional kube-hunter arguments 601 602 Returns: 603 Kubernetes penetration testing results 604 """ 605 data = { 606 "target": target, 607 "remote": remote, 608 "cidr": cidr, 609 "interface": interface, 610 "active": active, 611 "report": report, 612 "additional_args": additional_args 613 } 614 logger.info(f"โ๏ธ Starting kube-hunter Kubernetes scan") 615 result = hexstrike_client.safe_post("api/tools/kube-hunter", data) 616 if result.get("success"): 617 logger.info(f"โ kube-hunter scan completed") 618 else: 619 logger.error(f"โ kube-hunter scan failed") 620 return result 621 622 @mcp.tool() 623 def kube_bench_cis(targets: str = "", version: str = "", config_dir: str = "", 624 output_format: str = "json", additional_args: str = "") -> Dict[str, Any]: 625 """ 626 Execute kube-bench for CIS Kubernetes benchmark checks. 627 628 Args: 629 targets: Targets to check (master, node, etcd, policies) 630 version: Kubernetes version 631 config_dir: Configuration directory 632 output_format: Output format (json, yaml) 633 additional_args: Additional kube-bench arguments 634 635 Returns: 636 CIS Kubernetes benchmark results 637 """ 638 data = { 639 "targets": targets, 640 "version": version, 641 "config_dir": config_dir, 642 "output_format": output_format, 643 "additional_args": additional_args 644 } 645 logger.info(f"โ๏ธ Starting kube-bench CIS benchmark") 646 result = hexstrike_client.safe_post("api/tools/kube-bench", data) 647 if result.get("success"): 648 logger.info(f"โ kube-bench benchmark completed") 649 else: 650 logger.error(f"โ kube-bench benchmark failed") 651 return result 652 653 @mcp.tool() 654 def docker_bench_security_scan(checks: str = "", exclude: str = "", 655 output_file: str = "/tmp/docker-bench-results.json", 656 additional_args: str = "") -> Dict[str, Any]: 657 """ 658 Execute Docker Bench for Security for Docker security assessment. 659 660 Args: 661 checks: Specific checks to run 662 exclude: Checks to exclude 663 output_file: Output file path 664 additional_args: Additional Docker Bench arguments 665 666 Returns: 667 Docker security assessment results 668 """ 669 data = { 670 "checks": checks, 671 "exclude": exclude, 672 "output_file": output_file, 673 "additional_args": additional_args 674 } 675 logger.info(f"๐ณ Starting Docker Bench Security assessment") 676 result = hexstrike_client.safe_post("api/tools/docker-bench-security", data) 677 if result.get("success"): 678 logger.info(f"โ Docker Bench Security completed") 679 else: 680 logger.error(f"โ Docker Bench Security failed") 681 return result 682 683 @mcp.tool() 684 def clair_vulnerability_scan(image: str, config: str = "/etc/clair/config.yaml", 685 output_format: str = "json", additional_args: str = "") -> Dict[str, Any]: 686 """ 687 Execute Clair for container vulnerability analysis. 688 689 Args: 690 image: Container image to scan 691 config: Clair configuration file 692 output_format: Output format (json, yaml) 693 additional_args: Additional Clair arguments 694 695 Returns: 696 Container vulnerability analysis results 697 """ 698 data = { 699 "image": image, 700 "config": config, 701 "output_format": output_format, 702 "additional_args": additional_args 703 } 704 logger.info(f"๐ณ Starting Clair vulnerability scan: {image}") 705 result = hexstrike_client.safe_post("api/tools/clair", data) 706 if result.get("success"): 707 logger.info(f"โ Clair scan completed for {image}") 708 else: 709 logger.error(f"โ Clair scan failed for {image}") 710 return result 711 712 @mcp.tool() 713 def falco_runtime_monitoring(config_file: str = "/etc/falco/falco.yaml", 714 rules_file: str = "", output_format: str = "json", 715 duration: int = 60, additional_args: str = "") -> Dict[str, Any]: 716 """ 717 Execute Falco for runtime security monitoring. 718 719 Args: 720 config_file: Falco configuration file 721 rules_file: Custom rules file 722 output_format: Output format (json, text) 723 duration: Monitoring duration in seconds 724 additional_args: Additional Falco arguments 725 726 Returns: 727 Runtime security monitoring results 728 """ 729 data = { 730 "config_file": config_file, 731 "rules_file": rules_file, 732 "output_format": output_format, 733 "duration": duration, 734 "additional_args": additional_args 735 } 736 logger.info(f"๐ก๏ธ Starting Falco runtime monitoring for {duration}s") 737 result = hexstrike_client.safe_post("api/tools/falco", data) 738 if result.get("success"): 739 logger.info(f"โ Falco monitoring completed") 740 else: 741 logger.error(f"โ Falco monitoring failed") 742 return result 743 744 @mcp.tool() 745 def checkov_iac_scan(directory: str = ".", framework: str = "", check: str = "", 746 skip_check: str = "", output_format: str = "json", 747 additional_args: str = "") -> Dict[str, Any]: 748 """ 749 Execute Checkov for infrastructure as code security scanning. 750 751 Args: 752 directory: Directory to scan 753 framework: Framework to scan (terraform, cloudformation, kubernetes, etc.) 754 check: Specific check to run 755 skip_check: Check to skip 756 output_format: Output format (json, yaml, cli) 757 additional_args: Additional Checkov arguments 758 759 Returns: 760 Infrastructure as code security scanning results 761 """ 762 data = { 763 "directory": directory, 764 "framework": framework, 765 "check": check, 766 "skip_check": skip_check, 767 "output_format": output_format, 768 "additional_args": additional_args 769 } 770 logger.info(f"๐ Starting Checkov IaC scan: {directory}") 771 result = hexstrike_client.safe_post("api/tools/checkov", data) 772 if result.get("success"): 773 logger.info(f"โ Checkov scan completed") 774 else: 775 logger.error(f"โ Checkov scan failed") 776 return result 777 778 @mcp.tool() 779 def terrascan_iac_scan(scan_type: str = "all", iac_dir: str = ".", 780 policy_type: str = "", output_format: str = "json", 781 severity: str = "", additional_args: str = "") -> Dict[str, Any]: 782 """ 783 Execute Terrascan for infrastructure as code security scanning. 784 785 Args: 786 scan_type: Type of scan (all, terraform, k8s, etc.) 787 iac_dir: Infrastructure as code directory 788 policy_type: Policy type to use 789 output_format: Output format (json, yaml, xml) 790 severity: Severity filter (high, medium, low) 791 additional_args: Additional Terrascan arguments 792 793 Returns: 794 Infrastructure as code security scanning results 795 """ 796 data = { 797 "scan_type": scan_type, 798 "iac_dir": iac_dir, 799 "policy_type": policy_type, 800 "output_format": output_format, 801 "severity": severity, 802 "additional_args": additional_args 803 } 804 logger.info(f"๐ Starting Terrascan IaC scan: {iac_dir}") 805 result = hexstrike_client.safe_post("api/tools/terrascan", data) 806 if result.get("success"): 807 logger.info(f"โ Terrascan scan completed") 808 else: 809 logger.error(f"โ Terrascan scan failed") 810 return result 811 812 # ============================================================================ 813 # FILE OPERATIONS & PAYLOAD GENERATION 814 # ============================================================================ 815 816 @mcp.tool() 817 def create_file(filename: str, content: str, binary: bool = False) -> Dict[str, Any]: 818 """ 819 Create a file with specified content on the HexStrike server. 820 821 Args: 822 filename: Name of the file to create 823 content: Content to write to the file 824 binary: Whether the content is binary data 825 826 Returns: 827 File creation results 828 """ 829 data = { 830 "filename": filename, 831 "content": content, 832 "binary": binary 833 } 834 logger.info(f"๐ Creating file: {filename}") 835 result = hexstrike_client.safe_post("api/files/create", data) 836 if result.get("success"): 837 logger.info(f"โ File created successfully: {filename}") 838 else: 839 logger.error(f"โ Failed to create file: {filename}") 840 return result 841 842 @mcp.tool() 843 def modify_file(filename: str, content: str, append: bool = False) -> Dict[str, Any]: 844 """ 845 Modify an existing file on the HexStrike server. 846 847 Args: 848 filename: Name of the file to modify 849 content: Content to write or append 850 append: Whether to append to the file (True) or overwrite (False) 851 852 Returns: 853 File modification results 854 """ 855 data = { 856 "filename": filename, 857 "content": content, 858 "append": append 859 } 860 logger.info(f"โ๏ธ Modifying file: {filename}") 861 result = hexstrike_client.safe_post("api/files/modify", data) 862 if result.get("success"): 863 logger.info(f"โ File modified successfully: {filename}") 864 else: 865 logger.error(f"โ Failed to modify file: {filename}") 866 return result 867 868 @mcp.tool() 869 def delete_file(filename: str) -> Dict[str, Any]: 870 """ 871 Delete a file or directory on the HexStrike server. 872 873 Args: 874 filename: Name of the file or directory to delete 875 876 Returns: 877 File deletion results 878 """ 879 data = { 880 "filename": filename 881 } 882 logger.info(f"๐๏ธ Deleting file: {filename}") 883 result = hexstrike_client.safe_post("api/files/delete", data) 884 if result.get("success"): 885 logger.info(f"โ File deleted successfully: {filename}") 886 else: 887 logger.error(f"โ Failed to delete file: {filename}") 888 return result 889 890 @mcp.tool() 891 def list_files(directory: str = ".") -> Dict[str, Any]: 892 """ 893 List files in a directory on the HexStrike server. 894 895 Args: 896 directory: Directory to list (relative to server's base directory) 897 898 Returns: 899 Directory listing results 900 """ 901 logger.info(f"๐ Listing files in directory: {directory}") 902 result = hexstrike_client.safe_get("api/files/list", {"directory": directory}) 903 if result.get("success"): 904 file_count = len(result.get("files", [])) 905 logger.info(f"โ Listed {file_count} files in {directory}") 906 else: 907 logger.error(f"โ Failed to list files in {directory}") 908 return result 909 910 @mcp.tool() 911 def generate_payload(payload_type: str = "buffer", size: int = 1024, pattern: str = "A", filename: str = "") -> Dict[str, Any]: 912 """ 913 Generate large payloads for testing and exploitation. 914 915 Args: 916 payload_type: Type of payload (buffer, cyclic, random) 917 size: Size of the payload in bytes 918 pattern: Pattern to use for buffer payloads 919 filename: Custom filename (auto-generated if empty) 920 921 Returns: 922 Payload generation results 923 """ 924 data = { 925 "type": payload_type, 926 "size": size, 927 "pattern": pattern 928 } 929 if filename: 930 data["filename"] = filename 931 932 logger.info(f"๐ฏ Generating {payload_type} payload: {size} bytes") 933 result = hexstrike_client.safe_post("api/payloads/generate", data) 934 if result.get("success"): 935 logger.info(f"โ Payload generated successfully") 936 else: 937 logger.error(f"โ Failed to generate payload") 938 return result 939 940 # ============================================================================ 941 # PYTHON ENVIRONMENT MANAGEMENT 942 # ============================================================================ 943 944 @mcp.tool() 945 def install_python_package(package: str, env_name: str = "default") -> Dict[str, Any]: 946 """ 947 Install a Python package in a virtual environment on the HexStrike server. 948 949 Args: 950 package: Name of the Python package to install 951 env_name: Name of the virtual environment 952 953 Returns: 954 Package installation results 955 """ 956 data = { 957 "package": package, 958 "env_name": env_name 959 } 960 logger.info(f"๐ฆ Installing Python package: {package} in env {env_name}") 961 result = hexstrike_client.safe_post("api/python/install", data) 962 if result.get("success"): 963 logger.info(f"โ Package {package} installed successfully") 964 else: 965 logger.error(f"โ Failed to install package {package}") 966 return result 967 968 @mcp.tool() 969 def execute_python_script(script: str, env_name: str = "default", filename: str = "") -> Dict[str, Any]: 970 """ 971 Execute a Python script in a virtual environment on the HexStrike server. 972 973 Args: 974 script: Python script content to execute 975 env_name: Name of the virtual environment 976 filename: Custom script filename (auto-generated if empty) 977 978 Returns: 979 Script execution results 980 """ 981 data = { 982 "script": script, 983 "env_name": env_name 984 } 985 if filename: 986 data["filename"] = filename 987 988 logger.info(f"๐ Executing Python script in env {env_name}") 989 result = hexstrike_client.safe_post("api/python/execute", data) 990 if result.get("success"): 991 logger.info(f"โ Python script executed successfully") 992 else: 993 logger.error(f"โ Python script execution failed") 994 return result 995 996 # ============================================================================ 997 # ADDITIONAL SECURITY TOOLS FROM ORIGINAL IMPLEMENTATION 998 # ============================================================================ 999 1000 @mcp.tool() 1001 def dirb_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]: 1002 """ 1003 Execute Dirb for directory brute forcing with enhanced logging. 1004 1005 Args: 1006 url: The target URL 1007 wordlist: Path to wordlist file 1008 additional_args: Additional Dirb arguments 1009 1010 Returns: 1011 Scan results with enhanced telemetry 1012 """ 1013 data = { 1014 "url": url, 1015 "wordlist": wordlist, 1016 "additional_args": additional_args 1017 } 1018 logger.info(f"๐ Starting Dirb scan: {url}") 1019 result = hexstrike_client.safe_post("api/tools/dirb", data) 1020 if result.get("success"): 1021 logger.info(f"โ Dirb scan completed for {url}") 1022 else: 1023 logger.error(f"โ Dirb scan failed for {url}") 1024 return result 1025 1026 @mcp.tool() 1027 def nikto_scan(target: str, additional_args: str = "") -> Dict[str, Any]: 1028 """ 1029 Execute Nikto web vulnerability scanner with enhanced logging. 1030 1031 Args: 1032 target: The target URL or IP 1033 additional_args: Additional Nikto arguments 1034 1035 Returns: 1036 Scan results with discovered vulnerabilities 1037 """ 1038 data = { 1039 "target": target, 1040 "additional_args": additional_args 1041 } 1042 logger.info(f"๐ฌ Starting Nikto scan: {target}") 1043 result = hexstrike_client.safe_post("api/tools/nikto", data) 1044 if result.get("success"): 1045 logger.info(f"โ Nikto scan completed for {target}") 1046 else: 1047 logger.error(f"โ Nikto scan failed for {target}") 1048 return result 1049 1050 @mcp.tool() 1051 def sqlmap_scan(url: str, data: str = "", additional_args: str = "") -> Dict[str, Any]: 1052 """ 1053 Execute SQLMap for SQL injection testing with enhanced logging. 1054 1055 Args: 1056 url: The target URL 1057 data: POST data for testing 1058 additional_args: Additional SQLMap arguments 1059 1060 Returns: 1061 SQL injection test results 1062 """ 1063 data_payload = { 1064 "url": url, 1065 "data": data, 1066 "additional_args": additional_args 1067 } 1068 logger.info(f"๐ Starting SQLMap scan: {url}") 1069 result = hexstrike_client.safe_post("api/tools/sqlmap", data_payload) 1070 if result.get("success"): 1071 logger.info(f"โ SQLMap scan completed for {url}") 1072 else: 1073 logger.error(f"โ SQLMap scan failed for {url}") 1074 return result 1075 1076 @mcp.tool() 1077 def metasploit_run(module: str, options: Dict[str, Any] = {}) -> Dict[str, Any]: 1078 """ 1079 Execute a Metasploit module with enhanced logging. 1080 1081 Args: 1082 module: The Metasploit module to use 1083 options: Dictionary of module options 1084 1085 Returns: 1086 Metasploit execution results 1087 """ 1088 data = { 1089 "module": module, 1090 "options": options 1091 } 1092 logger.info(f"๐ Starting Metasploit module: {module}") 1093 result = hexstrike_client.safe_post("api/tools/metasploit", data) 1094 if result.get("success"): 1095 logger.info(f"โ Metasploit module completed: {module}") 1096 else: 1097 logger.error(f"โ Metasploit module failed: {module}") 1098 return result 1099 1100 @mcp.tool() 1101 def hydra_attack( 1102 target: str, 1103 service: str, 1104 username: str = "", 1105 username_file: str = "", 1106 password: str = "", 1107 password_file: str = "", 1108 additional_args: str = "" 1109 ) -> Dict[str, Any]: 1110 """ 1111 Execute Hydra for password brute forcing with enhanced logging. 1112 1113 Args: 1114 target: The target IP or hostname 1115 service: The service to attack (ssh, ftp, http, etc.) 1116 username: Single username to test 1117 username_file: File containing usernames 1118 password: Single password to test 1119 password_file: File containing passwords 1120 additional_args: Additional Hydra arguments 1121 1122 Returns: 1123 Brute force attack results 1124 """ 1125 data = { 1126 "target": target, 1127 "service": service, 1128 "username": username, 1129 "username_file": username_file, 1130 "password": password, 1131 "password_file": password_file, 1132 "additional_args": additional_args 1133 } 1134 logger.info(f"๐ Starting Hydra attack: {target}:{service}") 1135 result = hexstrike_client.safe_post("api/tools/hydra", data) 1136 if result.get("success"): 1137 logger.info(f"โ Hydra attack completed for {target}") 1138 else: 1139 logger.error(f"โ Hydra attack failed for {target}") 1140 return result 1141 1142 @mcp.tool() 1143 def john_crack( 1144 hash_file: str, 1145 wordlist: str = "/usr/share/wordlists/rockyou.txt", 1146 format_type: str = "", 1147 additional_args: str = "" 1148 ) -> Dict[str, Any]: 1149 """ 1150 Execute John the Ripper for password cracking with enhanced logging. 1151 1152 Args: 1153 hash_file: File containing password hashes 1154 wordlist: Wordlist file to use 1155 format_type: Hash format type 1156 additional_args: Additional John arguments 1157 1158 Returns: 1159 Password cracking results 1160 """ 1161 data = { 1162 "hash_file": hash_file, 1163 "wordlist": wordlist, 1164 "format": format_type, 1165 "additional_args": additional_args 1166 } 1167 logger.info(f"๐ Starting John the Ripper: {hash_file}") 1168 result = hexstrike_client.safe_post("api/tools/john", data) 1169 if result.get("success"): 1170 logger.info(f"โ John the Ripper completed") 1171 else: 1172 logger.error(f"โ John the Ripper failed") 1173 return result 1174 1175 @mcp.tool() 1176 def wpscan_analyze(url: str, additional_args: str = "") -> Dict[str, Any]: 1177 """ 1178 Execute WPScan for WordPress vulnerability scanning with enhanced logging. 1179 1180 Args: 1181 url: The WordPress site URL 1182 additional_args: Additional WPScan arguments 1183 1184 Returns: 1185 WordPress vulnerability scan results 1186 """ 1187 data = { 1188 "url": url, 1189 "additional_args": additional_args 1190 } 1191 logger.info(f"๐ Starting WPScan: {url}") 1192 result = hexstrike_client.safe_post("api/tools/wpscan", data) 1193 if result.get("success"): 1194 logger.info(f"โ WPScan completed for {url}") 1195 else: 1196 logger.error(f"โ WPScan failed for {url}") 1197 return result 1198 1199 @mcp.tool() 1200 def enum4linux_scan(target: str, additional_args: str = "-a") -> Dict[str, Any]: 1201 """ 1202 Execute Enum4linux for SMB enumeration with enhanced logging. 1203 1204 Args: 1205 target: The target IP address 1206 additional_args: Additional Enum4linux arguments 1207 1208 Returns: 1209 SMB enumeration results 1210 """ 1211 data = { 1212 "target": target, 1213 "additional_args": additional_args 1214 } 1215 logger.info(f"๐ Starting Enum4linux: {target}") 1216 result = hexstrike_client.safe_post("api/tools/enum4linux", data) 1217 if result.get("success"): 1218 logger.info(f"โ Enum4linux completed for {target}") 1219 else: 1220 logger.error(f"โ Enum4linux failed for {target}") 1221 return result 1222 1223 @mcp.tool() 1224 def ffuf_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", mode: str = "directory", match_codes: str = "200,204,301,302,307,401,403", additional_args: str = "") -> Dict[str, Any]: 1225 """ 1226 Execute FFuf for web fuzzing with enhanced logging. 1227 1228 Args: 1229 url: The target URL 1230 wordlist: Wordlist file to use 1231 mode: Fuzzing mode (directory, vhost, parameter) 1232 match_codes: HTTP status codes to match 1233 additional_args: Additional FFuf arguments 1234 1235 Returns: 1236 Web fuzzing results 1237 """ 1238 data = { 1239 "url": url, 1240 "wordlist": wordlist, 1241 "mode": mode, 1242 "match_codes": match_codes, 1243 "additional_args": additional_args 1244 } 1245 logger.info(f"๐ Starting FFuf {mode} fuzzing: {url}") 1246 result = hexstrike_client.safe_post("api/tools/ffuf", data) 1247 if result.get("success"): 1248 logger.info(f"โ FFuf fuzzing completed for {url}") 1249 else: 1250 logger.error(f"โ FFuf fuzzing failed for {url}") 1251 return result 1252 1253 @mcp.tool() 1254 def netexec_scan(target: str, protocol: str = "smb", username: str = "", password: str = "", hash_value: str = "", module: str = "", additional_args: str = "") -> Dict[str, Any]: 1255 """ 1256 Execute NetExec (formerly CrackMapExec) for network enumeration with enhanced logging. 1257 1258 Args: 1259 target: The target IP or network 1260 protocol: Protocol to use (smb, ssh, winrm, etc.) 1261 username: Username for authentication 1262 password: Password for authentication 1263 hash_value: Hash for pass-the-hash attacks 1264 module: NetExec module to execute 1265 additional_args: Additional NetExec arguments 1266 1267 Returns: 1268 Network enumeration results 1269 """ 1270 data = { 1271 "target": target, 1272 "protocol": protocol, 1273 "username": username, 1274 "password": password, 1275 "hash": hash_value, 1276 "module": module, 1277 "additional_args": additional_args 1278 } 1279 logger.info(f"๐ Starting NetExec {protocol} scan: {target}") 1280 result = hexstrike_client.safe_post("api/tools/netexec", data) 1281 if result.get("success"): 1282 logger.info(f"โ NetExec scan completed for {target}") 1283 else: 1284 logger.error(f"โ NetExec scan failed for {target}") 1285 return result 1286 1287 @mcp.tool() 1288 def amass_scan(domain: str, mode: str = "enum", additional_args: str = "") -> Dict[str, Any]: 1289 """ 1290 Execute Amass for subdomain enumeration with enhanced logging. 1291 1292 Args: 1293 domain: The target domain 1294 mode: Amass mode (enum, intel, viz) 1295 additional_args: Additional Amass arguments 1296 1297 Returns: 1298 Subdomain enumeration results 1299 """ 1300 data = { 1301 "domain": domain, 1302 "mode": mode, 1303 "additional_args": additional_args 1304 } 1305 logger.info(f"๐ Starting Amass {mode}: {domain}") 1306 result = hexstrike_client.safe_post("api/tools/amass", data) 1307 if result.get("success"): 1308 logger.info(f"โ Amass completed for {domain}") 1309 else: 1310 logger.error(f"โ Amass failed for {domain}") 1311 return result 1312 1313 @mcp.tool() 1314 def hashcat_crack(hash_file: str, hash_type: str, attack_mode: str = "0", wordlist: str = "/usr/share/wordlists/rockyou.txt", mask: str = "", additional_args: str = "") -> Dict[str, Any]: 1315 """ 1316 Execute Hashcat for advanced password cracking with enhanced logging. 1317 1318 Args: 1319 hash_file: File containing password hashes 1320 hash_type: Hash type number for Hashcat 1321 attack_mode: Attack mode (0=dict, 1=combo, 3=mask, etc.) 1322 wordlist: Wordlist file for dictionary attacks 1323 mask: Mask for mask attacks 1324 additional_args: Additional Hashcat arguments 1325 1326 Returns: 1327 Password cracking results 1328 """ 1329 data = { 1330 "hash_file": hash_file, 1331 "hash_type": hash_type, 1332 "attack_mode": attack_mode, 1333 "wordlist": wordlist, 1334 "mask": mask, 1335 "additional_args": additional_args 1336 } 1337 logger.info(f"๐ Starting Hashcat attack: mode {attack_mode}") 1338 result = hexstrike_client.safe_post("api/tools/hashcat", data) 1339 if result.get("success"): 1340 logger.info(f"โ Hashcat attack completed") 1341 else: 1342 logger.error(f"โ Hashcat attack failed") 1343 return result 1344 1345 @mcp.tool() 1346 def subfinder_scan(domain: str, silent: bool = True, all_sources: bool = False, additional_args: str = "") -> Dict[str, Any]: 1347 """ 1348 Execute Subfinder for passive subdomain enumeration with enhanced logging. 1349 1350 Args: 1351 domain: The target domain 1352 silent: Run in silent mode 1353 all_sources: Use all sources 1354 additional_args: Additional Subfinder arguments 1355 1356 Returns: 1357 Passive subdomain enumeration results 1358 """ 1359 data = { 1360 "domain": domain, 1361 "silent": silent, 1362 "all_sources": all_sources, 1363 "additional_args": additional_args 1364 } 1365 logger.info(f"๐ Starting Subfinder: {domain}") 1366 result = hexstrike_client.safe_post("api/tools/subfinder", data) 1367 if result.get("success"): 1368 logger.info(f"โ Subfinder completed for {domain}") 1369 else: 1370 logger.error(f"โ Subfinder failed for {domain}") 1371 return result 1372 1373 @mcp.tool() 1374 def smbmap_scan(target: str, username: str = "", password: str = "", domain: str = "", additional_args: str = "") -> Dict[str, Any]: 1375 """ 1376 Execute SMBMap for SMB share enumeration with enhanced logging. 1377 1378 Args: 1379 target: The target IP address 1380 username: Username for authentication 1381 password: Password for authentication 1382 domain: Domain for authentication 1383 additional_args: Additional SMBMap arguments 1384 1385 Returns: 1386 SMB share enumeration results 1387 """ 1388 data = { 1389 "target": target, 1390 "username": username, 1391 "password": password, 1392 "domain": domain, 1393 "additional_args": additional_args 1394 } 1395 logger.info(f"๐ Starting SMBMap: {target}") 1396 result = hexstrike_client.safe_post("api/tools/smbmap", data) 1397 if result.get("success"): 1398 logger.info(f"โ SMBMap completed for {target}") 1399 else: 1400 logger.error(f"โ SMBMap failed for {target}") 1401 return result 1402 1403 # ============================================================================ 1404 # ENHANCED NETWORK PENETRATION TESTING TOOLS (v6.0) 1405 # ============================================================================ 1406 1407 @mcp.tool() 1408 def rustscan_fast_scan(target: str, ports: str = "", ulimit: int = 5000, 1409 batch_size: int = 4500, timeout: int = 1500, 1410 scripts: bool = False, additional_args: str = "") -> Dict[str, Any]: 1411 """ 1412 Execute Rustscan for ultra-fast port scanning with enhanced logging. 1413 1414 Args: 1415 target: The target IP address or hostname 1416 ports: Specific ports to scan (e.g., "22,80,443") 1417 ulimit: File descriptor limit 1418 batch_size: Batch size for scanning 1419 timeout: Timeout in milliseconds 1420 scripts: Run Nmap scripts on discovered ports 1421 additional_args: Additional Rustscan arguments 1422 1423 Returns: 1424 Ultra-fast port scanning results 1425 """ 1426 data = { 1427 "target": target, 1428 "ports": ports, 1429 "ulimit": ulimit, 1430 "batch_size": batch_size, 1431 "timeout": timeout, 1432 "scripts": scripts, 1433 "additional_args": additional_args 1434 } 1435 logger.info(f"โก Starting Rustscan: {target}") 1436 result = hexstrike_client.safe_post("api/tools/rustscan", data) 1437 if result.get("success"): 1438 logger.info(f"โ Rustscan completed for {target}") 1439 else: 1440 logger.error(f"โ Rustscan failed for {target}") 1441 return result 1442 1443 @mcp.tool() 1444 def masscan_high_speed(target: str, ports: str = "1-65535", rate: int = 1000, 1445 interface: str = "", router_mac: str = "", source_ip: str = "", 1446 banners: bool = False, additional_args: str = "") -> Dict[str, Any]: 1447 """ 1448 Execute Masscan for high-speed Internet-scale port scanning with intelligent rate limiting. 1449 1450 Args: 1451 target: The target IP address or CIDR range 1452 ports: Port range to scan 1453 rate: Packets per second rate 1454 interface: Network interface to use 1455 router_mac: Router MAC address 1456 source_ip: Source IP address 1457 banners: Enable banner grabbing 1458 additional_args: Additional Masscan arguments 1459 1460 Returns: 1461 High-speed port scanning results with intelligent rate limiting 1462 """ 1463 data = { 1464 "target": target, 1465 "ports": ports, 1466 "rate": rate, 1467 "interface": interface, 1468 "router_mac": router_mac, 1469 "source_ip": source_ip, 1470 "banners": banners, 1471 "additional_args": additional_args 1472 } 1473 logger.info(f"๐ Starting Masscan: {target} at rate {rate}") 1474 result = hexstrike_client.safe_post("api/tools/masscan", data) 1475 if result.get("success"): 1476 logger.info(f"โ Masscan completed for {target}") 1477 else: 1478 logger.error(f"โ Masscan failed for {target}") 1479 return result 1480 1481 @mcp.tool() 1482 def nmap_advanced_scan(target: str, scan_type: str = "-sS", ports: str = "", 1483 timing: str = "T4", nse_scripts: str = "", os_detection: bool = False, 1484 version_detection: bool = False, aggressive: bool = False, 1485 stealth: bool = False, additional_args: str = "") -> Dict[str, Any]: 1486 """ 1487 Execute advanced Nmap scans with custom NSE scripts and optimized timing. 1488 1489 Args: 1490 target: The target IP address or hostname 1491 scan_type: Nmap scan type (e.g., -sS, -sT, -sU) 1492 ports: Specific ports to scan 1493 timing: Timing template (T0-T5) 1494 nse_scripts: Custom NSE scripts to run 1495 os_detection: Enable OS detection 1496 version_detection: Enable version detection 1497 aggressive: Enable aggressive scanning 1498 stealth: Enable stealth mode 1499 additional_args: Additional Nmap arguments 1500 1501 Returns: 1502 Advanced Nmap scanning results with custom NSE scripts 1503 """ 1504 data = { 1505 "target": target, 1506 "scan_type": scan_type, 1507 "ports": ports, 1508 "timing": timing, 1509 "nse_scripts": nse_scripts, 1510 "os_detection": os_detection, 1511 "version_detection": version_detection, 1512 "aggressive": aggressive, 1513 "stealth": stealth, 1514 "additional_args": additional_args 1515 } 1516 logger.info(f"๐ Starting Advanced Nmap: {target}") 1517 result = hexstrike_client.safe_post("api/tools/nmap-advanced", data) 1518 if result.get("success"): 1519 logger.info(f"โ Advanced Nmap completed for {target}") 1520 else: 1521 logger.error(f"โ Advanced Nmap failed for {target}") 1522 return result 1523 1524 @mcp.tool() 1525 def autorecon_comprehensive(target: str, output_dir: str = "/tmp/autorecon", 1526 port_scans: str = "top-100-ports", service_scans: str = "default", 1527 heartbeat: int = 60, timeout: int = 300, 1528 additional_args: str = "") -> Dict[str, Any]: 1529 """ 1530 Execute AutoRecon for comprehensive automated reconnaissance. 1531 1532 Args: 1533 target: The target IP address or hostname 1534 output_dir: Output directory for results 1535 port_scans: Port scan configuration 1536 service_scans: Service scan configuration 1537 heartbeat: Heartbeat interval in seconds 1538 timeout: Timeout for individual scans 1539 additional_args: Additional AutoRecon arguments 1540 1541 Returns: 1542 Comprehensive automated reconnaissance results 1543 """ 1544 data = { 1545 "target": target, 1546 "output_dir": output_dir, 1547 "port_scans": port_scans, 1548 "service_scans": service_scans, 1549 "heartbeat": heartbeat, 1550 "timeout": timeout, 1551 "additional_args": additional_args 1552 } 1553 logger.info(f"๐ Starting AutoRecon: {target}") 1554 result = hexstrike_client.safe_post("api/tools/autorecon", data) 1555 if result.get("success"): 1556 logger.info(f"โ AutoRecon completed for {target}") 1557 else: 1558 logger.error(f"โ AutoRecon failed for {target}") 1559 return result 1560 1561 @mcp.tool() 1562 def enum4linux_ng_advanced(target: str, username: str = "", password: str = "", 1563 domain: str = "", shares: bool = True, users: bool = True, 1564 groups: bool = True, policy: bool = True, 1565 additional_args: str = "") -> Dict[str, Any]: 1566 """ 1567 Execute Enum4linux-ng for advanced SMB enumeration with enhanced logging. 1568 1569 Args: 1570 target: The target IP address 1571 username: Username for authentication 1572 password: Password for authentication 1573 domain: Domain for authentication 1574 shares: Enumerate shares 1575 users: Enumerate users 1576 groups: Enumerate groups 1577 policy: Enumerate policies 1578 additional_args: Additional Enum4linux-ng arguments 1579 1580 Returns: 1581 Advanced SMB enumeration results 1582 """ 1583 data = { 1584 "target": target, 1585 "username": username, 1586 "password": password, 1587 "domain": domain, 1588 "shares": shares, 1589 "users": users, 1590 "groups": groups, 1591 "policy": policy, 1592 "additional_args": additional_args 1593 } 1594 logger.info(f"๐ Starting Enum4linux-ng: {target}") 1595 result = hexstrike_client.safe_post("api/tools/enum4linux-ng", data) 1596 if result.get("success"): 1597 logger.info(f"โ Enum4linux-ng completed for {target}") 1598 else: 1599 logger.error(f"โ Enum4linux-ng failed for {target}") 1600 return result 1601 1602 @mcp.tool() 1603 def rpcclient_enumeration(target: str, username: str = "", password: str = "", 1604 domain: str = "", commands: str = "enumdomusers;enumdomgroups;querydominfo", 1605 additional_args: str = "") -> Dict[str, Any]: 1606 """ 1607 Execute rpcclient for RPC enumeration with enhanced logging. 1608 1609 Args: 1610 target: The target IP address 1611 username: Username for authentication 1612 password: Password for authentication 1613 domain: Domain for authentication 1614 commands: Semicolon-separated RPC commands 1615 additional_args: Additional rpcclient arguments 1616 1617 Returns: 1618 RPC enumeration results 1619 """ 1620 data = { 1621 "target": target, 1622 "username": username, 1623 "password": password, 1624 "domain": domain, 1625 "commands": commands, 1626 "additional_args": additional_args 1627 } 1628 logger.info(f"๐ Starting rpcclient: {target}") 1629 result = hexstrike_client.safe_post("api/tools/rpcclient", data) 1630 if result.get("success"): 1631 logger.info(f"โ rpcclient completed for {target}") 1632 else: 1633 logger.error(f"โ rpcclient failed for {target}") 1634 return result 1635 1636 @mcp.tool() 1637 def nbtscan_netbios(target: str, verbose: bool = False, timeout: int = 2, 1638 additional_args: str = "") -> Dict[str, Any]: 1639 """ 1640 Execute nbtscan for NetBIOS name scanning with enhanced logging. 1641 1642 Args: 1643 target: The target IP address or range 1644 verbose: Enable verbose output 1645 timeout: Timeout in seconds 1646 additional_args: Additional nbtscan arguments 1647 1648 Returns: 1649 NetBIOS name scanning results 1650 """ 1651 data = { 1652 "target": target, 1653 "verbose": verbose, 1654 "timeout": timeout, 1655 "additional_args": additional_args 1656 } 1657 logger.info(f"๐ Starting nbtscan: {target}") 1658 result = hexstrike_client.safe_post("api/tools/nbtscan", data) 1659 if result.get("success"): 1660 logger.info(f"โ nbtscan completed for {target}") 1661 else: 1662 logger.error(f"โ nbtscan failed for {target}") 1663 return result 1664 1665 @mcp.tool() 1666 def arp_scan_discovery(target: str = "", interface: str = "", local_network: bool = False, 1667 timeout: int = 500, retry: int = 3, additional_args: str = "") -> Dict[str, Any]: 1668 """ 1669 Execute arp-scan for network discovery with enhanced logging. 1670 1671 Args: 1672 target: The target IP range (if not using local_network) 1673 interface: Network interface to use 1674 local_network: Scan local network 1675 timeout: Timeout in milliseconds 1676 retry: Number of retries 1677 additional_args: Additional arp-scan arguments 1678 1679 Returns: 1680 Network discovery results via ARP scanning 1681 """ 1682 data = { 1683 "target": target, 1684 "interface": interface, 1685 "local_network": local_network, 1686 "timeout": timeout, 1687 "retry": retry, 1688 "additional_args": additional_args 1689 } 1690 logger.info(f"๐ Starting arp-scan: {target if target else 'local network'}") 1691 result = hexstrike_client.safe_post("api/tools/arp-scan", data) 1692 if result.get("success"): 1693 logger.info(f"โ arp-scan completed") 1694 else: 1695 logger.error(f"โ arp-scan failed") 1696 return result 1697 1698 @mcp.tool() 1699 def responder_credential_harvest(interface: str = "eth0", analyze: bool = False, 1700 wpad: bool = True, force_wpad_auth: bool = False, 1701 fingerprint: bool = False, duration: int = 300, 1702 additional_args: str = "") -> Dict[str, Any]: 1703 """ 1704 Execute Responder for credential harvesting with enhanced logging. 1705 1706 Args: 1707 interface: Network interface to use 1708 analyze: Analyze mode only 1709 wpad: Enable WPAD rogue proxy 1710 force_wpad_auth: Force WPAD authentication 1711 fingerprint: Fingerprint mode 1712 duration: Duration to run in seconds 1713 additional_args: Additional Responder arguments 1714 1715 Returns: 1716 Credential harvesting results 1717 """ 1718 data = { 1719 "interface": interface, 1720 "analyze": analyze, 1721 "wpad": wpad, 1722 "force_wpad_auth": force_wpad_auth, 1723 "fingerprint": fingerprint, 1724 "duration": duration, 1725 "additional_args": additional_args 1726 } 1727 logger.info(f"๐ Starting Responder on interface: {interface}") 1728 result = hexstrike_client.safe_post("api/tools/responder", data) 1729 if result.get("success"): 1730 logger.info(f"โ Responder completed") 1731 else: 1732 logger.error(f"โ Responder failed") 1733 return result 1734 1735 @mcp.tool() 1736 def volatility_analyze(memory_file: str, plugin: str, profile: str = "", additional_args: str = "") -> Dict[str, Any]: 1737 """ 1738 Execute Volatility for memory forensics analysis with enhanced logging. 1739 1740 Args: 1741 memory_file: Path to memory dump file 1742 plugin: Volatility plugin to use 1743 profile: Memory profile to use 1744 additional_args: Additional Volatility arguments 1745 1746 Returns: 1747 Memory forensics analysis results 1748 """ 1749 data = { 1750 "memory_file": memory_file, 1751 "plugin": plugin, 1752 "profile": profile, 1753 "additional_args": additional_args 1754 } 1755 logger.info(f"๐ง Starting Volatility analysis: {plugin}") 1756 result = hexstrike_client.safe_post("api/tools/volatility", data) 1757 if result.get("success"): 1758 logger.info(f"โ Volatility analysis completed") 1759 else: 1760 logger.error(f"โ Volatility analysis failed") 1761 return result 1762 1763 @mcp.tool() 1764 def msfvenom_generate(payload: str, format_type: str = "", output_file: str = "", encoder: str = "", iterations: str = "", additional_args: str = "") -> Dict[str, Any]: 1765 """ 1766 Execute MSFVenom for payload generation with enhanced logging. 1767 1768 Args: 1769 payload: The payload to generate 1770 format_type: Output format (exe, elf, raw, etc.) 1771 output_file: Output file path 1772 encoder: Encoder to use 1773 iterations: Number of encoding iterations 1774 additional_args: Additional MSFVenom arguments 1775 1776 Returns: 1777 Payload generation results 1778 """ 1779 data = { 1780 "payload": payload, 1781 "format": format_type, 1782 "output_file": output_file, 1783 "encoder": encoder, 1784 "iterations": iterations, 1785 "additional_args": additional_args 1786 } 1787 logger.info(f"๐ Starting MSFVenom payload generation: {payload}") 1788 result = hexstrike_client.safe_post("api/tools/msfvenom", data) 1789 if result.get("success"): 1790 logger.info(f"โ MSFVenom payload generated") 1791 else: 1792 logger.error(f"โ MSFVenom payload generation failed") 1793 return result 1794 1795 # ============================================================================ 1796 # BINARY ANALYSIS & REVERSE ENGINEERING TOOLS 1797 # ============================================================================ 1798 1799 @mcp.tool() 1800 def gdb_analyze(binary: str, commands: str = "", script_file: str = "", additional_args: str = "") -> Dict[str, Any]: 1801 """ 1802 Execute GDB for binary analysis and debugging with enhanced logging. 1803 1804 Args: 1805 binary: Path to the binary file 1806 commands: GDB commands to execute 1807 script_file: Path to GDB script file 1808 additional_args: Additional GDB arguments 1809 1810 Returns: 1811 Binary analysis results 1812 """ 1813 data = { 1814 "binary": binary, 1815 "commands": commands, 1816 "script_file": script_file, 1817 "additional_args": additional_args 1818 } 1819 logger.info(f"๐ง Starting GDB analysis: {binary}") 1820 result = hexstrike_client.safe_post("api/tools/gdb", data) 1821 if result.get("success"): 1822 logger.info(f"โ GDB analysis completed for {binary}") 1823 else: 1824 logger.error(f"โ GDB analysis failed for {binary}") 1825 return result 1826 1827 @mcp.tool() 1828 def radare2_analyze(binary: str, commands: str = "", additional_args: str = "") -> Dict[str, Any]: 1829 """ 1830 Execute Radare2 for binary analysis and reverse engineering with enhanced logging. 1831 1832 Args: 1833 binary: Path to the binary file 1834 commands: Radare2 commands to execute 1835 additional_args: Additional Radare2 arguments 1836 1837 Returns: 1838 Binary analysis results 1839 """ 1840 data = { 1841 "binary": binary, 1842 "commands": commands, 1843 "additional_args": additional_args 1844 } 1845 logger.info(f"๐ง Starting Radare2 analysis: {binary}") 1846 result = hexstrike_client.safe_post("api/tools/radare2", data) 1847 if result.get("success"): 1848 logger.info(f"โ Radare2 analysis completed for {binary}") 1849 else: 1850 logger.error(f"โ Radare2 analysis failed for {binary}") 1851 return result 1852 1853 @mcp.tool() 1854 def binwalk_analyze(file_path: str, extract: bool = False, additional_args: str = "") -> Dict[str, Any]: 1855 """ 1856 Execute Binwalk for firmware and file analysis with enhanced logging. 1857 1858 Args: 1859 file_path: Path to the file to analyze 1860 extract: Whether to extract discovered files 1861 additional_args: Additional Binwalk arguments 1862 1863 Returns: 1864 Firmware analysis results 1865 """ 1866 data = { 1867 "file_path": file_path, 1868 "extract": extract, 1869 "additional_args": additional_args 1870 } 1871 logger.info(f"๐ง Starting Binwalk analysis: {file_path}") 1872 result = hexstrike_client.safe_post("api/tools/binwalk", data) 1873 if result.get("success"): 1874 logger.info(f"โ Binwalk analysis completed for {file_path}") 1875 else: 1876 logger.error(f"โ Binwalk analysis failed for {file_path}") 1877 return result 1878 1879 @mcp.tool() 1880 def ropgadget_search(binary: str, gadget_type: str = "", additional_args: str = "") -> Dict[str, Any]: 1881 """ 1882 Search for ROP gadgets in a binary using ROPgadget with enhanced logging. 1883 1884 Args: 1885 binary: Path to the binary file 1886 gadget_type: Type of gadgets to search for 1887 additional_args: Additional ROPgadget arguments 1888 1889 Returns: 1890 ROP gadget search results 1891 """ 1892 data = { 1893 "binary": binary, 1894 "gadget_type": gadget_type, 1895 "additional_args": additional_args 1896 } 1897 logger.info(f"๐ง Starting ROPgadget search: {binary}") 1898 result = hexstrike_client.safe_post("api/tools/ropgadget", data) 1899 if result.get("success"): 1900 logger.info(f"โ ROPgadget search completed for {binary}") 1901 else: 1902 logger.error(f"โ ROPgadget search failed for {binary}") 1903 return result 1904 1905 @mcp.tool() 1906 def checksec_analyze(binary: str) -> Dict[str, Any]: 1907 """ 1908 Check security features of a binary with enhanced logging. 1909 1910 Args: 1911 binary: Path to the binary file 1912 1913 Returns: 1914 Security features analysis results 1915 """ 1916 data = { 1917 "binary": binary 1918 } 1919 logger.info(f"๐ง Starting Checksec analysis: {binary}") 1920 result = hexstrike_client.safe_post("api/tools/checksec", data) 1921 if result.get("success"): 1922 logger.info(f"โ Checksec analysis completed for {binary}") 1923 else: 1924 logger.error(f"โ Checksec analysis failed for {binary}") 1925 return result 1926 1927 @mcp.tool() 1928 def xxd_hexdump(file_path: str, offset: str = "0", length: str = "", additional_args: str = "") -> Dict[str, Any]: 1929 """ 1930 Create a hex dump of a file using xxd with enhanced logging. 1931 1932 Args: 1933 file_path: Path to the file 1934 offset: Offset to start reading from 1935 length: Number of bytes to read 1936 additional_args: Additional xxd arguments 1937 1938 Returns: 1939 Hex dump results 1940 """ 1941 data = { 1942 "file_path": file_path, 1943 "offset": offset, 1944 "length": length, 1945 "additional_args": additional_args 1946 } 1947 logger.info(f"๐ง Starting XXD hex dump: {file_path}") 1948 result = hexstrike_client.safe_post("api/tools/xxd", data) 1949 if result.get("success"): 1950 logger.info(f"โ XXD hex dump completed for {file_path}") 1951 else: 1952 logger.error(f"โ XXD hex dump failed for {file_path}") 1953 return result 1954 1955 @mcp.tool() 1956 def strings_extract(file_path: str, min_len: int = 4, additional_args: str = "") -> Dict[str, Any]: 1957 """ 1958 Extract strings from a binary file with enhanced logging. 1959 1960 Args: 1961 file_path: Path to the file 1962 min_len: Minimum string length 1963 additional_args: Additional strings arguments 1964 1965 Returns: 1966 String extraction results 1967 """ 1968 data = { 1969 "file_path": file_path, 1970 "min_len": min_len, 1971 "additional_args": additional_args 1972 } 1973 logger.info(f"๐ง Starting Strings extraction: {file_path}") 1974 result = hexstrike_client.safe_post("api/tools/strings", data) 1975 if result.get("success"): 1976 logger.info(f"โ Strings extraction completed for {file_path}") 1977 else: 1978 logger.error(f"โ Strings extraction failed for {file_path}") 1979 return result 1980 1981 @mcp.tool() 1982 def objdump_analyze(binary: str, disassemble: bool = True, additional_args: str = "") -> Dict[str, Any]: 1983 """ 1984 Analyze a binary using objdump with enhanced logging. 1985 1986 Args: 1987 binary: Path to the binary file 1988 disassemble: Whether to disassemble the binary 1989 additional_args: Additional objdump arguments 1990 1991 Returns: 1992 Binary analysis results 1993 """ 1994 data = { 1995 "binary": binary, 1996 "disassemble": disassemble, 1997 "additional_args": additional_args 1998 } 1999 logger.info(f"๐ง Starting Objdump analysis: {binary}") 2000 result = hexstrike_client.safe_post("api/tools/objdump", data) 2001 if result.get("success"): 2002 logger.info(f"โ Objdump analysis completed for {binary}") 2003 else: 2004 logger.error(f"โ Objdump analysis failed for {binary}") 2005 return result 2006 2007 # ============================================================================ 2008 # ENHANCED BINARY ANALYSIS AND EXPLOITATION FRAMEWORK (v6.0) 2009 # ============================================================================ 2010 2011 @mcp.tool() 2012 def ghidra_analysis(binary: str, project_name: str = "hexstrike_analysis", 2013 script_file: str = "", analysis_timeout: int = 300, 2014 output_format: str = "xml", additional_args: str = "") -> Dict[str, Any]: 2015 """ 2016 Execute Ghidra for advanced binary analysis and reverse engineering. 2017 2018 Args: 2019 binary: Path to the binary file 2020 project_name: Ghidra project name 2021 script_file: Custom Ghidra script to run 2022 analysis_timeout: Analysis timeout in seconds 2023 output_format: Output format (xml, json) 2024 additional_args: Additional Ghidra arguments 2025 2026 Returns: 2027 Advanced binary analysis results from Ghidra 2028 """ 2029 data = { 2030 "binary": binary, 2031 "project_name": project_name, 2032 "script_file": script_file, 2033 "analysis_timeout": analysis_timeout, 2034 "output_format": output_format, 2035 "additional_args": additional_args 2036 } 2037 logger.info(f"๐ง Starting Ghidra analysis: {binary}") 2038 result = hexstrike_client.safe_post("api/tools/ghidra", data) 2039 if result.get("success"): 2040 logger.info(f"โ Ghidra analysis completed for {binary}") 2041 else: 2042 logger.error(f"โ Ghidra analysis failed for {binary}") 2043 return result 2044 2045 @mcp.tool() 2046 def pwntools_exploit(script_content: str = "", target_binary: str = "", 2047 target_host: str = "", target_port: int = 0, 2048 exploit_type: str = "local", additional_args: str = "") -> Dict[str, Any]: 2049 """ 2050 Execute Pwntools for exploit development and automation. 2051 2052 Args: 2053 script_content: Python script content using pwntools 2054 target_binary: Local binary to exploit 2055 target_host: Remote host to connect to 2056 target_port: Remote port to connect to 2057 exploit_type: Type of exploit (local, remote, format_string, rop) 2058 additional_args: Additional arguments 2059 2060 Returns: 2061 Exploit execution results 2062 """ 2063 data = { 2064 "script_content": script_content, 2065 "target_binary": target_binary, 2066 "target_host": target_host, 2067 "target_port": target_port, 2068 "exploit_type": exploit_type, 2069 "additional_args": additional_args 2070 } 2071 logger.info(f"๐ง Starting Pwntools exploit: {exploit_type}") 2072 result = hexstrike_client.safe_post("api/tools/pwntools", data) 2073 if result.get("success"): 2074 logger.info(f"โ Pwntools exploit completed") 2075 else: 2076 logger.error(f"โ Pwntools exploit failed") 2077 return result 2078 2079 @mcp.tool() 2080 def one_gadget_search(libc_path: str, level: int = 1, additional_args: str = "") -> Dict[str, Any]: 2081 """ 2082 Execute one_gadget to find one-shot RCE gadgets in libc. 2083 2084 Args: 2085 libc_path: Path to libc binary 2086 level: Constraint level (0, 1, 2) 2087 additional_args: Additional one_gadget arguments 2088 2089 Returns: 2090 One-shot RCE gadget search results 2091 """ 2092 data = { 2093 "libc_path": libc_path, 2094 "level": level, 2095 "additional_args": additional_args 2096 } 2097 logger.info(f"๐ง Starting one_gadget analysis: {libc_path}") 2098 result = hexstrike_client.safe_post("api/tools/one-gadget", data) 2099 if result.get("success"): 2100 logger.info(f"โ one_gadget analysis completed") 2101 else: 2102 logger.error(f"โ one_gadget analysis failed") 2103 return result 2104 2105 @mcp.tool() 2106 def libc_database_lookup(action: str = "find", symbols: str = "", 2107 libc_id: str = "", additional_args: str = "") -> Dict[str, Any]: 2108 """ 2109 Execute libc-database for libc identification and offset lookup. 2110 2111 Args: 2112 action: Action to perform (find, dump, download) 2113 symbols: Symbols with offsets for find action (format: "symbol1:offset1 symbol2:offset2") 2114 libc_id: Libc ID for dump/download actions 2115 additional_args: Additional arguments 2116 2117 Returns: 2118 Libc database lookup results 2119 """ 2120 data = { 2121 "action": action, 2122 "symbols": symbols, 2123 "libc_id": libc_id, 2124 "additional_args": additional_args 2125 } 2126 logger.info(f"๐ง Starting libc-database {action}: {symbols or libc_id}") 2127 result = hexstrike_client.safe_post("api/tools/libc-database", data) 2128 if result.get("success"): 2129 logger.info(f"โ libc-database {action} completed") 2130 else: 2131 logger.error(f"โ libc-database {action} failed") 2132 return result 2133 2134 @mcp.tool() 2135 def gdb_peda_debug(binary: str = "", commands: str = "", attach_pid: int = 0, 2136 core_file: str = "", additional_args: str = "") -> Dict[str, Any]: 2137 """ 2138 Execute GDB with PEDA for enhanced debugging and exploitation. 2139 2140 Args: 2141 binary: Binary to debug 2142 commands: GDB commands to execute 2143 attach_pid: Process ID to attach to 2144 core_file: Core dump file to analyze 2145 additional_args: Additional GDB arguments 2146 2147 Returns: 2148 Enhanced debugging results with PEDA 2149 """ 2150 data = { 2151 "binary": binary, 2152 "commands": commands, 2153 "attach_pid": attach_pid, 2154 "core_file": core_file, 2155 "additional_args": additional_args 2156 } 2157 logger.info(f"๐ง Starting GDB-PEDA analysis: {binary or f'PID {attach_pid}' or core_file}") 2158 result = hexstrike_client.safe_post("api/tools/gdb-peda", data) 2159 if result.get("success"): 2160 logger.info(f"โ GDB-PEDA analysis completed") 2161 else: 2162 logger.error(f"โ GDB-PEDA analysis failed") 2163 return result 2164 2165 @mcp.tool() 2166 def angr_symbolic_execution(binary: str, script_content: str = "", 2167 find_address: str = "", avoid_addresses: str = "", 2168 analysis_type: str = "symbolic", additional_args: str = "") -> Dict[str, Any]: 2169 """ 2170 Execute angr for symbolic execution and binary analysis. 2171 2172 Args: 2173 binary: Binary to analyze 2174 script_content: Custom angr script content 2175 find_address: Address to find during symbolic execution 2176 avoid_addresses: Comma-separated addresses to avoid 2177 analysis_type: Type of analysis (symbolic, cfg, static) 2178 additional_args: Additional arguments 2179 2180 Returns: 2181 Symbolic execution and binary analysis results 2182 """ 2183 data = { 2184 "binary": binary, 2185 "script_content": script_content, 2186 "find_address": find_address, 2187 "avoid_addresses": avoid_addresses, 2188 "analysis_type": analysis_type, 2189 "additional_args": additional_args 2190 } 2191 logger.info(f"๐ง Starting angr analysis: {binary}") 2192 result = hexstrike_client.safe_post("api/tools/angr", data) 2193 if result.get("success"): 2194 logger.info(f"โ angr analysis completed") 2195 else: 2196 logger.error(f"โ angr analysis failed") 2197 return result 2198 2199 @mcp.tool() 2200 def ropper_gadget_search(binary: str, gadget_type: str = "rop", quality: int = 1, 2201 arch: str = "", search_string: str = "", 2202 additional_args: str = "") -> Dict[str, Any]: 2203 """ 2204 Execute ropper for advanced ROP/JOP gadget searching. 2205 2206 Args: 2207 binary: Binary to search for gadgets 2208 gadget_type: Type of gadgets (rop, jop, sys, all) 2209 quality: Gadget quality level (1-5) 2210 arch: Target architecture (x86, x86_64, arm, etc.) 2211 search_string: Specific gadget pattern to search for 2212 additional_args: Additional ropper arguments 2213 2214 Returns: 2215 Advanced ROP/JOP gadget search results 2216 """ 2217 data = { 2218 "binary": binary, 2219 "gadget_type": gadget_type, 2220 "quality": quality, 2221 "arch": arch, 2222 "search_string": search_string, 2223 "additional_args": additional_args 2224 } 2225 logger.info(f"๐ง Starting ropper analysis: {binary}") 2226 result = hexstrike_client.safe_post("api/tools/ropper", data) 2227 if result.get("success"): 2228 logger.info(f"โ ropper analysis completed") 2229 else: 2230 logger.error(f"โ ropper analysis failed") 2231 return result 2232 2233 @mcp.tool() 2234 def pwninit_setup(binary: str, libc: str = "", ld: str = "", 2235 template_type: str = "python", additional_args: str = "") -> Dict[str, Any]: 2236 """ 2237 Execute pwninit for CTF binary exploitation setup. 2238 2239 Args: 2240 binary: Binary file to set up 2241 libc: Libc file to use 2242 ld: Loader file to use 2243 template_type: Template type (python, c) 2244 additional_args: Additional pwninit arguments 2245 2246 Returns: 2247 CTF binary exploitation setup results 2248 """ 2249 data = { 2250 "binary": binary, 2251 "libc": libc, 2252 "ld": ld, 2253 "template_type": template_type, 2254 "additional_args": additional_args 2255 } 2256 logger.info(f"๐ง Starting pwninit setup: {binary}") 2257 result = hexstrike_client.safe_post("api/tools/pwninit", data) 2258 if result.get("success"): 2259 logger.info(f"โ pwninit setup completed") 2260 else: 2261 logger.error(f"โ pwninit setup failed") 2262 return result 2263 2264 @mcp.tool() 2265 def feroxbuster_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", threads: int = 10, additional_args: str = "") -> Dict[str, Any]: 2266 """ 2267 Execute Feroxbuster for recursive content discovery with enhanced logging. 2268 2269 Args: 2270 url: The target URL 2271 wordlist: Wordlist file to use 2272 threads: Number of threads 2273 additional_args: Additional Feroxbuster arguments 2274 2275 Returns: 2276 Content discovery results 2277 """ 2278 data = { 2279 "url": url, 2280 "wordlist": wordlist, 2281 "threads": threads, 2282 "additional_args": additional_args 2283 } 2284 logger.info(f"๐ Starting Feroxbuster scan: {url}") 2285 result = hexstrike_client.safe_post("api/tools/feroxbuster", data) 2286 if result.get("success"): 2287 logger.info(f"โ Feroxbuster scan completed for {url}") 2288 else: 2289 logger.error(f"โ Feroxbuster scan failed for {url}") 2290 return result 2291 2292 @mcp.tool() 2293 def dotdotpwn_scan(target: str, module: str = "http", additional_args: str = "") -> Dict[str, Any]: 2294 """ 2295 Execute DotDotPwn for directory traversal testing with enhanced logging. 2296 2297 Args: 2298 target: The target hostname or IP 2299 module: Module to use (http, ftp, tftp, etc.) 2300 additional_args: Additional DotDotPwn arguments 2301 2302 Returns: 2303 Directory traversal test results 2304 """ 2305 data = { 2306 "target": target, 2307 "module": module, 2308 "additional_args": additional_args 2309 } 2310 logger.info(f"๐ Starting DotDotPwn scan: {target}") 2311 result = hexstrike_client.safe_post("api/tools/dotdotpwn", data) 2312 if result.get("success"): 2313 logger.info(f"โ DotDotPwn scan completed for {target}") 2314 else: 2315 logger.error(f"โ DotDotPwn scan failed for {target}") 2316 return result 2317 2318 @mcp.tool() 2319 def xsser_scan(url: str, params: str = "", additional_args: str = "") -> Dict[str, Any]: 2320 """ 2321 Execute XSSer for XSS vulnerability testing with enhanced logging. 2322 2323 Args: 2324 url: The target URL 2325 params: Parameters to test 2326 additional_args: Additional XSSer arguments 2327 2328 Returns: 2329 XSS vulnerability test results 2330 """ 2331 data = { 2332 "url": url, 2333 "params": params, 2334 "additional_args": additional_args 2335 } 2336 logger.info(f"๐ Starting XSSer scan: {url}") 2337 result = hexstrike_client.safe_post("api/tools/xsser", data) 2338 if result.get("success"): 2339 logger.info(f"โ XSSer scan completed for {url}") 2340 else: 2341 logger.error(f"โ XSSer scan failed for {url}") 2342 return result 2343 2344 @mcp.tool() 2345 def wfuzz_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]: 2346 """ 2347 Execute Wfuzz for web application fuzzing with enhanced logging. 2348 2349 Args: 2350 url: The target URL (use FUZZ where you want to inject payloads) 2351 wordlist: Wordlist file to use 2352 additional_args: Additional Wfuzz arguments 2353 2354 Returns: 2355 Web application fuzzing results 2356 """ 2357 data = { 2358 "url": url, 2359 "wordlist": wordlist, 2360 "additional_args": additional_args 2361 } 2362 logger.info(f"๐ Starting Wfuzz scan: {url}") 2363 result = hexstrike_client.safe_post("api/tools/wfuzz", data) 2364 if result.get("success"): 2365 logger.info(f"โ Wfuzz scan completed for {url}") 2366 else: 2367 logger.error(f"โ Wfuzz scan failed for {url}") 2368 return result 2369 2370 # ============================================================================ 2371 # ENHANCED WEB APPLICATION SECURITY TOOLS (v6.0) 2372 # ============================================================================ 2373 2374 @mcp.tool() 2375 def dirsearch_scan(url: str, extensions: str = "php,html,js,txt,xml,json", 2376 wordlist: str = "/usr/share/wordlists/dirsearch/common.txt", 2377 threads: int = 30, recursive: bool = False, additional_args: str = "") -> Dict[str, Any]: 2378 """ 2379 Execute Dirsearch for advanced directory and file discovery with enhanced logging. 2380 2381 Args: 2382 url: The target URL 2383 extensions: File extensions to search for 2384 wordlist: Wordlist file to use 2385 threads: Number of threads to use 2386 recursive: Enable recursive scanning 2387 additional_args: Additional Dirsearch arguments 2388 2389 Returns: 2390 Advanced directory discovery results 2391 """ 2392 data = { 2393 "url": url, 2394 "extensions": extensions, 2395 "wordlist": wordlist, 2396 "threads": threads, 2397 "recursive": recursive, 2398 "additional_args": additional_args 2399 } 2400 logger.info(f"๐ Starting Dirsearch scan: {url}") 2401 result = hexstrike_client.safe_post("api/tools/dirsearch", data) 2402 if result.get("success"): 2403 logger.info(f"โ Dirsearch scan completed for {url}") 2404 else: 2405 logger.error(f"โ Dirsearch scan failed for {url}") 2406 return result 2407 2408 @mcp.tool() 2409 def katana_crawl(url: str, depth: int = 3, js_crawl: bool = True, 2410 form_extraction: bool = True, output_format: str = "json", 2411 additional_args: str = "") -> Dict[str, Any]: 2412 """ 2413 Execute Katana for next-generation crawling and spidering with enhanced logging. 2414 2415 Args: 2416 url: The target URL to crawl 2417 depth: Crawling depth 2418 js_crawl: Enable JavaScript crawling 2419 form_extraction: Enable form extraction 2420 output_format: Output format (json, txt) 2421 additional_args: Additional Katana arguments 2422 2423 Returns: 2424 Advanced web crawling results with endpoints and forms 2425 """ 2426 data = { 2427 "url": url, 2428 "depth": depth, 2429 "js_crawl": js_crawl, 2430 "form_extraction": form_extraction, 2431 "output_format": output_format, 2432 "additional_args": additional_args 2433 } 2434 logger.info(f"โ๏ธ Starting Katana crawl: {url}") 2435 result = hexstrike_client.safe_post("api/tools/katana", data) 2436 if result.get("success"): 2437 logger.info(f"โ Katana crawl completed for {url}") 2438 else: 2439 logger.error(f"โ Katana crawl failed for {url}") 2440 return result 2441 2442 @mcp.tool() 2443 def gau_discovery(domain: str, providers: str = "wayback,commoncrawl,otx,urlscan", 2444 include_subs: bool = True, blacklist: str = "png,jpg,gif,jpeg,swf,woff,svg,pdf,css,ico", 2445 additional_args: str = "") -> Dict[str, Any]: 2446 """ 2447 Execute Gau (Get All URLs) for URL discovery from multiple sources with enhanced logging. 2448 2449 Args: 2450 domain: The target domain 2451 providers: Data providers to use 2452 include_subs: Include subdomains 2453 blacklist: File extensions to blacklist 2454 additional_args: Additional Gau arguments 2455 2456 Returns: 2457 Comprehensive URL discovery results from multiple sources 2458 """ 2459 data = { 2460 "domain": domain, 2461 "providers": providers, 2462 "include_subs": include_subs, 2463 "blacklist": blacklist, 2464 "additional_args": additional_args 2465 } 2466 logger.info(f"๐ก Starting Gau URL discovery: {domain}") 2467 result = hexstrike_client.safe_post("api/tools/gau", data) 2468 if result.get("success"): 2469 logger.info(f"โ Gau URL discovery completed for {domain}") 2470 else: 2471 logger.error(f"โ Gau URL discovery failed for {domain}") 2472 return result 2473 2474 @mcp.tool() 2475 def waybackurls_discovery(domain: str, get_versions: bool = False, 2476 no_subs: bool = False, additional_args: str = "") -> Dict[str, Any]: 2477 """ 2478 Execute Waybackurls for historical URL discovery with enhanced logging. 2479 2480 Args: 2481 domain: The target domain 2482 get_versions: Get all versions of URLs 2483 no_subs: Don't include subdomains 2484 additional_args: Additional Waybackurls arguments 2485 2486 Returns: 2487 Historical URL discovery results from Wayback Machine 2488 """ 2489 data = { 2490 "domain": domain, 2491 "get_versions": get_versions, 2492 "no_subs": no_subs, 2493 "additional_args": additional_args 2494 } 2495 logger.info(f"๐ฐ๏ธ Starting Waybackurls discovery: {domain}") 2496 result = hexstrike_client.safe_post("api/tools/waybackurls", data) 2497 if result.get("success"): 2498 logger.info(f"โ Waybackurls discovery completed for {domain}") 2499 else: 2500 logger.error(f"โ Waybackurls discovery failed for {domain}") 2501 return result 2502 2503 @mcp.tool() 2504 def arjun_parameter_discovery(url: str, method: str = "GET", wordlist: str = "", 2505 delay: int = 0, threads: int = 25, stable: bool = False, 2506 additional_args: str = "") -> Dict[str, Any]: 2507 """ 2508 Execute Arjun for HTTP parameter discovery with enhanced logging. 2509 2510 Args: 2511 url: The target URL 2512 method: HTTP method to use 2513 wordlist: Custom wordlist file 2514 delay: Delay between requests 2515 threads: Number of threads 2516 stable: Use stable mode 2517 additional_args: Additional Arjun arguments 2518 2519 Returns: 2520 HTTP parameter discovery results 2521 """ 2522 data = { 2523 "url": url, 2524 "method": method, 2525 "wordlist": wordlist, 2526 "delay": delay, 2527 "threads": threads, 2528 "stable": stable, 2529 "additional_args": additional_args 2530 } 2531 logger.info(f"๐ฏ Starting Arjun parameter discovery: {url}") 2532 result = hexstrike_client.safe_post("api/tools/arjun", data) 2533 if result.get("success"): 2534 logger.info(f"โ Arjun parameter discovery completed for {url}") 2535 else: 2536 logger.error(f"โ Arjun parameter discovery failed for {url}") 2537 return result 2538 2539 @mcp.tool() 2540 def paramspider_mining(domain: str, level: int = 2, 2541 exclude: str = "png,jpg,gif,jpeg,swf,woff,svg,pdf,css,ico", 2542 output: str = "", additional_args: str = "") -> Dict[str, Any]: 2543 """ 2544 Execute ParamSpider for parameter mining from web archives with enhanced logging. 2545 2546 Args: 2547 domain: The target domain 2548 level: Mining level depth 2549 exclude: File extensions to exclude 2550 output: Output file path 2551 additional_args: Additional ParamSpider arguments 2552 2553 Returns: 2554 Parameter mining results from web archives 2555 """ 2556 data = { 2557 "domain": domain, 2558 "level": level, 2559 "exclude": exclude, 2560 "output": output, 2561 "additional_args": additional_args 2562 } 2563 logger.info(f"๐ท๏ธ Starting ParamSpider mining: {domain}") 2564 result = hexstrike_client.safe_post("api/tools/paramspider", data) 2565 if result.get("success"): 2566 logger.info(f"โ ParamSpider mining completed for {domain}") 2567 else: 2568 logger.error(f"โ ParamSpider mining failed for {domain}") 2569 return result 2570 2571 @mcp.tool() 2572 def x8_parameter_discovery(url: str, wordlist: str = "/usr/share/wordlists/x8/params.txt", 2573 method: str = "GET", body: str = "", headers: str = "", 2574 additional_args: str = "") -> Dict[str, Any]: 2575 """ 2576 Execute x8 for hidden parameter discovery with enhanced logging. 2577 2578 Args: 2579 url: The target URL 2580 wordlist: Parameter wordlist 2581 method: HTTP method 2582 body: Request body 2583 headers: Custom headers 2584 additional_args: Additional x8 arguments 2585 2586 Returns: 2587 Hidden parameter discovery results 2588 """ 2589 data = { 2590 "url": url, 2591 "wordlist": wordlist, 2592 "method": method, 2593 "body": body, 2594 "headers": headers, 2595 "additional_args": additional_args 2596 } 2597 logger.info(f"๐ Starting x8 parameter discovery: {url}") 2598 result = hexstrike_client.safe_post("api/tools/x8", data) 2599 if result.get("success"): 2600 logger.info(f"โ x8 parameter discovery completed for {url}") 2601 else: 2602 logger.error(f"โ x8 parameter discovery failed for {url}") 2603 return result 2604 2605 @mcp.tool() 2606 def jaeles_vulnerability_scan(url: str, signatures: str = "", config: str = "", 2607 threads: int = 20, timeout: int = 20, 2608 additional_args: str = "") -> Dict[str, Any]: 2609 """ 2610 Execute Jaeles for advanced vulnerability scanning with custom signatures. 2611 2612 Args: 2613 url: The target URL 2614 signatures: Custom signature path 2615 config: Configuration file 2616 threads: Number of threads 2617 timeout: Request timeout 2618 additional_args: Additional Jaeles arguments 2619 2620 Returns: 2621 Advanced vulnerability scanning results with custom signatures 2622 """ 2623 data = { 2624 "url": url, 2625 "signatures": signatures, 2626 "config": config, 2627 "threads": threads, 2628 "timeout": timeout, 2629 "additional_args": additional_args 2630 } 2631 logger.info(f"๐ฌ Starting Jaeles vulnerability scan: {url}") 2632 result = hexstrike_client.safe_post("api/tools/jaeles", data) 2633 if result.get("success"): 2634 logger.info(f"โ Jaeles vulnerability scan completed for {url}") 2635 else: 2636 logger.error(f"โ Jaeles vulnerability scan failed for {url}") 2637 return result 2638 2639 @mcp.tool() 2640 def dalfox_xss_scan(url: str, pipe_mode: bool = False, blind: bool = False, 2641 mining_dom: bool = True, mining_dict: bool = True, 2642 custom_payload: str = "", additional_args: str = "") -> Dict[str, Any]: 2643 """ 2644 Execute Dalfox for advanced XSS vulnerability scanning with enhanced logging. 2645 2646 Args: 2647 url: The target URL 2648 pipe_mode: Use pipe mode for input 2649 blind: Enable blind XSS testing 2650 mining_dom: Enable DOM mining 2651 mining_dict: Enable dictionary mining 2652 custom_payload: Custom XSS payload 2653 additional_args: Additional Dalfox arguments 2654 2655 Returns: 2656 Advanced XSS vulnerability scanning results 2657 """ 2658 data = { 2659 "url": url, 2660 "pipe_mode": pipe_mode, 2661 "blind": blind, 2662 "mining_dom": mining_dom, 2663 "mining_dict": mining_dict, 2664 "custom_payload": custom_payload, 2665 "additional_args": additional_args 2666 } 2667 logger.info(f"๐ฏ Starting Dalfox XSS scan: {url if url else 'pipe mode'}") 2668 result = hexstrike_client.safe_post("api/tools/dalfox", data) 2669 if result.get("success"): 2670 logger.info(f"โ Dalfox XSS scan completed") 2671 else: 2672 logger.error(f"โ Dalfox XSS scan failed") 2673 return result 2674 2675 @mcp.tool() 2676 def httpx_probe(target: str, probe: bool = True, tech_detect: bool = False, 2677 status_code: bool = False, content_length: bool = False, 2678 title: bool = False, web_server: bool = False, threads: int = 50, 2679 additional_args: str = "") -> Dict[str, Any]: 2680 """ 2681 Execute httpx for fast HTTP probing and technology detection. 2682 2683 Args: 2684 target: Target file or single URL 2685 probe: Enable probing 2686 tech_detect: Enable technology detection 2687 status_code: Show status codes 2688 content_length: Show content length 2689 title: Show page titles 2690 web_server: Show web server 2691 threads: Number of threads 2692 additional_args: Additional httpx arguments 2693 2694 Returns: 2695 Fast HTTP probing results with technology detection 2696 """ 2697 data = { 2698 "target": target, 2699 "probe": probe, 2700 "tech_detect": tech_detect, 2701 "status_code": status_code, 2702 "content_length": content_length, 2703 "title": title, 2704 "web_server": web_server, 2705 "threads": threads, 2706 "additional_args": additional_args 2707 } 2708 logger.info(f"๐ Starting httpx probe: {target}") 2709 result = hexstrike_client.safe_post("api/tools/httpx", data) 2710 if result.get("success"): 2711 logger.info(f"โ httpx probe completed for {target}") 2712 else: 2713 logger.error(f"โ httpx probe failed for {target}") 2714 return result 2715 2716 @mcp.tool() 2717 def anew_data_processing(input_data: str, output_file: str = "", 2718 additional_args: str = "") -> Dict[str, Any]: 2719 """ 2720 Execute anew for appending new lines to files (useful for data processing). 2721 2722 Args: 2723 input_data: Input data to process 2724 output_file: Output file path 2725 additional_args: Additional anew arguments 2726 2727 Returns: 2728 Data processing results with unique line filtering 2729 """ 2730 data = { 2731 "input_data": input_data, 2732 "output_file": output_file, 2733 "additional_args": additional_args 2734 } 2735 logger.info("๐ Starting anew data processing") 2736 result = hexstrike_client.safe_post("api/tools/anew", data) 2737 if result.get("success"): 2738 logger.info("โ anew data processing completed") 2739 else: 2740 logger.error("โ anew data processing failed") 2741 return result 2742 2743 @mcp.tool() 2744 def qsreplace_parameter_replacement(urls: str, replacement: str = "FUZZ", 2745 additional_args: str = "") -> Dict[str, Any]: 2746 """ 2747 Execute qsreplace for query string parameter replacement. 2748 2749 Args: 2750 urls: URLs to process 2751 replacement: Replacement string for parameters 2752 additional_args: Additional qsreplace arguments 2753 2754 Returns: 2755 Parameter replacement results for fuzzing 2756 """ 2757 data = { 2758 "urls": urls, 2759 "replacement": replacement, 2760 "additional_args": additional_args 2761 } 2762 logger.info("๐ Starting qsreplace parameter replacement") 2763 result = hexstrike_client.safe_post("api/tools/qsreplace", data) 2764 if result.get("success"): 2765 logger.info("โ qsreplace parameter replacement completed") 2766 else: 2767 logger.error("โ qsreplace parameter replacement failed") 2768 return result 2769 2770 @mcp.tool() 2771 def uro_url_filtering(urls: str, whitelist: str = "", blacklist: str = "", 2772 additional_args: str = "") -> Dict[str, Any]: 2773 """ 2774 Execute uro for filtering out similar URLs. 2775 2776 Args: 2777 urls: URLs to filter 2778 whitelist: Whitelist patterns 2779 blacklist: Blacklist patterns 2780 additional_args: Additional uro arguments 2781 2782 Returns: 2783 Filtered URL results with duplicates removed 2784 """ 2785 data = { 2786 "urls": urls, 2787 "whitelist": whitelist, 2788 "blacklist": blacklist, 2789 "additional_args": additional_args 2790 } 2791 logger.info("๐ Starting uro URL filtering") 2792 result = hexstrike_client.safe_post("api/tools/uro", data) 2793 if result.get("success"): 2794 logger.info("โ uro URL filtering completed") 2795 else: 2796 logger.error("โ uro URL filtering failed") 2797 return result 2798 2799 # ============================================================================ 2800 # AI-POWERED PAYLOAD GENERATION (v5.0 ENHANCEMENT) 2801 # ============================================================================ 2802 2803 @mcp.tool() 2804 def ai_generate_payload(attack_type: str, complexity: str = "basic", technology: str = "", url: str = "") -> Dict[str, Any]: 2805 """ 2806 Generate AI-powered contextual payloads for security testing. 2807 2808 Args: 2809 attack_type: Type of attack (xss, sqli, lfi, cmd_injection, ssti, xxe) 2810 complexity: Complexity level (basic, advanced, bypass) 2811 technology: Target technology (php, asp, jsp, python, nodejs) 2812 url: Target URL for context 2813 2814 Returns: 2815 Contextual payloads with risk assessment and test cases 2816 """ 2817 data = { 2818 "attack_type": attack_type, 2819 "complexity": complexity, 2820 "technology": technology, 2821 "url": url 2822 } 2823 logger.info(f"๐ค Generating AI payloads for {attack_type} attack") 2824 result = hexstrike_client.safe_post("api/ai/generate_payload", data) 2825 2826 if result.get("success"): 2827 payload_data = result.get("ai_payload_generation", {}) 2828 count = payload_data.get("payload_count", 0) 2829 logger.info(f"โ Generated {count} contextual {attack_type} payloads") 2830 2831 # Log some example payloads for user awareness 2832 payloads = payload_data.get("payloads", []) 2833 if payloads: 2834 logger.info("๐ฏ Sample payloads generated:") 2835 for i, payload_info in enumerate(payloads[:3]): # Show first 3 2836 risk = payload_info.get("risk_level", "UNKNOWN") 2837 context = payload_info.get("context", "basic") 2838 logger.info(f" โโ [{risk}] {context}: {payload_info['payload'][:50]}...") 2839 else: 2840 logger.error("โ AI payload generation failed") 2841 2842 return result 2843 2844 @mcp.tool() 2845 def ai_test_payload(payload: str, target_url: str, method: str = "GET") -> Dict[str, Any]: 2846 """ 2847 Test generated payload against target with AI analysis. 2848 2849 Args: 2850 payload: The payload to test 2851 target_url: Target URL to test against 2852 method: HTTP method (GET, POST) 2853 2854 Returns: 2855 Test results with AI analysis and vulnerability assessment 2856 """ 2857 data = { 2858 "payload": payload, 2859 "target_url": target_url, 2860 "method": method 2861 } 2862 logger.info(f"๐งช Testing AI payload against {target_url}") 2863 result = hexstrike_client.safe_post("api/ai/test_payload", data) 2864 2865 if result.get("success"): 2866 analysis = result.get("ai_analysis", {}) 2867 potential_vuln = analysis.get("potential_vulnerability", False) 2868 logger.info(f"๐ Payload test completed | Vulnerability detected: {potential_vuln}") 2869 2870 if potential_vuln: 2871 logger.warning("โ ๏ธ Potential vulnerability found! Review the response carefully.") 2872 else: 2873 logger.info("โ No obvious vulnerability indicators detected") 2874 else: 2875 logger.error("โ Payload testing failed") 2876 2877 return result 2878 2879 @mcp.tool() 2880 def ai_generate_attack_suite(target_url: str, attack_types: str = "xss,sqli,lfi") -> Dict[str, Any]: 2881 """ 2882 Generate comprehensive attack suite with multiple payload types. 2883 2884 Args: 2885 target_url: Target URL for testing 2886 attack_types: Comma-separated list of attack types 2887 2888 Returns: 2889 Comprehensive attack suite with multiple payload types 2890 """ 2891 attack_list = [attack.strip() for attack in attack_types.split(",")] 2892 results = { 2893 "target_url": target_url, 2894 "attack_types": attack_list, 2895 "payload_suites": {}, 2896 "summary": { 2897 "total_payloads": 0, 2898 "high_risk_payloads": 0, 2899 "test_cases": 0 2900 } 2901 } 2902 2903 logger.info(f"๐ Generating comprehensive attack suite for {target_url}") 2904 logger.info(f"๐ฏ Attack types: {', '.join(attack_list)}") 2905 2906 for attack_type in attack_list: 2907 logger.info(f"๐ค Generating {attack_type} payloads...") 2908 2909 # Generate payloads for this attack type 2910 payload_result = self.ai_generate_payload(attack_type, "advanced", "", target_url) 2911 2912 if payload_result.get("success"): 2913 payload_data = payload_result.get("ai_payload_generation", {}) 2914 results["payload_suites"][attack_type] = payload_data 2915 2916 # Update summary 2917 results["summary"]["total_payloads"] += payload_data.get("payload_count", 0) 2918 results["summary"]["test_cases"] += len(payload_data.get("test_cases", [])) 2919 2920 # Count high-risk payloads 2921 for payload_info in payload_data.get("payloads", []): 2922 if payload_info.get("risk_level") == "HIGH": 2923 results["summary"]["high_risk_payloads"] += 1 2924 2925 logger.info(f"โ Attack suite generated:") 2926 logger.info(f" โโ Total payloads: {results['summary']['total_payloads']}") 2927 logger.info(f" โโ High-risk payloads: {results['summary']['high_risk_payloads']}") 2928 logger.info(f" โโ Test cases: {results['summary']['test_cases']}") 2929 2930 return { 2931 "success": True, 2932 "attack_suite": results, 2933 "timestamp": time.time() 2934 } 2935 2936 # ============================================================================ 2937 # ADVANCED API TESTING TOOLS (v5.0 ENHANCEMENT) 2938 # ============================================================================ 2939 2940 @mcp.tool() 2941 def api_fuzzer(base_url: str, endpoints: str = "", methods: str = "GET,POST,PUT,DELETE", wordlist: str = "/usr/share/wordlists/api/api-endpoints.txt") -> Dict[str, Any]: 2942 """ 2943 Advanced API endpoint fuzzing with intelligent parameter discovery. 2944 2945 Args: 2946 base_url: Base URL of the API 2947 endpoints: Comma-separated list of specific endpoints to test 2948 methods: HTTP methods to test (comma-separated) 2949 wordlist: Wordlist for endpoint discovery 2950 2951 Returns: 2952 API fuzzing results with endpoint discovery and vulnerability assessment 2953 """ 2954 data = { 2955 "base_url": base_url, 2956 "endpoints": [e.strip() for e in endpoints.split(",") if e.strip()] if endpoints else [], 2957 "methods": [m.strip() for m in methods.split(",")], 2958 "wordlist": wordlist 2959 } 2960 2961 logger.info(f"๐ Starting API fuzzing: {base_url}") 2962 result = hexstrike_client.safe_post("api/tools/api_fuzzer", data) 2963 2964 if result.get("success"): 2965 fuzzing_type = result.get("fuzzing_type", "unknown") 2966 if fuzzing_type == "endpoint_testing": 2967 endpoint_count = len(result.get("results", [])) 2968 logger.info(f"โ API endpoint testing completed: {endpoint_count} endpoints tested") 2969 else: 2970 logger.info(f"โ API endpoint discovery completed") 2971 else: 2972 logger.error("โ API fuzzing failed") 2973 2974 return result 2975 2976 @mcp.tool() 2977 def graphql_scanner(endpoint: str, introspection: bool = True, query_depth: int = 10, test_mutations: bool = True) -> Dict[str, Any]: 2978 """ 2979 Advanced GraphQL security scanning and introspection. 2980 2981 Args: 2982 endpoint: GraphQL endpoint URL 2983 introspection: Test introspection queries 2984 query_depth: Maximum query depth to test 2985 test_mutations: Test mutation operations 2986 2987 Returns: 2988 GraphQL security scan results with vulnerability assessment 2989 """ 2990 data = { 2991 "endpoint": endpoint, 2992 "introspection": introspection, 2993 "query_depth": query_depth, 2994 "test_mutations": test_mutations 2995 } 2996 2997 logger.info(f"๐ Starting GraphQL security scan: {endpoint}") 2998 result = hexstrike_client.safe_post("api/tools/graphql_scanner", data) 2999 3000 if result.get("success"): 3001 scan_results = result.get("graphql_scan_results", {}) 3002 vuln_count = len(scan_results.get("vulnerabilities", [])) 3003 tests_count = len(scan_results.get("tests_performed", [])) 3004 3005 logger.info(f"โ GraphQL scan completed: {tests_count} tests, {vuln_count} vulnerabilities") 3006 3007 if vuln_count > 0: 3008 logger.warning(f"โ ๏ธ Found {vuln_count} GraphQL vulnerabilities!") 3009 for vuln in scan_results.get("vulnerabilities", [])[:3]: # Show first 3 3010 severity = vuln.get("severity", "UNKNOWN") 3011 vuln_type = vuln.get("type", "unknown") 3012 logger.warning(f" โโ [{severity}] {vuln_type}") 3013 else: 3014 logger.error("โ GraphQL scanning failed") 3015 3016 return result 3017 3018 @mcp.tool() 3019 def jwt_analyzer(jwt_token: str, target_url: str = "") -> Dict[str, Any]: 3020 """ 3021 Advanced JWT token analysis and vulnerability testing. 3022 3023 Args: 3024 jwt_token: JWT token to analyze 3025 target_url: Optional target URL for testing token manipulation 3026 3027 Returns: 3028 JWT analysis results with vulnerability assessment and attack vectors 3029 """ 3030 data = { 3031 "jwt_token": jwt_token, 3032 "target_url": target_url 3033 } 3034 3035 logger.info(f"๐ Starting JWT security analysis") 3036 result = hexstrike_client.safe_post("api/tools/jwt_analyzer", data) 3037 3038 if result.get("success"): 3039 analysis = result.get("jwt_analysis_results", {}) 3040 vuln_count = len(analysis.get("vulnerabilities", [])) 3041 algorithm = analysis.get("token_info", {}).get("algorithm", "unknown") 3042 3043 logger.info(f"โ JWT analysis completed: {vuln_count} vulnerabilities found") 3044 logger.info(f"๐ Token algorithm: {algorithm}") 3045 3046 if vuln_count > 0: 3047 logger.warning(f"โ ๏ธ Found {vuln_count} JWT vulnerabilities!") 3048 for vuln in analysis.get("vulnerabilities", [])[:3]: # Show first 3 3049 severity = vuln.get("severity", "UNKNOWN") 3050 vuln_type = vuln.get("type", "unknown") 3051 logger.warning(f" โโ [{severity}] {vuln_type}") 3052 else: 3053 logger.error("โ JWT analysis failed") 3054 3055 return result 3056 3057 @mcp.tool() 3058 def api_schema_analyzer(schema_url: str, schema_type: str = "openapi") -> Dict[str, Any]: 3059 """ 3060 Analyze API schemas and identify potential security issues. 3061 3062 Args: 3063 schema_url: URL to the API schema (OpenAPI/Swagger/GraphQL) 3064 schema_type: Type of schema (openapi, swagger, graphql) 3065 3066 Returns: 3067 Schema analysis results with security issues and recommendations 3068 """ 3069 data = { 3070 "schema_url": schema_url, 3071 "schema_type": schema_type 3072 } 3073 3074 logger.info(f"๐ Starting API schema analysis: {schema_url}") 3075 result = hexstrike_client.safe_post("api/tools/api_schema_analyzer", data) 3076 3077 if result.get("success"): 3078 analysis = result.get("schema_analysis_results", {}) 3079 endpoint_count = len(analysis.get("endpoints_found", [])) 3080 issue_count = len(analysis.get("security_issues", [])) 3081 3082 logger.info(f"โ Schema analysis completed: {endpoint_count} endpoints, {issue_count} issues") 3083 3084 if issue_count > 0: 3085 logger.warning(f"โ ๏ธ Found {issue_count} security issues in schema!") 3086 for issue in analysis.get("security_issues", [])[:3]: # Show first 3 3087 severity = issue.get("severity", "UNKNOWN") 3088 issue_type = issue.get("issue", "unknown") 3089 logger.warning(f" โโ [{severity}] {issue_type}") 3090 3091 if endpoint_count > 0: 3092 logger.info(f"๐ Discovered endpoints:") 3093 for endpoint in analysis.get("endpoints_found", [])[:5]: # Show first 5 3094 method = endpoint.get("method", "GET") 3095 path = endpoint.get("path", "/") 3096 logger.info(f" โโ {method} {path}") 3097 else: 3098 logger.error("โ Schema analysis failed") 3099 3100 return result 3101 3102 @mcp.tool() 3103 def comprehensive_api_audit(base_url: str, schema_url: str = "", jwt_token: str = "", graphql_endpoint: str = "") -> Dict[str, Any]: 3104 """ 3105 Comprehensive API security audit combining multiple testing techniques. 3106 3107 Args: 3108 base_url: Base URL of the API 3109 schema_url: Optional API schema URL 3110 jwt_token: Optional JWT token for analysis 3111 graphql_endpoint: Optional GraphQL endpoint 3112 3113 Returns: 3114 Comprehensive audit results with all API security tests 3115 """ 3116 audit_results = { 3117 "base_url": base_url, 3118 "audit_timestamp": time.time(), 3119 "tests_performed": [], 3120 "total_vulnerabilities": 0, 3121 "summary": {}, 3122 "recommendations": [] 3123 } 3124 3125 logger.info(f"๐ Starting comprehensive API security audit: {base_url}") 3126 3127 # 1. API Endpoint Fuzzing 3128 logger.info("๐ Phase 1: API endpoint discovery and fuzzing") 3129 fuzz_result = self.api_fuzzer(base_url) 3130 if fuzz_result.get("success"): 3131 audit_results["tests_performed"].append("api_fuzzing") 3132 audit_results["api_fuzzing"] = fuzz_result 3133 3134 # 2. Schema Analysis (if provided) 3135 if schema_url: 3136 logger.info("๐ Phase 2: API schema analysis") 3137 schema_result = self.api_schema_analyzer(schema_url) 3138 if schema_result.get("success"): 3139 audit_results["tests_performed"].append("schema_analysis") 3140 audit_results["schema_analysis"] = schema_result 3141 3142 schema_data = schema_result.get("schema_analysis_results", {}) 3143 audit_results["total_vulnerabilities"] += len(schema_data.get("security_issues", [])) 3144 3145 # 3. JWT Analysis (if provided) 3146 if jwt_token: 3147 logger.info("๐ Phase 3: JWT token analysis") 3148 jwt_result = self.jwt_analyzer(jwt_token, base_url) 3149 if jwt_result.get("success"): 3150 audit_results["tests_performed"].append("jwt_analysis") 3151 audit_results["jwt_analysis"] = jwt_result 3152 3153 jwt_data = jwt_result.get("jwt_analysis_results", {}) 3154 audit_results["total_vulnerabilities"] += len(jwt_data.get("vulnerabilities", [])) 3155 3156 # 4. GraphQL Testing (if provided) 3157 if graphql_endpoint: 3158 logger.info("๐ Phase 4: GraphQL security scanning") 3159 graphql_result = self.graphql_scanner(graphql_endpoint) 3160 if graphql_result.get("success"): 3161 audit_results["tests_performed"].append("graphql_scanning") 3162 audit_results["graphql_scanning"] = graphql_result 3163 3164 graphql_data = graphql_result.get("graphql_scan_results", {}) 3165 audit_results["total_vulnerabilities"] += len(graphql_data.get("vulnerabilities", [])) 3166 3167 # Generate comprehensive recommendations 3168 audit_results["recommendations"] = [ 3169 "Implement proper authentication and authorization", 3170 "Use HTTPS for all API communications", 3171 "Validate and sanitize all input parameters", 3172 "Implement rate limiting and request throttling", 3173 "Add comprehensive logging and monitoring", 3174 "Regular security testing and code reviews", 3175 "Keep API documentation updated and secure", 3176 "Implement proper error handling" 3177 ] 3178 3179 # Summary 3180 audit_results["summary"] = { 3181 "tests_performed": len(audit_results["tests_performed"]), 3182 "total_vulnerabilities": audit_results["total_vulnerabilities"], 3183 "audit_coverage": "comprehensive" if len(audit_results["tests_performed"]) >= 3 else "partial" 3184 } 3185 3186 logger.info(f"โ Comprehensive API audit completed:") 3187 logger.info(f" โโ Tests performed: {audit_results['summary']['tests_performed']}") 3188 logger.info(f" โโ Total vulnerabilities: {audit_results['summary']['total_vulnerabilities']}") 3189 logger.info(f" โโ Coverage: {audit_results['summary']['audit_coverage']}") 3190 3191 return { 3192 "success": True, 3193 "comprehensive_audit": audit_results 3194 } 3195 3196 # ============================================================================ 3197 # ADVANCED CTF TOOLS (v5.0 ENHANCEMENT) 3198 # ============================================================================ 3199 3200 @mcp.tool() 3201 def volatility3_analyze(memory_file: str, plugin: str, output_file: str = "", additional_args: str = "") -> Dict[str, Any]: 3202 """ 3203 Execute Volatility3 for advanced memory forensics with enhanced logging. 3204 3205 Args: 3206 memory_file: Path to memory dump file 3207 plugin: Volatility3 plugin to execute 3208 output_file: Output file path 3209 additional_args: Additional Volatility3 arguments 3210 3211 Returns: 3212 Advanced memory forensics results 3213 """ 3214 data = { 3215 "memory_file": memory_file, 3216 "plugin": plugin, 3217 "output_file": output_file, 3218 "additional_args": additional_args 3219 } 3220 logger.info(f"๐ง Starting Volatility3 analysis: {plugin}") 3221 result = hexstrike_client.safe_post("api/tools/volatility3", data) 3222 if result.get("success"): 3223 logger.info(f"โ Volatility3 analysis completed") 3224 else: 3225 logger.error(f"โ Volatility3 analysis failed") 3226 return result 3227 3228 @mcp.tool() 3229 def foremost_carving(input_file: str, output_dir: str = "/tmp/foremost_output", file_types: str = "", additional_args: str = "") -> Dict[str, Any]: 3230 """ 3231 Execute Foremost for file carving with enhanced logging. 3232 3233 Args: 3234 input_file: Input file or device to carve 3235 output_dir: Output directory for carved files 3236 file_types: File types to carve (jpg,gif,png,etc.) 3237 additional_args: Additional Foremost arguments 3238 3239 Returns: 3240 File carving results 3241 """ 3242 data = { 3243 "input_file": input_file, 3244 "output_dir": output_dir, 3245 "file_types": file_types, 3246 "additional_args": additional_args 3247 } 3248 logger.info(f"๐ Starting Foremost file carving: {input_file}") 3249 result = hexstrike_client.safe_post("api/tools/foremost", data) 3250 if result.get("success"): 3251 logger.info(f"โ Foremost carving completed") 3252 else: 3253 logger.error(f"โ Foremost carving failed") 3254 return result 3255 3256 @mcp.tool() 3257 def steghide_analysis(action: str, cover_file: str, embed_file: str = "", passphrase: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: 3258 """ 3259 Execute Steghide for steganography analysis with enhanced logging. 3260 3261 Args: 3262 action: Action to perform (extract, embed, info) 3263 cover_file: Cover file for steganography 3264 embed_file: File to embed (for embed action) 3265 passphrase: Passphrase for steganography 3266 output_file: Output file path 3267 additional_args: Additional Steghide arguments 3268 3269 Returns: 3270 Steganography analysis results 3271 """ 3272 data = { 3273 "action": action, 3274 "cover_file": cover_file, 3275 "embed_file": embed_file, 3276 "passphrase": passphrase, 3277 "output_file": output_file, 3278 "additional_args": additional_args 3279 } 3280 logger.info(f"๐ผ๏ธ Starting Steghide {action}: {cover_file}") 3281 result = hexstrike_client.safe_post("api/tools/steghide", data) 3282 if result.get("success"): 3283 logger.info(f"โ Steghide {action} completed") 3284 else: 3285 logger.error(f"โ Steghide {action} failed") 3286 return result 3287 3288 @mcp.tool() 3289 def exiftool_extract(file_path: str, output_format: str = "", tags: str = "", additional_args: str = "") -> Dict[str, Any]: 3290 """ 3291 Execute ExifTool for metadata extraction with enhanced logging. 3292 3293 Args: 3294 file_path: Path to file for metadata extraction 3295 output_format: Output format (json, xml, csv) 3296 tags: Specific tags to extract 3297 additional_args: Additional ExifTool arguments 3298 3299 Returns: 3300 Metadata extraction results 3301 """ 3302 data = { 3303 "file_path": file_path, 3304 "output_format": output_format, 3305 "tags": tags, 3306 "additional_args": additional_args 3307 } 3308 logger.info(f"๐ท Starting ExifTool analysis: {file_path}") 3309 result = hexstrike_client.safe_post("api/tools/exiftool", data) 3310 if result.get("success"): 3311 logger.info(f"โ ExifTool analysis completed") 3312 else: 3313 logger.error(f"โ ExifTool analysis failed") 3314 return result 3315 3316 @mcp.tool() 3317 def hashpump_attack(signature: str, data: str, key_length: str, append_data: str, additional_args: str = "") -> Dict[str, Any]: 3318 """ 3319 Execute HashPump for hash length extension attacks with enhanced logging. 3320 3321 Args: 3322 signature: Original hash signature 3323 data: Original data 3324 key_length: Length of secret key 3325 append_data: Data to append 3326 additional_args: Additional HashPump arguments 3327 3328 Returns: 3329 Hash length extension attack results 3330 """ 3331 data = { 3332 "signature": signature, 3333 "data": data, 3334 "key_length": key_length, 3335 "append_data": append_data, 3336 "additional_args": additional_args 3337 } 3338 logger.info(f"๐ Starting HashPump attack") 3339 result = hexstrike_client.safe_post("api/tools/hashpump", data) 3340 if result.get("success"): 3341 logger.info(f"โ HashPump attack completed") 3342 else: 3343 logger.error(f"โ HashPump attack failed") 3344 return result 3345 3346 # ============================================================================ 3347 # BUG BOUNTY RECONNAISSANCE TOOLS (v5.0 ENHANCEMENT) 3348 # ============================================================================ 3349 3350 @mcp.tool() 3351 def hakrawler_crawl(url: str, depth: int = 2, forms: bool = True, robots: bool = True, sitemap: bool = True, wayback: bool = False, additional_args: str = "") -> Dict[str, Any]: 3352 """ 3353 Execute Hakrawler for web endpoint discovery with enhanced logging. 3354 3355 Note: Uses standard Kali Linux hakrawler (hakluke/hakrawler) with parameter mapping: 3356 - url: Piped via echo to stdin (not -url flag) 3357 - depth: Mapped to -d flag (not -depth) 3358 - forms: Mapped to -s flag for showing sources 3359 - robots/sitemap/wayback: Mapped to -subs for subdomain inclusion 3360 - Always includes -u for unique URLs 3361 3362 Args: 3363 url: Target URL to crawl 3364 depth: Crawling depth (mapped to -d) 3365 forms: Include forms in crawling (mapped to -s) 3366 robots: Check robots.txt (mapped to -subs) 3367 sitemap: Check sitemap.xml (mapped to -subs) 3368 wayback: Use Wayback Machine (mapped to -subs) 3369 additional_args: Additional Hakrawler arguments 3370 3371 Returns: 3372 Web endpoint discovery results 3373 """ 3374 data = { 3375 "url": url, 3376 "depth": depth, 3377 "forms": forms, 3378 "robots": robots, 3379 "sitemap": sitemap, 3380 "wayback": wayback, 3381 "additional_args": additional_args 3382 } 3383 logger.info(f"๐ท๏ธ Starting Hakrawler crawling: {url}") 3384 result = hexstrike_client.safe_post("api/tools/hakrawler", data) 3385 if result.get("success"): 3386 logger.info(f"โ Hakrawler crawling completed") 3387 else: 3388 logger.error(f"โ Hakrawler crawling failed") 3389 return result 3390 3391 @mcp.tool() 3392 def httpx_probe(targets: str = "", target_file: str = "", ports: str = "", methods: str = "GET", status_code: str = "", content_length: bool = False, output_file: str = "", additional_args: str = "") -> Dict[str, Any]: 3393 """ 3394 Execute HTTPx for HTTP probing with enhanced logging. 3395 3396 Args: 3397 targets: Target URLs or IPs 3398 target_file: File containing targets 3399 ports: Ports to probe 3400 methods: HTTP methods to use 3401 status_code: Filter by status code 3402 content_length: Show content length 3403 output_file: Output file path 3404 additional_args: Additional HTTPx arguments 3405 3406 Returns: 3407 HTTP probing results 3408 """ 3409 data = { 3410 "targets": targets, 3411 "target_file": target_file, 3412 "ports": ports, 3413 "methods": methods, 3414 "status_code": status_code, 3415 "content_length": content_length, 3416 "output_file": output_file, 3417 "additional_args": additional_args 3418 } 3419 logger.info(f"๐ Starting HTTPx probing") 3420 result = hexstrike_client.safe_post("api/tools/httpx", data) 3421 if result.get("success"): 3422 logger.info(f"โ HTTPx probing completed") 3423 else: 3424 logger.error(f"โ HTTPx probing failed") 3425 return result 3426 3427 @mcp.tool() 3428 def paramspider_discovery(domain: str, exclude: str = "", output_file: str = "", level: int = 2, additional_args: str = "") -> Dict[str, Any]: 3429 """ 3430 Execute ParamSpider for parameter discovery with enhanced logging. 3431 3432 Args: 3433 domain: Target domain 3434 exclude: Extensions to exclude 3435 output_file: Output file path 3436 level: Crawling level 3437 additional_args: Additional ParamSpider arguments 3438 3439 Returns: 3440 Parameter discovery results 3441 """ 3442 data = { 3443 "domain": domain, 3444 "exclude": exclude, 3445 "output_file": output_file, 3446 "level": level, 3447 "additional_args": additional_args 3448 } 3449 logger.info(f"๐ Starting ParamSpider discovery: {domain}") 3450 result = hexstrike_client.safe_post("api/tools/paramspider", data) 3451 if result.get("success"): 3452 logger.info(f"โ ParamSpider discovery completed") 3453 else: 3454 logger.error(f"โ ParamSpider discovery failed") 3455 return result 3456 3457 # ============================================================================ 3458 # ADVANCED WEB SECURITY TOOLS CONTINUED 3459 # ============================================================================ 3460 3461 @mcp.tool() 3462 def burpsuite_scan(project_file: str = "", config_file: str = "", target: str = "", headless: bool = False, scan_type: str = "", scan_config: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: 3463 """ 3464 Execute Burp Suite with enhanced logging. 3465 3466 Args: 3467 project_file: Burp project file path 3468 config_file: Burp configuration file path 3469 target: Target URL 3470 headless: Run in headless mode 3471 scan_type: Type of scan to perform 3472 scan_config: Scan configuration 3473 output_file: Output file path 3474 additional_args: Additional Burp Suite arguments 3475 3476 Returns: 3477 Burp Suite scan results 3478 """ 3479 data = { 3480 "project_file": project_file, 3481 "config_file": config_file, 3482 "target": target, 3483 "headless": headless, 3484 "scan_type": scan_type, 3485 "scan_config": scan_config, 3486 "output_file": output_file, 3487 "additional_args": additional_args 3488 } 3489 logger.info(f"๐ Starting Burp Suite scan") 3490 result = hexstrike_client.safe_post("api/tools/burpsuite", data) 3491 if result.get("success"): 3492 logger.info(f"โ Burp Suite scan completed") 3493 else: 3494 logger.error(f"โ Burp Suite scan failed") 3495 return result 3496 3497 @mcp.tool() 3498 def zap_scan(target: str = "", scan_type: str = "baseline", api_key: str = "", daemon: bool = False, port: str = "8090", host: str = "0.0.0.0", format_type: str = "xml", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: 3499 """ 3500 Execute OWASP ZAP with enhanced logging. 3501 3502 Args: 3503 target: Target URL 3504 scan_type: Type of scan (baseline, full, api) 3505 api_key: ZAP API key 3506 daemon: Run in daemon mode 3507 port: Port for ZAP daemon 3508 host: Host for ZAP daemon 3509 format_type: Output format (xml, json, html) 3510 output_file: Output file path 3511 additional_args: Additional ZAP arguments 3512 3513 Returns: 3514 ZAP scan results 3515 """ 3516 data = { 3517 "target": target, 3518 "scan_type": scan_type, 3519 "api_key": api_key, 3520 "daemon": daemon, 3521 "port": port, 3522 "host": host, 3523 "format": format_type, 3524 "output_file": output_file, 3525 "additional_args": additional_args 3526 } 3527 logger.info(f"๐ Starting ZAP scan: {target}") 3528 result = hexstrike_client.safe_post("api/tools/zap", data) 3529 if result.get("success"): 3530 logger.info(f"โ ZAP scan completed for {target}") 3531 else: 3532 logger.error(f"โ ZAP scan failed for {target}") 3533 return result 3534 3535 @mcp.tool() 3536 def arjun_scan(url: str, method: str = "GET", data: str = "", headers: str = "", timeout: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: 3537 """ 3538 Execute Arjun for parameter discovery with enhanced logging. 3539 3540 Args: 3541 url: Target URL 3542 method: HTTP method (GET, POST, etc.) 3543 data: POST data for testing 3544 headers: Custom headers 3545 timeout: Request timeout 3546 output_file: Output file path 3547 additional_args: Additional Arjun arguments 3548 3549 Returns: 3550 Parameter discovery results 3551 """ 3552 data = { 3553 "url": url, 3554 "method": method, 3555 "data": data, 3556 "headers": headers, 3557 "timeout": timeout, 3558 "output_file": output_file, 3559 "additional_args": additional_args 3560 } 3561 logger.info(f"๐ Starting Arjun parameter discovery: {url}") 3562 result = hexstrike_client.safe_post("api/tools/arjun", data) 3563 if result.get("success"): 3564 logger.info(f"โ Arjun completed for {url}") 3565 else: 3566 logger.error(f"โ Arjun failed for {url}") 3567 return result 3568 3569 @mcp.tool() 3570 def wafw00f_scan(target: str, additional_args: str = "") -> Dict[str, Any]: 3571 """ 3572 Execute wafw00f to identify and fingerprint WAF products with enhanced logging. 3573 3574 Args: 3575 target: Target URL or IP 3576 additional_args: Additional wafw00f arguments 3577 3578 Returns: 3579 WAF detection results 3580 """ 3581 data = { 3582 "target": target, 3583 "additional_args": additional_args 3584 } 3585 logger.info(f"๐ก๏ธ Starting Wafw00f WAF detection: {target}") 3586 result = hexstrike_client.safe_post("api/tools/wafw00f", data) 3587 if result.get("success"): 3588 logger.info(f"โ Wafw00f completed for {target}") 3589 else: 3590 logger.error(f"โ Wafw00f failed for {target}") 3591 return result 3592 3593 @mcp.tool() 3594 def fierce_scan(domain: str, dns_server: str = "", additional_args: str = "") -> Dict[str, Any]: 3595 """ 3596 Execute fierce for DNS reconnaissance with enhanced logging. 3597 3598 Args: 3599 domain: Target domain 3600 dns_server: DNS server to use 3601 additional_args: Additional fierce arguments 3602 3603 Returns: 3604 DNS reconnaissance results 3605 """ 3606 data = { 3607 "domain": domain, 3608 "dns_server": dns_server, 3609 "additional_args": additional_args 3610 } 3611 logger.info(f"๐ Starting Fierce DNS recon: {domain}") 3612 result = hexstrike_client.safe_post("api/tools/fierce", data) 3613 if result.get("success"): 3614 logger.info(f"โ Fierce completed for {domain}") 3615 else: 3616 logger.error(f"โ Fierce failed for {domain}") 3617 return result 3618 3619 @mcp.tool() 3620 def dnsenum_scan(domain: str, dns_server: str = "", wordlist: str = "", additional_args: str = "") -> Dict[str, Any]: 3621 """ 3622 Execute dnsenum for DNS enumeration with enhanced logging. 3623 3624 Args: 3625 domain: Target domain 3626 dns_server: DNS server to use 3627 wordlist: Wordlist for brute forcing 3628 additional_args: Additional dnsenum arguments 3629 3630 Returns: 3631 DNS enumeration results 3632 """ 3633 data = { 3634 "domain": domain, 3635 "dns_server": dns_server, 3636 "wordlist": wordlist, 3637 "additional_args": additional_args 3638 } 3639 logger.info(f"๐ Starting DNSenum: {domain}") 3640 result = hexstrike_client.safe_post("api/tools/dnsenum", data) 3641 if result.get("success"): 3642 logger.info(f"โ DNSenum completed for {domain}") 3643 else: 3644 logger.error(f"โ DNSenum failed for {domain}") 3645 return result 3646 3647 @mcp.tool() 3648 def autorecon_scan( 3649 target: str = "", 3650 target_file: str = "", 3651 ports: str = "", 3652 output_dir: str = "", 3653 max_scans: str = "", 3654 max_port_scans: str = "", 3655 heartbeat: str = "", 3656 timeout: str = "", 3657 target_timeout: str = "", 3658 config_file: str = "", 3659 global_file: str = "", 3660 plugins_dir: str = "", 3661 add_plugins_dir: str = "", 3662 tags: str = "", 3663 exclude_tags: str = "", 3664 port_scans: str = "", 3665 service_scans: str = "", 3666 reports: str = "", 3667 single_target: bool = False, 3668 only_scans_dir: bool = False, 3669 no_port_dirs: bool = False, 3670 nmap: str = "", 3671 nmap_append: str = "", 3672 proxychains: bool = False, 3673 disable_sanity_checks: bool = False, 3674 disable_keyboard_control: bool = False, 3675 force_services: str = "", 3676 accessible: bool = False, 3677 verbose: int = 0, 3678 curl_path: str = "", 3679 dirbuster_tool: str = "", 3680 dirbuster_wordlist: str = "", 3681 dirbuster_threads: str = "", 3682 dirbuster_ext: str = "", 3683 onesixtyone_community_strings: str = "", 3684 global_username_wordlist: str = "", 3685 global_password_wordlist: str = "", 3686 global_domain: str = "", 3687 additional_args: str = "" 3688 ) -> Dict[str, Any]: 3689 """ 3690 Execute AutoRecon for comprehensive target enumeration with full parameter support. 3691 3692 Args: 3693 target: Single target to scan 3694 target_file: File containing multiple targets 3695 ports: Specific ports to scan 3696 output_dir: Output directory 3697 max_scans: Maximum number of concurrent scans 3698 max_port_scans: Maximum number of concurrent port scans 3699 heartbeat: Heartbeat interval 3700 timeout: Global timeout 3701 target_timeout: Per-target timeout 3702 config_file: Configuration file path 3703 global_file: Global configuration file 3704 plugins_dir: Plugins directory 3705 add_plugins_dir: Additional plugins directory 3706 tags: Plugin tags to include 3707 exclude_tags: Plugin tags to exclude 3708 port_scans: Port scan plugins to run 3709 service_scans: Service scan plugins to run 3710 reports: Report plugins to run 3711 single_target: Use single target directory structure 3712 only_scans_dir: Only create scans directory 3713 no_port_dirs: Don't create port directories 3714 nmap: Custom nmap command 3715 nmap_append: Arguments to append to nmap 3716 proxychains: Use proxychains 3717 disable_sanity_checks: Disable sanity checks 3718 disable_keyboard_control: Disable keyboard control 3719 force_services: Force service detection 3720 accessible: Enable accessible output 3721 verbose: Verbosity level (0-3) 3722 curl_path: Custom curl path 3723 dirbuster_tool: Directory busting tool 3724 dirbuster_wordlist: Directory busting wordlist 3725 dirbuster_threads: Directory busting threads 3726 dirbuster_ext: Directory busting extensions 3727 onesixtyone_community_strings: SNMP community strings 3728 global_username_wordlist: Global username wordlist 3729 global_password_wordlist: Global password wordlist 3730 global_domain: Global domain 3731 additional_args: Additional AutoRecon arguments 3732 3733 Returns: 3734 Comprehensive enumeration results with full configurability 3735 """ 3736 data = { 3737 "target": target, 3738 "target_file": target_file, 3739 "ports": ports, 3740 "output_dir": output_dir, 3741 "max_scans": max_scans, 3742 "max_port_scans": max_port_scans, 3743 "heartbeat": heartbeat, 3744 "timeout": timeout, 3745 "target_timeout": target_timeout, 3746 "config_file": config_file, 3747 "global_file": global_file, 3748 "plugins_dir": plugins_dir, 3749 "add_plugins_dir": add_plugins_dir, 3750 "tags": tags, 3751 "exclude_tags": exclude_tags, 3752 "port_scans": port_scans, 3753 "service_scans": service_scans, 3754 "reports": reports, 3755 "single_target": single_target, 3756 "only_scans_dir": only_scans_dir, 3757 "no_port_dirs": no_port_dirs, 3758 "nmap": nmap, 3759 "nmap_append": nmap_append, 3760 "proxychains": proxychains, 3761 "disable_sanity_checks": disable_sanity_checks, 3762 "disable_keyboard_control": disable_keyboard_control, 3763 "force_services": force_services, 3764 "accessible": accessible, 3765 "verbose": verbose, 3766 "curl_path": curl_path, 3767 "dirbuster_tool": dirbuster_tool, 3768 "dirbuster_wordlist": dirbuster_wordlist, 3769 "dirbuster_threads": dirbuster_threads, 3770 "dirbuster_ext": dirbuster_ext, 3771 "onesixtyone_community_strings": onesixtyone_community_strings, 3772 "global_username_wordlist": global_username_wordlist, 3773 "global_password_wordlist": global_password_wordlist, 3774 "global_domain": global_domain, 3775 "additional_args": additional_args 3776 } 3777 logger.info(f"๐ Starting AutoRecon comprehensive enumeration: {target}") 3778 result = hexstrike_client.safe_post("api/tools/autorecon", data) 3779 if result.get("success"): 3780 logger.info(f"โ AutoRecon comprehensive enumeration completed for {target}") 3781 else: 3782 logger.error(f"โ AutoRecon failed for {target}") 3783 return result 3784 3785 # ============================================================================ 3786 # SYSTEM MONITORING & TELEMETRY 3787 # ============================================================================ 3788 3789 @mcp.tool() 3790 def server_health() -> Dict[str, Any]: 3791 """ 3792 Check the health status of the HexStrike AI server. 3793 3794 Returns: 3795 Server health information with tool availability and telemetry 3796 """ 3797 logger.info(f"๐ฅ Checking HexStrike AI server health") 3798 result = hexstrike_client.check_health() 3799 if result.get("status") == "healthy": 3800 logger.info(f"โ Server is healthy - {result.get('total_tools_available', 0)} tools available") 3801 else: 3802 logger.warning(f"โ ๏ธ Server health check returned: {result.get('status', 'unknown')}") 3803 return result 3804 3805 @mcp.tool() 3806 def get_cache_stats() -> Dict[str, Any]: 3807 """ 3808 Get cache statistics from the HexStrike AI server. 3809 3810 Returns: 3811 Cache performance statistics 3812 """ 3813 logger.info(f"๐พ Getting cache statistics") 3814 result = hexstrike_client.safe_get("api/cache/stats") 3815 if "hit_rate" in result: 3816 logger.info(f"๐ Cache hit rate: {result.get('hit_rate', 'unknown')}") 3817 return result 3818 3819 @mcp.tool() 3820 def clear_cache() -> Dict[str, Any]: 3821 """ 3822 Clear the cache on the HexStrike AI server. 3823 3824 Returns: 3825 Cache clear operation results 3826 """ 3827 logger.info(f"๐งน Clearing server cache") 3828 result = hexstrike_client.safe_post("api/cache/clear", {}) 3829 if result.get("success"): 3830 logger.info(f"โ Cache cleared successfully") 3831 else: 3832 logger.error(f"โ Failed to clear cache") 3833 return result 3834 3835 @mcp.tool() 3836 def get_telemetry() -> Dict[str, Any]: 3837 """ 3838 Get system telemetry from the HexStrike AI server. 3839 3840 Returns: 3841 System performance and usage telemetry 3842 """ 3843 logger.info(f"๐ Getting system telemetry") 3844 result = hexstrike_client.safe_get("api/telemetry") 3845 if "commands_executed" in result: 3846 logger.info(f"๐ Commands executed: {result.get('commands_executed', 0)}") 3847 return result 3848 3849 # ============================================================================ 3850 # PROCESS MANAGEMENT TOOLS (v5.0 ENHANCEMENT) 3851 # ============================================================================ 3852 3853 @mcp.tool() 3854 def list_active_processes() -> Dict[str, Any]: 3855 """ 3856 List all active processes on the HexStrike AI server. 3857 3858 Returns: 3859 List of active processes with their status and progress 3860 """ 3861 logger.info("๐ Listing active processes") 3862 result = hexstrike_client.safe_get("api/processes/list") 3863 if result.get("success"): 3864 logger.info(f"โ Found {result.get('total_count', 0)} active processes") 3865 else: 3866 logger.error("โ Failed to list processes") 3867 return result 3868 3869 @mcp.tool() 3870 def get_process_status(pid: int) -> Dict[str, Any]: 3871 """ 3872 Get the status of a specific process. 3873 3874 Args: 3875 pid: Process ID to check 3876 3877 Returns: 3878 Process status information including progress and runtime 3879 """ 3880 logger.info(f"๐ Checking status of process {pid}") 3881 result = hexstrike_client.safe_get(f"api/processes/status/{pid}") 3882 if result.get("success"): 3883 logger.info(f"โ Process {pid} status retrieved") 3884 else: 3885 logger.error(f"โ Process {pid} not found or error occurred") 3886 return result 3887 3888 @mcp.tool() 3889 def terminate_process(pid: int) -> Dict[str, Any]: 3890 """ 3891 Terminate a specific running process. 3892 3893 Args: 3894 pid: Process ID to terminate 3895 3896 Returns: 3897 Success status of the termination operation 3898 """ 3899 logger.info(f"๐ Terminating process {pid}") 3900 result = hexstrike_client.safe_post(f"api/processes/terminate/{pid}", {}) 3901 if result.get("success"): 3902 logger.info(f"โ Process {pid} terminated successfully") 3903 else: 3904 logger.error(f"โ Failed to terminate process {pid}") 3905 return result 3906 3907 @mcp.tool() 3908 def pause_process(pid: int) -> Dict[str, Any]: 3909 """ 3910 Pause a specific running process. 3911 3912 Args: 3913 pid: Process ID to pause 3914 3915 Returns: 3916 Success status of the pause operation 3917 """ 3918 logger.info(f"โธ๏ธ Pausing process {pid}") 3919 result = hexstrike_client.safe_post(f"api/processes/pause/{pid}", {}) 3920 if result.get("success"): 3921 logger.info(f"โ Process {pid} paused successfully") 3922 else: 3923 logger.error(f"โ Failed to pause process {pid}") 3924 return result 3925 3926 @mcp.tool() 3927 def resume_process(pid: int) -> Dict[str, Any]: 3928 """ 3929 Resume a paused process. 3930 3931 Args: 3932 pid: Process ID to resume 3933 3934 Returns: 3935 Success status of the resume operation 3936 """ 3937 logger.info(f"โถ๏ธ Resuming process {pid}") 3938 result = hexstrike_client.safe_post(f"api/processes/resume/{pid}", {}) 3939 if result.get("success"): 3940 logger.info(f"โ Process {pid} resumed successfully") 3941 else: 3942 logger.error(f"โ Failed to resume process {pid}") 3943 return result 3944 3945 @mcp.tool() 3946 def get_process_dashboard() -> Dict[str, Any]: 3947 """ 3948 Get enhanced process dashboard with visual status indicators. 3949 3950 Returns: 3951 Real-time dashboard with progress bars, system metrics, and process status 3952 """ 3953 logger.info("๐ Getting process dashboard") 3954 result = hexstrike_client.safe_get("api/processes/dashboard") 3955 if result.get("success", True) and "total_processes" in result: 3956 total = result.get("total_processes", 0) 3957 logger.info(f"โ Dashboard retrieved: {total} active processes") 3958 3959 # Log visual summary for better UX 3960 if total > 0: 3961 logger.info("๐ Active Processes Summary:") 3962 for proc in result.get("processes", [])[:3]: # Show first 3 3963 logger.info(f" โโ PID {proc['pid']}: {proc['progress_bar']} {proc['progress_percent']}") 3964 else: 3965 logger.error("โ Failed to get process dashboard") 3966 return result 3967 3968 @mcp.tool() 3969 def execute_command(command: str, use_cache: bool = True) -> Dict[str, Any]: 3970 """ 3971 Execute an arbitrary command on the HexStrike AI server with enhanced logging. 3972 3973 Args: 3974 command: The command to execute 3975 use_cache: Whether to use caching for this command 3976 3977 Returns: 3978 Command execution results with enhanced telemetry 3979 """ 3980 try: 3981 logger.info(f"โก Executing command: {command}") 3982 result = hexstrike_client.execute_command(command, use_cache) 3983 if "error" in result: 3984 logger.error(f"โ Command failed: {result['error']}") 3985 return { 3986 "success": False, 3987 "error": result["error"], 3988 "stdout": "", 3989 "stderr": f"Error executing command: {result['error']}" 3990 } 3991 3992 if result.get("success"): 3993 execution_time = result.get("execution_time", 0) 3994 logger.info(f"โ Command completed successfully in {execution_time:.2f}s") 3995 else: 3996 logger.warning(f"โ ๏ธ Command completed with errors") 3997 3998 return result 3999 except Exception as e: 4000 logger.error(f"๐ฅ Error executing command '{command}': {str(e)}") 4001 return { 4002 "success": False, 4003 "error": str(e), 4004 "stdout": "", 4005 "stderr": f"Error executing command: {str(e)}" 4006 } 4007 4008 # ============================================================================ 4009 # ADVANCED VULNERABILITY INTELLIGENCE MCP TOOLS (v6.0 ENHANCEMENT) 4010 # ============================================================================ 4011 4012 @mcp.tool() 4013 def monitor_cve_feeds(hours: int = 24, severity_filter: str = "HIGH,CRITICAL", keywords: str = "") -> Dict[str, Any]: 4014 """ 4015 Monitor CVE databases for new vulnerabilities with AI analysis. 4016 4017 Args: 4018 hours: Hours to look back for new CVEs (default: 24) 4019 severity_filter: Filter by CVSS severity - comma-separated values (LOW,MEDIUM,HIGH,CRITICAL,ALL) 4020 keywords: Filter CVEs by keywords in description (comma-separated) 4021 4022 Returns: 4023 Latest CVEs with exploitability analysis and threat intelligence 4024 4025 Example: 4026 monitor_cve_feeds(48, "CRITICAL", "remote code execution") 4027 """ 4028 data = { 4029 "hours": hours, 4030 "severity_filter": severity_filter, 4031 "keywords": keywords 4032 } 4033 logger.info(f"๐ Monitoring CVE feeds for last {hours} hours | Severity: {severity_filter}") 4034 result = hexstrike_client.safe_post("api/vuln-intel/cve-monitor", data) 4035 4036 if result.get("success"): 4037 cve_count = len(result.get("cve_monitoring", {}).get("cves", [])) 4038 exploit_analysis_count = len(result.get("exploitability_analysis", [])) 4039 logger.info(f"โ Found {cve_count} CVEs with {exploit_analysis_count} exploitability analyses") 4040 4041 return result 4042 4043 @mcp.tool() 4044 def generate_exploit_from_cve(cve_id: str, target_os: str = "", target_arch: str = "x64", exploit_type: str = "poc", evasion_level: str = "none") -> Dict[str, Any]: 4045 """ 4046 Generate working exploits from CVE information using AI-powered analysis. 4047 4048 Args: 4049 cve_id: CVE identifier (e.g., CVE-2024-1234) 4050 target_os: Target operating system (windows, linux, macos, any) 4051 target_arch: Target architecture (x86, x64, arm, any) 4052 exploit_type: Type of exploit to generate (poc, weaponized, stealth) 4053 evasion_level: Evasion sophistication (none, basic, advanced) 4054 4055 Returns: 4056 Generated exploit code with testing instructions and evasion techniques 4057 4058 Example: 4059 generate_exploit_from_cve("CVE-2024-1234", "linux", "x64", "weaponized", "advanced") 4060 """ 4061 data = { 4062 "cve_id": cve_id, 4063 "target_os": target_os, 4064 "target_arch": target_arch, 4065 "exploit_type": exploit_type, 4066 "evasion_level": evasion_level 4067 } 4068 logger.info(f"๐ค Generating {exploit_type} exploit for {cve_id} | Target: {target_os} {target_arch}") 4069 result = hexstrike_client.safe_post("api/vuln-intel/exploit-generate", data) 4070 4071 if result.get("success"): 4072 cve_analysis = result.get("cve_analysis", {}) 4073 exploit_gen = result.get("exploit_generation", {}) 4074 exploitability = cve_analysis.get("exploitability_level", "UNKNOWN") 4075 exploit_success = exploit_gen.get("success", False) 4076 4077 logger.info(f"๐ CVE Analysis: {exploitability} exploitability") 4078 logger.info(f"๐ฏ Exploit Generation: {'SUCCESS' if exploit_success else 'FAILED'}") 4079 4080 return result 4081 4082 @mcp.tool() 4083 def discover_attack_chains(target_software: str, attack_depth: int = 3, include_zero_days: bool = False) -> Dict[str, Any]: 4084 """ 4085 Discover multi-stage attack chains for target software with vulnerability correlation. 4086 4087 Args: 4088 target_software: Target software/system (e.g., "Apache HTTP Server", "Windows Server 2019") 4089 attack_depth: Maximum number of stages in attack chain (1-5) 4090 include_zero_days: Include potential zero-day vulnerabilities in analysis 4091 4092 Returns: 4093 Attack chains with vulnerability combinations, success probabilities, and exploit availability 4094 4095 Example: 4096 discover_attack_chains("Apache HTTP Server 2.4", 4, True) 4097 """ 4098 data = { 4099 "target_software": target_software, 4100 "attack_depth": min(max(attack_depth, 1), 5), # Clamp between 1-5 4101 "include_zero_days": include_zero_days 4102 } 4103 logger.info(f"๐ Discovering attack chains for {target_software} | Depth: {attack_depth} | Zero-days: {include_zero_days}") 4104 result = hexstrike_client.safe_post("api/vuln-intel/attack-chains", data) 4105 4106 if result.get("success"): 4107 chains = result.get("attack_chain_discovery", {}).get("attack_chains", []) 4108 enhanced_chains = result.get("attack_chain_discovery", {}).get("enhanced_chains", []) 4109 4110 logger.info(f"๐ Found {len(chains)} attack chains") 4111 if enhanced_chains: 4112 logger.info(f"๐ฏ Enhanced {len(enhanced_chains)} chains with exploit analysis") 4113 4114 return result 4115 4116 @mcp.tool() 4117 def research_zero_day_opportunities(target_software: str, analysis_depth: str = "standard", source_code_url: str = "") -> Dict[str, Any]: 4118 """ 4119 Automated zero-day vulnerability research using AI analysis and pattern recognition. 4120 4121 Args: 4122 target_software: Software to research for vulnerabilities (e.g., "nginx", "OpenSSL") 4123 analysis_depth: Depth of analysis (quick, standard, comprehensive) 4124 source_code_url: URL to source code repository for enhanced analysis 4125 4126 Returns: 4127 Potential vulnerability areas with exploitation feasibility and research recommendations 4128 4129 Example: 4130 research_zero_day_opportunities("nginx 1.20", "comprehensive", "https://github.com/nginx/nginx") 4131 """ 4132 if analysis_depth not in ["quick", "standard", "comprehensive"]: 4133 analysis_depth = "standard" 4134 4135 data = { 4136 "target_software": target_software, 4137 "analysis_depth": analysis_depth, 4138 "source_code_url": source_code_url 4139 } 4140 logger.info(f"๐ฌ Researching zero-day opportunities in {target_software} | Depth: {analysis_depth}") 4141 result = hexstrike_client.safe_post("api/vuln-intel/zero-day-research", data) 4142 4143 if result.get("success"): 4144 research = result.get("zero_day_research", {}) 4145 potential_vulns = len(research.get("potential_vulnerabilities", [])) 4146 risk_score = research.get("risk_assessment", {}).get("risk_score", 0) 4147 4148 logger.info(f"๐ Found {potential_vulns} potential vulnerability areas") 4149 logger.info(f"๐ฏ Risk Score: {risk_score}/100") 4150 4151 return result 4152 4153 @mcp.tool() 4154 def correlate_threat_intelligence(indicators: str, timeframe: str = "30d", sources: str = "all") -> Dict[str, Any]: 4155 """ 4156 Correlate threat intelligence across multiple sources with advanced analysis. 4157 4158 Args: 4159 indicators: Comma-separated IOCs (IPs, domains, hashes, CVEs, etc.) 4160 timeframe: Time window for correlation (7d, 30d, 90d, 1y) 4161 sources: Intelligence sources to query (cve, exploit-db, github, twitter, all) 4162 4163 Returns: 4164 Correlated threat intelligence with attribution, timeline, and threat scoring 4165 4166 Example: 4167 correlate_threat_intelligence("CVE-2024-1234,192.168.1.100,malware.exe", "90d", "all") 4168 """ 4169 # Validate timeframe 4170 valid_timeframes = ["7d", "30d", "90d", "1y"] 4171 if timeframe not in valid_timeframes: 4172 timeframe = "30d" 4173 4174 # Parse indicators 4175 indicator_list = [i.strip() for i in indicators.split(",") if i.strip()] 4176 4177 if not indicator_list: 4178 logger.error("โ No valid indicators provided") 4179 return {"success": False, "error": "No valid indicators provided"} 4180 4181 data = { 4182 "indicators": indicator_list, 4183 "timeframe": timeframe, 4184 "sources": sources 4185 } 4186 logger.info(f"๐ง Correlating threat intelligence for {len(indicator_list)} indicators | Timeframe: {timeframe}") 4187 result = hexstrike_client.safe_post("api/vuln-intel/threat-feeds", data) 4188 4189 if result.get("success"): 4190 threat_intel = result.get("threat_intelligence", {}) 4191 correlations = len(threat_intel.get("correlations", [])) 4192 threat_score = threat_intel.get("threat_score", 0) 4193 4194 logger.info(f"๐ Found {correlations} threat correlations") 4195 logger.info(f"๐ฏ Overall Threat Score: {threat_score:.1f}/100") 4196 4197 return result 4198 4199 @mcp.tool() 4200 def advanced_payload_generation(attack_type: str, target_context: str = "", evasion_level: str = "standard", custom_constraints: str = "") -> Dict[str, Any]: 4201 """ 4202 Generate advanced payloads with AI-powered evasion techniques and contextual adaptation. 4203 4204 Args: 4205 attack_type: Type of attack (rce, privilege_escalation, persistence, exfiltration, xss, sqli) 4206 target_context: Target environment details (OS, software versions, security controls) 4207 evasion_level: Evasion sophistication (basic, standard, advanced, nation-state) 4208 custom_constraints: Custom payload constraints (size limits, character restrictions, etc.) 4209 4210 Returns: 4211 Advanced payloads with multiple evasion techniques and deployment instructions 4212 4213 Example: 4214 advanced_payload_generation("rce", "Windows 11 + Defender + AppLocker", "nation-state", "max_size:256,no_quotes") 4215 """ 4216 valid_attack_types = ["rce", "privilege_escalation", "persistence", "exfiltration", "xss", "sqli", "lfi", "ssrf"] 4217 valid_evasion_levels = ["basic", "standard", "advanced", "nation-state"] 4218 4219 if attack_type not in valid_attack_types: 4220 attack_type = "rce" 4221 4222 if evasion_level not in valid_evasion_levels: 4223 evasion_level = "standard" 4224 4225 data = { 4226 "attack_type": attack_type, 4227 "target_context": target_context, 4228 "evasion_level": evasion_level, 4229 "custom_constraints": custom_constraints 4230 } 4231 logger.info(f"๐ฏ Generating advanced {attack_type} payload | Evasion: {evasion_level}") 4232 if target_context: 4233 logger.info(f"๐ฏ Target Context: {target_context}") 4234 4235 result = hexstrike_client.safe_post("api/ai/advanced-payload-generation", data) 4236 4237 if result.get("success"): 4238 payload_gen = result.get("advanced_payload_generation", {}) 4239 payload_count = payload_gen.get("payload_count", 0) 4240 evasion_applied = payload_gen.get("evasion_level", "none") 4241 4242 logger.info(f"๐ Generated {payload_count} advanced payloads") 4243 logger.info(f"๐ก๏ธ Evasion Level Applied: {evasion_applied}") 4244 4245 return result 4246 4247 @mcp.tool() 4248 def vulnerability_intelligence_dashboard() -> Dict[str, Any]: 4249 """ 4250 Get a comprehensive vulnerability intelligence dashboard with latest threats and trends. 4251 4252 Returns: 4253 Dashboard with latest CVEs, trending vulnerabilities, exploit availability, and threat landscape 4254 4255 Example: 4256 vulnerability_intelligence_dashboard() 4257 """ 4258 logger.info("๐ Generating vulnerability intelligence dashboard") 4259 4260 # Get latest critical CVEs 4261 latest_cves = hexstrike_client.safe_post("api/vuln-intel/cve-monitor", { 4262 "hours": 24, 4263 "severity_filter": "CRITICAL", 4264 "keywords": "" 4265 }) 4266 4267 # Get trending attack types 4268 trending_research = hexstrike_client.safe_post("api/vuln-intel/zero-day-research", { 4269 "target_software": "web applications", 4270 "analysis_depth": "quick" 4271 }) 4272 4273 # Compile dashboard 4274 dashboard = { 4275 "timestamp": time.time(), 4276 "latest_critical_cves": latest_cves.get("cve_monitoring", {}).get("cves", [])[:5], 4277 "threat_landscape": { 4278 "high_risk_software": ["Apache HTTP Server", "Microsoft Exchange", "VMware vCenter", "Fortinet FortiOS"], 4279 "trending_attack_vectors": ["Supply chain attacks", "Cloud misconfigurations", "Zero-day exploits", "AI-powered attacks"], 4280 "active_threat_groups": ["APT29", "Lazarus Group", "FIN7", "REvil"], 4281 }, 4282 "exploit_intelligence": { 4283 "new_public_exploits": "Simulated data - check exploit-db for real data", 4284 "weaponized_exploits": "Monitor threat intelligence feeds", 4285 "exploit_kits": "Track underground markets" 4286 }, 4287 "recommendations": [ 4288 "Prioritize patching for critical CVEs discovered in last 24h", 4289 "Monitor for zero-day activity in trending attack vectors", 4290 "Implement advanced threat detection for active threat groups", 4291 "Review security controls against nation-state level attacks" 4292 ] 4293 } 4294 4295 logger.info("โ Vulnerability intelligence dashboard generated") 4296 return { 4297 "success": True, 4298 "dashboard": dashboard 4299 } 4300 4301 @mcp.tool() 4302 def threat_hunting_assistant(target_environment: str, threat_indicators: str = "", hunt_focus: str = "general") -> Dict[str, Any]: 4303 """ 4304 AI-powered threat hunting assistant with vulnerability correlation and attack simulation. 4305 4306 Args: 4307 target_environment: Environment to hunt in (e.g., "Windows Domain", "Cloud Infrastructure") 4308 threat_indicators: Known IOCs or suspicious indicators to investigate 4309 hunt_focus: Focus area (general, apt, ransomware, insider_threat, supply_chain) 4310 4311 Returns: 4312 Threat hunting playbook with detection queries, IOCs, and investigation steps 4313 4314 Example: 4315 threat_hunting_assistant("Windows Domain", "suspicious_process.exe,192.168.1.100", "apt") 4316 """ 4317 valid_hunt_focus = ["general", "apt", "ransomware", "insider_threat", "supply_chain"] 4318 if hunt_focus not in valid_hunt_focus: 4319 hunt_focus = "general" 4320 4321 logger.info(f"๐ Generating threat hunting playbook for {target_environment} | Focus: {hunt_focus}") 4322 4323 # Parse indicators if provided 4324 indicators = [i.strip() for i in threat_indicators.split(",") if i.strip()] if threat_indicators else [] 4325 4326 # Generate hunting playbook 4327 hunting_playbook = { 4328 "target_environment": target_environment, 4329 "hunt_focus": hunt_focus, 4330 "indicators_analyzed": indicators, 4331 "detection_queries": [], 4332 "investigation_steps": [], 4333 "threat_scenarios": [], 4334 "mitigation_strategies": [] 4335 } 4336 4337 # Environment-specific detection queries 4338 if "windows" in target_environment.lower(): 4339 hunting_playbook["detection_queries"] = [ 4340 "Get-WinEvent | Where-Object {$_.Id -eq 4688 -and $_.Message -like '*suspicious*'}", 4341 "Get-Process | Where-Object {$_.ProcessName -notin @('explorer.exe', 'svchost.exe')}", 4342 "Get-ItemProperty HKLM:\\Software\\Microsoft\\Windows\\CurrentVersion\\Run", 4343 "Get-NetTCPConnection | Where-Object {$_.State -eq 'Established' -and $_.RemoteAddress -notlike '10.*'}" 4344 ] 4345 elif "cloud" in target_environment.lower(): 4346 hunting_playbook["detection_queries"] = [ 4347 "CloudTrail logs for unusual API calls", 4348 "Failed authentication attempts from unknown IPs", 4349 "Privilege escalation events", 4350 "Data exfiltration indicators" 4351 ] 4352 4353 # Focus-specific threat scenarios 4354 focus_scenarios = { 4355 "apt": [ 4356 "Spear phishing with weaponized documents", 4357 "Living-off-the-land techniques", 4358 "Lateral movement via stolen credentials", 4359 "Data staging and exfiltration" 4360 ], 4361 "ransomware": [ 4362 "Initial access via RDP/VPN", 4363 "Privilege escalation and persistence", 4364 "Shadow copy deletion", 4365 "Encryption and ransom note deployment" 4366 ], 4367 "insider_threat": [ 4368 "Unusual data access patterns", 4369 "After-hours activity", 4370 "Large data downloads", 4371 "Access to sensitive systems" 4372 ] 4373 } 4374 4375 hunting_playbook["threat_scenarios"] = focus_scenarios.get(hunt_focus, [ 4376 "Unauthorized access attempts", 4377 "Suspicious process execution", 4378 "Network anomalies", 4379 "Data access violations" 4380 ]) 4381 4382 # Investigation steps 4383 hunting_playbook["investigation_steps"] = [ 4384 "1. Validate initial indicators and expand IOC list", 4385 "2. Run detection queries and analyze results", 4386 "3. Correlate events across multiple data sources", 4387 "4. Identify affected systems and user accounts", 4388 "5. Assess scope and impact of potential compromise", 4389 "6. Implement containment measures if threat confirmed", 4390 "7. Document findings and update detection rules" 4391 ] 4392 4393 # Correlate with vulnerability intelligence if indicators provided 4394 if indicators: 4395 logger.info(f"๐ง Correlating {len(indicators)} indicators with threat intelligence") 4396 correlation_result = correlate_threat_intelligence(",".join(indicators), "30d", "all") 4397 4398 if correlation_result.get("success"): 4399 hunting_playbook["threat_correlation"] = correlation_result.get("threat_intelligence", {}) 4400 4401 logger.info("โ Threat hunting playbook generated") 4402 return { 4403 "success": True, 4404 "hunting_playbook": hunting_playbook 4405 } 4406 4407 # ============================================================================ 4408 # ENHANCED VISUAL OUTPUT TOOLS 4409 # ============================================================================ 4410 4411 @mcp.tool() 4412 def get_live_dashboard() -> Dict[str, Any]: 4413 """ 4414 Get a beautiful live dashboard showing all active processes with enhanced visual formatting. 4415 4416 Returns: 4417 Live dashboard with visual process monitoring and system metrics 4418 """ 4419 logger.info("๐ Fetching live process dashboard") 4420 result = hexstrike_client.safe_get("api/processes/dashboard") 4421 if result.get("success", True): 4422 logger.info("โ Live dashboard retrieved successfully") 4423 else: 4424 logger.error("โ Failed to retrieve live dashboard") 4425 return result 4426 4427 @mcp.tool() 4428 def create_vulnerability_report(vulnerabilities: str, target: str = "", scan_type: str = "comprehensive") -> Dict[str, Any]: 4429 """ 4430 Create a beautiful vulnerability report with severity-based styling and visual indicators. 4431 4432 Args: 4433 vulnerabilities: JSON string containing vulnerability data 4434 target: Target that was scanned 4435 scan_type: Type of scan performed 4436 4437 Returns: 4438 Formatted vulnerability report with visual enhancements 4439 """ 4440 import json 4441 4442 try: 4443 # Parse vulnerabilities if provided as JSON string 4444 if isinstance(vulnerabilities, str): 4445 vuln_data = json.loads(vulnerabilities) 4446 else: 4447 vuln_data = vulnerabilities 4448 4449 logger.info(f"๐ Creating vulnerability report for {len(vuln_data)} findings") 4450 4451 # Create individual vulnerability cards 4452 vulnerability_cards = [] 4453 for vuln in vuln_data: 4454 card_result = hexstrike_client.safe_post("api/visual/vulnerability-card", vuln) 4455 if card_result.get("success"): 4456 vulnerability_cards.append(card_result.get("vulnerability_card", "")) 4457 4458 # Create summary report 4459 summary_data = { 4460 "target": target, 4461 "vulnerabilities": vuln_data, 4462 "tools_used": [scan_type], 4463 "execution_time": 0 4464 } 4465 4466 summary_result = hexstrike_client.safe_post("api/visual/summary-report", summary_data) 4467 4468 logger.info("โ Vulnerability report created successfully") 4469 return { 4470 "success": True, 4471 "vulnerability_cards": vulnerability_cards, 4472 "summary_report": summary_result.get("summary_report", ""), 4473 "total_vulnerabilities": len(vuln_data), 4474 "timestamp": summary_result.get("timestamp", "") 4475 } 4476 4477 except Exception as e: 4478 logger.error(f"โ Failed to create vulnerability report: {str(e)}") 4479 return {"success": False, "error": str(e)} 4480 4481 @mcp.tool() 4482 def format_tool_output_visual(tool_name: str, output: str, success: bool = True) -> Dict[str, Any]: 4483 """ 4484 Format tool output with beautiful visual styling, syntax highlighting, and structure. 4485 4486 Args: 4487 tool_name: Name of the security tool 4488 output: Raw output from the tool 4489 success: Whether the tool execution was successful 4490 4491 Returns: 4492 Beautifully formatted tool output with visual enhancements 4493 """ 4494 logger.info(f"๐จ Formatting output for {tool_name}") 4495 4496 data = { 4497 "tool": tool_name, 4498 "output": output, 4499 "success": success 4500 } 4501 4502 result = hexstrike_client.safe_post("api/visual/tool-output", data) 4503 if result.get("success"): 4504 logger.info(f"โ Tool output formatted successfully for {tool_name}") 4505 else: 4506 logger.error(f"โ Failed to format tool output for {tool_name}") 4507 4508 return result 4509 4510 @mcp.tool() 4511 def create_scan_summary(target: str, tools_used: str, vulnerabilities_found: int = 0, 4512 execution_time: float = 0.0, findings: str = "") -> Dict[str, Any]: 4513 """ 4514 Create a comprehensive scan summary report with beautiful visual formatting. 4515 4516 Args: 4517 target: Target that was scanned 4518 tools_used: Comma-separated list of tools used 4519 vulnerabilities_found: Number of vulnerabilities discovered 4520 execution_time: Total execution time in seconds 4521 findings: Additional findings or notes 4522 4523 Returns: 4524 Beautiful scan summary report with visual enhancements 4525 """ 4526 logger.info(f"๐ Creating scan summary for {target}") 4527 4528 tools_list = [tool.strip() for tool in tools_used.split(",")] 4529 4530 summary_data = { 4531 "target": target, 4532 "tools_used": tools_list, 4533 "execution_time": execution_time, 4534 "vulnerabilities": [{"severity": "info"}] * vulnerabilities_found, # Mock data for count 4535 "findings": findings 4536 } 4537 4538 result = hexstrike_client.safe_post("api/visual/summary-report", summary_data) 4539 if result.get("success"): 4540 logger.info("โ Scan summary created successfully") 4541 else: 4542 logger.error("โ Failed to create scan summary") 4543 4544 return result 4545 4546 @mcp.tool() 4547 def display_system_metrics() -> Dict[str, Any]: 4548 """ 4549 Display current system metrics and performance indicators with visual formatting. 4550 4551 Returns: 4552 System metrics with beautiful visual presentation 4553 """ 4554 logger.info("๐ Fetching system metrics") 4555 4556 # Get telemetry data 4557 telemetry_result = hexstrike_client.safe_get("api/telemetry") 4558 4559 if telemetry_result.get("success", True): 4560 logger.info("โ System metrics retrieved successfully") 4561 4562 # Format the metrics for better display 4563 metrics = telemetry_result.get("system_metrics", {}) 4564 stats = { 4565 "cpu_percent": metrics.get("cpu_percent", 0), 4566 "memory_percent": metrics.get("memory_percent", 0), 4567 "disk_usage": metrics.get("disk_usage", 0), 4568 "uptime_seconds": telemetry_result.get("uptime_seconds", 0), 4569 "commands_executed": telemetry_result.get("commands_executed", 0), 4570 "success_rate": telemetry_result.get("success_rate", "0%") 4571 } 4572 4573 return { 4574 "success": True, 4575 "metrics": stats, 4576 "formatted_display": f""" 4577 ๐ฅ๏ธ System Performance Metrics: 4578 โโ CPU Usage: {stats['cpu_percent']:.1f}% 4579 โโ Memory Usage: {stats['memory_percent']:.1f}% 4580 โโ Disk Usage: {stats['disk_usage']:.1f}% 4581 โโ Uptime: {stats['uptime_seconds']:.0f}s 4582 โโ Commands Executed: {stats['commands_executed']} 4583 โโ Success Rate: {stats['success_rate']} 4584 """, 4585 "timestamp": telemetry_result.get("timestamp", "") 4586 } 4587 else: 4588 logger.error("โ Failed to retrieve system metrics") 4589 return telemetry_result 4590 4591 # ============================================================================ 4592 # INTELLIGENT DECISION ENGINE TOOLS 4593 # ============================================================================ 4594 4595 @mcp.tool() 4596 def analyze_target_intelligence(target: str) -> Dict[str, Any]: 4597 """ 4598 Analyze target using AI-powered intelligence to create comprehensive profile. 4599 4600 Args: 4601 target: Target URL, IP address, or domain to analyze 4602 4603 Returns: 4604 Comprehensive target profile with technology detection, risk assessment, and recommendations 4605 """ 4606 logger.info(f"๐ง Analyzing target intelligence for: {target}") 4607 4608 data = {"target": target} 4609 result = hexstrike_client.safe_post("api/intelligence/analyze-target", data) 4610 4611 if result.get("success"): 4612 profile = result.get("target_profile", {}) 4613 logger.info(f"โ Target analysis completed - Type: {profile.get('target_type')}, Risk: {profile.get('risk_level')}") 4614 else: 4615 logger.error(f"โ Target analysis failed for {target}") 4616 4617 return result 4618 4619 @mcp.tool() 4620 def select_optimal_tools_ai(target: str, objective: str = "comprehensive") -> Dict[str, Any]: 4621 """ 4622 Use AI to select optimal security tools based on target analysis and testing objective. 4623 4624 Args: 4625 target: Target to analyze 4626 objective: Testing objective - "comprehensive", "quick", or "stealth" 4627 4628 Returns: 4629 AI-selected optimal tools with effectiveness ratings and target profile 4630 """ 4631 logger.info(f"๐ฏ Selecting optimal tools for {target} with objective: {objective}") 4632 4633 data = { 4634 "target": target, 4635 "objective": objective 4636 } 4637 result = hexstrike_client.safe_post("api/intelligence/select-tools", data) 4638 4639 if result.get("success"): 4640 tools = result.get("selected_tools", []) 4641 logger.info(f"โ AI selected {len(tools)} optimal tools: {', '.join(tools[:3])}{'...' if len(tools) > 3 else ''}") 4642 else: 4643 logger.error(f"โ Tool selection failed for {target}") 4644 4645 return result 4646 4647 @mcp.tool() 4648 def optimize_tool_parameters_ai(target: str, tool: str, context: str = "{}") -> Dict[str, Any]: 4649 """ 4650 Use AI to optimize tool parameters based on target profile and context. 4651 4652 Args: 4653 target: Target to test 4654 tool: Security tool to optimize 4655 context: JSON string with additional context (stealth, aggressive, etc.) 4656 4657 Returns: 4658 AI-optimized parameters for maximum effectiveness 4659 """ 4660 import json 4661 4662 logger.info(f"โ๏ธ Optimizing parameters for {tool} against {target}") 4663 4664 try: 4665 context_dict = json.loads(context) if context != "{}" else {} 4666 except: 4667 context_dict = {} 4668 4669 data = { 4670 "target": target, 4671 "tool": tool, 4672 "context": context_dict 4673 } 4674 result = hexstrike_client.safe_post("api/intelligence/optimize-parameters", data) 4675 4676 if result.get("success"): 4677 params = result.get("optimized_parameters", {}) 4678 logger.info(f"โ Parameters optimized for {tool} - {len(params)} parameters configured") 4679 else: 4680 logger.error(f"โ Parameter optimization failed for {tool}") 4681 4682 return result 4683 4684 @mcp.tool() 4685 def create_attack_chain_ai(target: str, objective: str = "comprehensive") -> Dict[str, Any]: 4686 """ 4687 Create an intelligent attack chain using AI-driven tool sequencing and optimization. 4688 4689 Args: 4690 target: Target for the attack chain 4691 objective: Attack objective - "comprehensive", "quick", or "stealth" 4692 4693 Returns: 4694 AI-generated attack chain with success probability and time estimates 4695 """ 4696 logger.info(f"โ๏ธ Creating AI-driven attack chain for {target}") 4697 4698 data = { 4699 "target": target, 4700 "objective": objective 4701 } 4702 result = hexstrike_client.safe_post("api/intelligence/create-attack-chain", data) 4703 4704 if result.get("success"): 4705 chain = result.get("attack_chain", {}) 4706 steps = len(chain.get("steps", [])) 4707 success_prob = chain.get("success_probability", 0) 4708 estimated_time = chain.get("estimated_time", 0) 4709 4710 logger.info(f"โ Attack chain created - {steps} steps, {success_prob:.2f} success probability, ~{estimated_time}s") 4711 else: 4712 logger.error(f"โ Attack chain creation failed for {target}") 4713 4714 return result 4715 4716 @mcp.tool() 4717 def intelligent_smart_scan(target: str, objective: str = "comprehensive", max_tools: int = 5) -> Dict[str, Any]: 4718 """ 4719 Execute an intelligent scan using AI-driven tool selection and parameter optimization. 4720 4721 Args: 4722 target: Target to scan 4723 objective: Scanning objective - "comprehensive", "quick", or "stealth" 4724 max_tools: Maximum number of tools to use 4725 4726 Returns: 4727 Results from AI-optimized scanning with tool execution summary 4728 """ 4729 logger.info(f"{HexStrikeColors.FIRE_RED}๐ Starting intelligent smart scan for {target}{HexStrikeColors.RESET}") 4730 4731 data = { 4732 "target": target, 4733 "objective": objective, 4734 "max_tools": max_tools 4735 } 4736 result = hexstrike_client.safe_post("api/intelligence/smart-scan", data) 4737 4738 if result.get("success"): 4739 scan_results = result.get("scan_results", {}) 4740 tools_executed = scan_results.get("tools_executed", []) 4741 execution_summary = scan_results.get("execution_summary", {}) 4742 4743 # Enhanced logging with detailed results 4744 logger.info(f"{HexStrikeColors.SUCCESS}โ Intelligent scan completed for {target}{HexStrikeColors.RESET}") 4745 logger.info(f"{HexStrikeColors.CYBER_ORANGE}๐ Execution Summary:{HexStrikeColors.RESET}") 4746 logger.info(f" โข Tools executed: {execution_summary.get('successful_tools', 0)}/{execution_summary.get('total_tools', 0)}") 4747 logger.info(f" โข Success rate: {execution_summary.get('success_rate', 0):.1f}%") 4748 logger.info(f" โข Total vulnerabilities: {scan_results.get('total_vulnerabilities', 0)}") 4749 logger.info(f" โข Execution time: {execution_summary.get('total_execution_time', 0):.2f}s") 4750 4751 # Log successful tools 4752 successful_tools = [t['tool'] for t in tools_executed if t.get('success')] 4753 if successful_tools: 4754 logger.info(f"{HexStrikeColors.HIGHLIGHT_GREEN} Successful tools: {', '.join(successful_tools)} {HexStrikeColors.RESET}") 4755 4756 # Log failed tools 4757 failed_tools = [t['tool'] for t in tools_executed if not t.get('success')] 4758 if failed_tools: 4759 logger.warning(f"{HexStrikeColors.HIGHLIGHT_RED} Failed tools: {', '.join(failed_tools)} {HexStrikeColors.RESET}") 4760 4761 # Log vulnerabilities found 4762 if scan_results.get('total_vulnerabilities', 0) > 0: 4763 logger.warning(f"{HexStrikeColors.VULN_HIGH}๐จ {scan_results['total_vulnerabilities']} vulnerabilities detected!{HexStrikeColors.RESET}") 4764 else: 4765 logger.error(f"{HexStrikeColors.ERROR}โ Intelligent scan failed for {target}: {result.get('error', 'Unknown error')}{HexStrikeColors.RESET}") 4766 4767 return result 4768 4769 @mcp.tool() 4770 def detect_technologies_ai(target: str) -> Dict[str, Any]: 4771 """ 4772 Use AI to detect technologies and provide technology-specific testing recommendations. 4773 4774 Args: 4775 target: Target to analyze for technology detection 4776 4777 Returns: 4778 Detected technologies with AI-generated testing recommendations 4779 """ 4780 logger.info(f"๐ Detecting technologies for {target}") 4781 4782 data = {"target": target} 4783 result = hexstrike_client.safe_post("api/intelligence/technology-detection", data) 4784 4785 if result.get("success"): 4786 technologies = result.get("detected_technologies", []) 4787 cms = result.get("cms_type") 4788 recommendations = result.get("technology_recommendations", {}) 4789 4790 tech_info = f"Technologies: {', '.join(technologies)}" 4791 if cms: 4792 tech_info += f", CMS: {cms}" 4793 4794 logger.info(f"โ Technology detection completed - {tech_info}") 4795 logger.info(f"๐ Generated {len(recommendations)} technology-specific recommendations") 4796 else: 4797 logger.error(f"โ Technology detection failed for {target}") 4798 4799 return result 4800 4801 @mcp.tool() 4802 def ai_reconnaissance_workflow(target: str, depth: str = "standard") -> Dict[str, Any]: 4803 """ 4804 Execute AI-driven reconnaissance workflow with intelligent tool chaining. 4805 4806 Args: 4807 target: Target for reconnaissance 4808 depth: Reconnaissance depth - "surface", "standard", or "deep" 4809 4810 Returns: 4811 Comprehensive reconnaissance results with AI-driven insights 4812 """ 4813 logger.info(f"๐ต๏ธ Starting AI reconnaissance workflow for {target} (depth: {depth})") 4814 4815 # First analyze the target 4816 analysis_result = hexstrike_client.safe_post("api/intelligence/analyze-target", {"target": target}) 4817 4818 if not analysis_result.get("success"): 4819 return analysis_result 4820 4821 # Create attack chain for reconnaissance 4822 objective = "comprehensive" if depth == "deep" else "quick" if depth == "surface" else "comprehensive" 4823 chain_result = hexstrike_client.safe_post("api/intelligence/create-attack-chain", { 4824 "target": target, 4825 "objective": objective 4826 }) 4827 4828 if not chain_result.get("success"): 4829 return chain_result 4830 4831 # Execute the reconnaissance 4832 scan_result = hexstrike_client.safe_post("api/intelligence/smart-scan", { 4833 "target": target, 4834 "objective": objective, 4835 "max_tools": 8 if depth == "deep" else 3 if depth == "surface" else 5 4836 }) 4837 4838 logger.info(f"โ AI reconnaissance workflow completed for {target}") 4839 4840 return { 4841 "success": True, 4842 "target": target, 4843 "depth": depth, 4844 "target_analysis": analysis_result.get("target_profile", {}), 4845 "attack_chain": chain_result.get("attack_chain", {}), 4846 "scan_results": scan_result.get("scan_results", {}), 4847 "timestamp": datetime.now().isoformat() 4848 } 4849 4850 @mcp.tool() 4851 def ai_vulnerability_assessment(target: str, focus_areas: str = "all") -> Dict[str, Any]: 4852 """ 4853 Perform AI-driven vulnerability assessment with intelligent prioritization. 4854 4855 Args: 4856 target: Target for vulnerability assessment 4857 focus_areas: Comma-separated focus areas - "web", "network", "api", "all" 4858 4859 Returns: 4860 Prioritized vulnerability assessment results with AI insights 4861 """ 4862 logger.info(f"๐ฌ Starting AI vulnerability assessment for {target}") 4863 4864 # Analyze target first 4865 analysis_result = hexstrike_client.safe_post("api/intelligence/analyze-target", {"target": target}) 4866 4867 if not analysis_result.get("success"): 4868 return analysis_result 4869 4870 profile = analysis_result.get("target_profile", {}) 4871 target_type = profile.get("target_type", "unknown") 4872 4873 # Select tools based on focus areas and target type 4874 if focus_areas == "all": 4875 objective = "comprehensive" 4876 elif "web" in focus_areas and target_type == "web_application": 4877 objective = "comprehensive" 4878 elif "network" in focus_areas and target_type == "network_host": 4879 objective = "comprehensive" 4880 else: 4881 objective = "quick" 4882 4883 # Execute vulnerability assessment 4884 scan_result = hexstrike_client.safe_post("api/intelligence/smart-scan", { 4885 "target": target, 4886 "objective": objective, 4887 "max_tools": 6 4888 }) 4889 4890 logger.info(f"โ AI vulnerability assessment completed for {target}") 4891 4892 return { 4893 "success": True, 4894 "target": target, 4895 "focus_areas": focus_areas, 4896 "target_analysis": profile, 4897 "vulnerability_scan": scan_result.get("scan_results", {}), 4898 "risk_assessment": { 4899 "risk_level": profile.get("risk_level", "unknown"), 4900 "attack_surface_score": profile.get("attack_surface_score", 0), 4901 "confidence_score": profile.get("confidence_score", 0) 4902 }, 4903 "timestamp": datetime.now().isoformat() 4904 } 4905 4906 # ============================================================================ 4907 # BUG BOUNTY HUNTING SPECIALIZED WORKFLOWS 4908 # ============================================================================ 4909 4910 @mcp.tool() 4911 def bugbounty_reconnaissance_workflow(domain: str, scope: str = "", out_of_scope: str = "", 4912 program_type: str = "web") -> Dict[str, Any]: 4913 """ 4914 Create comprehensive reconnaissance workflow for bug bounty hunting. 4915 4916 Args: 4917 domain: Target domain for bug bounty 4918 scope: Comma-separated list of in-scope domains/IPs 4919 out_of_scope: Comma-separated list of out-of-scope domains/IPs 4920 program_type: Type of program (web, api, mobile, iot) 4921 4922 Returns: 4923 Comprehensive reconnaissance workflow with phases and tools 4924 """ 4925 data = { 4926 "domain": domain, 4927 "scope": scope.split(",") if scope else [], 4928 "out_of_scope": out_of_scope.split(",") if out_of_scope else [], 4929 "program_type": program_type 4930 } 4931 4932 logger.info(f"๐ฏ Creating reconnaissance workflow for {domain}") 4933 result = hexstrike_client.safe_post("api/bugbounty/reconnaissance-workflow", data) 4934 4935 if result.get("success"): 4936 workflow = result.get("workflow", {}) 4937 logger.info(f"โ Reconnaissance workflow created - {workflow.get('tools_count', 0)} tools, ~{workflow.get('estimated_time', 0)}s") 4938 else: 4939 logger.error(f"โ Failed to create reconnaissance workflow for {domain}") 4940 4941 return result 4942 4943 @mcp.tool() 4944 def bugbounty_vulnerability_hunting(domain: str, priority_vulns: str = "rce,sqli,xss,idor,ssrf", 4945 bounty_range: str = "unknown") -> Dict[str, Any]: 4946 """ 4947 Create vulnerability hunting workflow prioritized by impact and bounty potential. 4948 4949 Args: 4950 domain: Target domain for bug bounty 4951 priority_vulns: Comma-separated list of priority vulnerability types 4952 bounty_range: Expected bounty range (low, medium, high, critical) 4953 4954 Returns: 4955 Vulnerability hunting workflow prioritized by impact 4956 """ 4957 data = { 4958 "domain": domain, 4959 "priority_vulns": priority_vulns.split(",") if priority_vulns else [], 4960 "bounty_range": bounty_range 4961 } 4962 4963 logger.info(f"๐ฏ Creating vulnerability hunting workflow for {domain}") 4964 result = hexstrike_client.safe_post("api/bugbounty/vulnerability-hunting-workflow", data) 4965 4966 if result.get("success"): 4967 workflow = result.get("workflow", {}) 4968 logger.info(f"โ Vulnerability hunting workflow created - Priority score: {workflow.get('priority_score', 0)}") 4969 else: 4970 logger.error(f"โ Failed to create vulnerability hunting workflow for {domain}") 4971 4972 return result 4973 4974 @mcp.tool() 4975 def bugbounty_business_logic_testing(domain: str, program_type: str = "web") -> Dict[str, Any]: 4976 """ 4977 Create business logic testing workflow for advanced bug bounty hunting. 4978 4979 Args: 4980 domain: Target domain for bug bounty 4981 program_type: Type of program (web, api, mobile) 4982 4983 Returns: 4984 Business logic testing workflow with manual and automated tests 4985 """ 4986 data = { 4987 "domain": domain, 4988 "program_type": program_type 4989 } 4990 4991 logger.info(f"๐ฏ Creating business logic testing workflow for {domain}") 4992 result = hexstrike_client.safe_post("api/bugbounty/business-logic-workflow", data) 4993 4994 if result.get("success"): 4995 workflow = result.get("workflow", {}) 4996 test_count = sum(len(category["tests"]) for category in workflow.get("business_logic_tests", [])) 4997 logger.info(f"โ Business logic testing workflow created - {test_count} tests") 4998 else: 4999 logger.error(f"โ Failed to create business logic testing workflow for {domain}") 5000 5001 return result 5002 5003 @mcp.tool() 5004 def bugbounty_osint_gathering(domain: str) -> Dict[str, Any]: 5005 """ 5006 Create OSINT (Open Source Intelligence) gathering workflow for bug bounty reconnaissance. 5007 5008 Args: 5009 domain: Target domain for OSINT gathering 5010 5011 Returns: 5012 OSINT gathering workflow with multiple intelligence phases 5013 """ 5014 data = {"domain": domain} 5015 5016 logger.info(f"๐ฏ Creating OSINT gathering workflow for {domain}") 5017 result = hexstrike_client.safe_post("api/bugbounty/osint-workflow", data) 5018 5019 if result.get("success"): 5020 workflow = result.get("workflow", {}) 5021 phases = len(workflow.get("osint_phases", [])) 5022 logger.info(f"โ OSINT workflow created - {phases} intelligence phases") 5023 else: 5024 logger.error(f"โ Failed to create OSINT workflow for {domain}") 5025 5026 return result 5027 5028 @mcp.tool() 5029 def bugbounty_file_upload_testing(target_url: str) -> Dict[str, Any]: 5030 """ 5031 Create file upload vulnerability testing workflow with bypass techniques. 5032 5033 Args: 5034 target_url: Target URL with file upload functionality 5035 5036 Returns: 5037 File upload testing workflow with malicious files and bypass techniques 5038 """ 5039 data = {"target_url": target_url} 5040 5041 logger.info(f"๐ฏ Creating file upload testing workflow for {target_url}") 5042 result = hexstrike_client.safe_post("api/bugbounty/file-upload-testing", data) 5043 5044 if result.get("success"): 5045 workflow = result.get("workflow", {}) 5046 phases = len(workflow.get("test_phases", [])) 5047 logger.info(f"โ File upload testing workflow created - {phases} test phases") 5048 else: 5049 logger.error(f"โ Failed to create file upload testing workflow for {target_url}") 5050 5051 return result 5052 5053 @mcp.tool() 5054 def bugbounty_comprehensive_assessment(domain: str, scope: str = "", 5055 priority_vulns: str = "rce,sqli,xss,idor,ssrf", 5056 include_osint: bool = True, 5057 include_business_logic: bool = True) -> Dict[str, Any]: 5058 """ 5059 Create comprehensive bug bounty assessment combining all specialized workflows. 5060 5061 Args: 5062 domain: Target domain for bug bounty 5063 scope: Comma-separated list of in-scope domains/IPs 5064 priority_vulns: Comma-separated list of priority vulnerability types 5065 include_osint: Include OSINT gathering workflow 5066 include_business_logic: Include business logic testing workflow 5067 5068 Returns: 5069 Comprehensive bug bounty assessment with all workflows and summary 5070 """ 5071 data = { 5072 "domain": domain, 5073 "scope": scope.split(",") if scope else [], 5074 "priority_vulns": priority_vulns.split(",") if priority_vulns else [], 5075 "include_osint": include_osint, 5076 "include_business_logic": include_business_logic 5077 } 5078 5079 logger.info(f"๐ฏ Creating comprehensive bug bounty assessment for {domain}") 5080 result = hexstrike_client.safe_post("api/bugbounty/comprehensive-assessment", data) 5081 5082 if result.get("success"): 5083 assessment = result.get("assessment", {}) 5084 summary = assessment.get("summary", {}) 5085 logger.info(f"โ Comprehensive assessment created - {summary.get('workflow_count', 0)} workflows, ~{summary.get('total_estimated_time', 0)}s") 5086 else: 5087 logger.error(f"โ Failed to create comprehensive assessment for {domain}") 5088 5089 return result 5090 5091 @mcp.tool() 5092 def bugbounty_authentication_bypass_testing(target_url: str, auth_type: str = "form") -> Dict[str, Any]: 5093 """ 5094 Create authentication bypass testing workflow for bug bounty hunting. 5095 5096 Args: 5097 target_url: Target URL with authentication 5098 auth_type: Type of authentication (form, jwt, oauth, saml) 5099 5100 Returns: 5101 Authentication bypass testing strategies and techniques 5102 """ 5103 bypass_techniques = { 5104 "form": [ 5105 {"technique": "SQL Injection", "payloads": ["admin'--", "' OR '1'='1'--"]}, 5106 {"technique": "Default Credentials", "payloads": ["admin:admin", "admin:password"]}, 5107 {"technique": "Password Reset", "description": "Test password reset token reuse and manipulation"}, 5108 {"technique": "Session Fixation", "description": "Test session ID prediction and fixation"} 5109 ], 5110 "jwt": [ 5111 {"technique": "Algorithm Confusion", "description": "Change RS256 to HS256"}, 5112 {"technique": "None Algorithm", "description": "Set algorithm to 'none'"}, 5113 {"technique": "Key Confusion", "description": "Use public key as HMAC secret"}, 5114 {"technique": "Token Manipulation", "description": "Modify claims and resign token"} 5115 ], 5116 "oauth": [ 5117 {"technique": "Redirect URI Manipulation", "description": "Test open redirect in redirect_uri"}, 5118 {"technique": "State Parameter", "description": "Test CSRF via missing/weak state parameter"}, 5119 {"technique": "Code Reuse", "description": "Test authorization code reuse"}, 5120 {"technique": "Client Secret", "description": "Test for exposed client secrets"} 5121 ], 5122 "saml": [ 5123 {"technique": "XML Signature Wrapping", "description": "Manipulate SAML assertions"}, 5124 {"technique": "XML External Entity", "description": "Test XXE in SAML requests"}, 5125 {"technique": "Replay Attacks", "description": "Test assertion replay"}, 5126 {"technique": "Signature Bypass", "description": "Test signature validation bypass"} 5127 ] 5128 } 5129 5130 workflow = { 5131 "target": target_url, 5132 "auth_type": auth_type, 5133 "bypass_techniques": bypass_techniques.get(auth_type, []), 5134 "testing_phases": [ 5135 {"phase": "reconnaissance", "description": "Identify authentication mechanisms"}, 5136 {"phase": "baseline_testing", "description": "Test normal authentication flow"}, 5137 {"phase": "bypass_testing", "description": "Apply bypass techniques"}, 5138 {"phase": "privilege_escalation", "description": "Test for privilege escalation"} 5139 ], 5140 "estimated_time": 240, 5141 "manual_testing_required": True 5142 } 5143 5144 logger.info(f"๐ฏ Created authentication bypass testing workflow for {target_url}") 5145 5146 return { 5147 "success": True, 5148 "workflow": workflow, 5149 "timestamp": datetime.now().isoformat() 5150 } 5151 5152 # ============================================================================ 5153 # ENHANCED HTTP TESTING FRAMEWORK & BROWSER AGENT (BURP SUITE ALTERNATIVE) 5154 # ============================================================================ 5155 5156 @mcp.tool() 5157 def http_framework_test(url: str, method: str = "GET", data: dict = {}, 5158 headers: dict = {}, cookies: dict = {}, action: str = "request") -> Dict[str, Any]: 5159 """ 5160 Enhanced HTTP testing framework (Burp Suite alternative) for comprehensive web security testing. 5161 5162 Args: 5163 url: Target URL to test 5164 method: HTTP method (GET, POST, PUT, DELETE, etc.) 5165 data: Request data/parameters 5166 headers: Custom headers 5167 cookies: Custom cookies 5168 action: Action to perform (request, spider, proxy_history, set_rules, set_scope, repeater, intruder) 5169 5170 Returns: 5171 HTTP testing results with vulnerability analysis 5172 """ 5173 data_payload = { 5174 "url": url, 5175 "method": method, 5176 "data": data, 5177 "headers": headers, 5178 "cookies": cookies, 5179 "action": action 5180 } 5181 5182 logger.info(f"{HexStrikeColors.FIRE_RED}๐ฅ Starting HTTP Framework {action}: {url}{HexStrikeColors.RESET}") 5183 result = hexstrike_client.safe_post("api/tools/http-framework", data_payload) 5184 5185 if result.get("success"): 5186 logger.info(f"{HexStrikeColors.SUCCESS}โ HTTP Framework {action} completed for {url}{HexStrikeColors.RESET}") 5187 5188 # Enhanced logging for vulnerabilities found 5189 if result.get("result", {}).get("vulnerabilities"): 5190 vuln_count = len(result["result"]["vulnerabilities"]) 5191 logger.info(f"{HexStrikeColors.HIGHLIGHT_RED} Found {vuln_count} potential vulnerabilities {HexStrikeColors.RESET}") 5192 else: 5193 logger.error(f"{HexStrikeColors.ERROR}โ HTTP Framework {action} failed for {url}{HexStrikeColors.RESET}") 5194 5195 return result 5196 5197 @mcp.tool() 5198 def browser_agent_inspect(url: str, headless: bool = True, wait_time: int = 5, 5199 action: str = "navigate", proxy_port: int = None, active_tests: bool = False) -> Dict[str, Any]: 5200 """ 5201 AI-powered browser agent for comprehensive web application inspection and security analysis. 5202 5203 Args: 5204 url: Target URL to inspect 5205 headless: Run browser in headless mode 5206 wait_time: Time to wait after page load 5207 action: Action to perform (navigate, screenshot, close, status) 5208 proxy_port: Optional proxy port for request interception 5209 active_tests: Run lightweight active reflected XSS tests (safe GET-only) 5210 5211 Returns: 5212 Browser inspection results with security analysis 5213 """ 5214 data_payload = { 5215 "url": url, 5216 "headless": headless, 5217 "wait_time": wait_time, 5218 "action": action, 5219 "proxy_port": proxy_port, 5220 "active_tests": active_tests 5221 } 5222 5223 logger.info(f"{HexStrikeColors.CRIMSON}๐ Starting Browser Agent {action}: {url}{HexStrikeColors.RESET}") 5224 result = hexstrike_client.safe_post("api/tools/browser-agent", data_payload) 5225 5226 if result.get("success"): 5227 logger.info(f"{HexStrikeColors.SUCCESS}โ Browser Agent {action} completed for {url}{HexStrikeColors.RESET}") 5228 5229 # Enhanced logging for security analysis 5230 if action == "navigate" and result.get("result", {}).get("security_analysis"): 5231 security_analysis = result["result"]["security_analysis"] 5232 issues_count = security_analysis.get("total_issues", 0) 5233 security_score = security_analysis.get("security_score", 0) 5234 5235 if issues_count > 0: 5236 logger.warning(f"{HexStrikeColors.HIGHLIGHT_YELLOW} Security Issues: {issues_count} | Score: {security_score}/100 {HexStrikeColors.RESET}") 5237 else: 5238 logger.info(f"{HexStrikeColors.HIGHLIGHT_GREEN} No security issues found | Score: {security_score}/100 {HexStrikeColors.RESET}") 5239 else: 5240 logger.error(f"{HexStrikeColors.ERROR}โ Browser Agent {action} failed for {url}{HexStrikeColors.RESET}") 5241 5242 return result 5243 5244 # ---------------- Additional HTTP Framework Tools (sync with server) ---------------- 5245 @mcp.tool() 5246 def http_set_rules(rules: list) -> Dict[str, Any]: 5247 """Set match/replace rules used to rewrite parts of URL/query/headers/body before sending. 5248 Rule format: {'where':'url|query|headers|body','pattern':'regex','replacement':'string'}""" 5249 payload = {"action": "set_rules", "rules": rules} 5250 return hexstrike_client.safe_post("api/tools/http-framework", payload) 5251 5252 @mcp.tool() 5253 def http_set_scope(host: str, include_subdomains: bool = True) -> Dict[str, Any]: 5254 """Define in-scope host (and optionally subdomains) so out-of-scope requests are skipped.""" 5255 payload = {"action": "set_scope", "host": host, "include_subdomains": include_subdomains} 5256 return hexstrike_client.safe_post("api/tools/http-framework", payload) 5257 5258 @mcp.tool() 5259 def http_repeater(request_spec: dict) -> Dict[str, Any]: 5260 """Send a crafted request (Burp Repeater equivalent). request_spec keys: url, method, headers, cookies, data.""" 5261 payload = {"action": "repeater", "request": request_spec} 5262 return hexstrike_client.safe_post("api/tools/http-framework", payload) 5263 5264 @mcp.tool() 5265 def http_intruder(url: str, method: str = "GET", location: str = "query", params: list = None, 5266 payloads: list = None, base_data: dict = None, max_requests: int = 100) -> Dict[str, Any]: 5267 """Simple Intruder (sniper) fuzzing. Iterates payloads over each param individually. 5268 location: query|body|headers|cookie.""" 5269 payload = { 5270 "action": "intruder", 5271 "url": url, 5272 "method": method, 5273 "location": location, 5274 "params": params or [], 5275 "payloads": payloads or [], 5276 "base_data": base_data or {}, 5277 "max_requests": max_requests 5278 } 5279 return hexstrike_client.safe_post("api/tools/http-framework", payload) 5280 5281 @mcp.tool() 5282 def burpsuite_alternative_scan(target: str, scan_type: str = "comprehensive", 5283 headless: bool = True, max_depth: int = 3, 5284 max_pages: int = 50) -> Dict[str, Any]: 5285 """ 5286 Comprehensive Burp Suite alternative combining HTTP framework and browser agent for complete web security testing. 5287 5288 Args: 5289 target: Target URL or domain to scan 5290 scan_type: Type of scan (comprehensive, spider, passive, active) 5291 headless: Run browser in headless mode 5292 max_depth: Maximum crawling depth 5293 max_pages: Maximum pages to analyze 5294 5295 Returns: 5296 Comprehensive security assessment results 5297 """ 5298 data_payload = { 5299 "target": target, 5300 "scan_type": scan_type, 5301 "headless": headless, 5302 "max_depth": max_depth, 5303 "max_pages": max_pages 5304 } 5305 5306 logger.info(f"{HexStrikeColors.BLOOD_RED}๐ฅ Starting Burp Suite Alternative {scan_type} scan: {target}{HexStrikeColors.RESET}") 5307 result = hexstrike_client.safe_post("api/tools/burpsuite-alternative", data_payload) 5308 5309 if result.get("success"): 5310 logger.info(f"{HexStrikeColors.SUCCESS}โ Burp Suite Alternative scan completed for {target}{HexStrikeColors.RESET}") 5311 5312 # Enhanced logging for comprehensive results 5313 if result.get("result", {}).get("summary"): 5314 summary = result["result"]["summary"] 5315 total_vulns = summary.get("total_vulnerabilities", 0) 5316 pages_analyzed = summary.get("pages_analyzed", 0) 5317 security_score = summary.get("security_score", 0) 5318 5319 logger.info(f"{HexStrikeColors.HIGHLIGHT_BLUE} SCAN SUMMARY {HexStrikeColors.RESET}") 5320 logger.info(f" ๐ Pages Analyzed: {pages_analyzed}") 5321 logger.info(f" ๐จ Vulnerabilities: {total_vulns}") 5322 logger.info(f" ๐ก๏ธ Security Score: {security_score}/100") 5323 5324 # Log vulnerability breakdown 5325 vuln_breakdown = summary.get("vulnerability_breakdown", {}) 5326 for severity, count in vuln_breakdown.items(): 5327 if count > 0: 5328 color = { 5329 'critical': HexStrikeColors.CRITICAL, 5330 'high': HexStrikeColors.FIRE_RED, 5331 'medium': HexStrikeColors.CYBER_ORANGE, 5332 'low': HexStrikeColors.YELLOW, 5333 'info': HexStrikeColors.INFO 5334 }.get(severity.lower(), HexStrikeColors.WHITE) 5335 5336 logger.info(f" {color}{severity.upper()}: {count}{HexStrikeColors.RESET}") 5337 else: 5338 logger.error(f"{HexStrikeColors.ERROR}โ Burp Suite Alternative scan failed for {target}{HexStrikeColors.RESET}") 5339 5340 return result 5341 5342 @mcp.tool() 5343 def error_handling_statistics() -> Dict[str, Any]: 5344 """ 5345 Get intelligent error handling system statistics and recent error patterns. 5346 5347 Returns: 5348 Error handling statistics and patterns 5349 """ 5350 logger.info(f"{HexStrikeColors.ELECTRIC_PURPLE}๐ Retrieving error handling statistics{HexStrikeColors.RESET}") 5351 result = hexstrike_client.safe_get("api/error-handling/statistics") 5352 5353 if result.get("success"): 5354 stats = result.get("statistics", {}) 5355 total_errors = stats.get("total_errors", 0) 5356 recent_errors = stats.get("recent_errors_count", 0) 5357 5358 logger.info(f"{HexStrikeColors.SUCCESS}โ Error statistics retrieved{HexStrikeColors.RESET}") 5359 logger.info(f" ๐ Total Errors: {total_errors}") 5360 logger.info(f" ๐ Recent Errors: {recent_errors}") 5361 5362 # Log error breakdown by type 5363 error_counts = stats.get("error_counts_by_type", {}) 5364 if error_counts: 5365 logger.info(f"{HexStrikeColors.HIGHLIGHT_BLUE} ERROR BREAKDOWN {HexStrikeColors.RESET}") 5366 for error_type, count in error_counts.items(): 5367 logger.info(f" {HexStrikeColors.FIRE_RED}{error_type}: {count}{HexStrikeColors.RESET}") 5368 else: 5369 logger.error(f"{HexStrikeColors.ERROR}โ Failed to retrieve error statistics{HexStrikeColors.RESET}") 5370 5371 return result 5372 5373 @mcp.tool() 5374 def test_error_recovery(tool_name: str, error_type: str = "timeout", 5375 target: str = "example.com") -> Dict[str, Any]: 5376 """ 5377 Test the intelligent error recovery system with simulated failures. 5378 5379 Args: 5380 tool_name: Name of tool to simulate error for 5381 error_type: Type of error to simulate (timeout, permission_denied, network_unreachable, etc.) 5382 target: Target for the simulated test 5383 5384 Returns: 5385 Recovery strategy and system response 5386 """ 5387 data_payload = { 5388 "tool_name": tool_name, 5389 "error_type": error_type, 5390 "target": target 5391 } 5392 5393 logger.info(f"{HexStrikeColors.RUBY}๐งช Testing error recovery for {tool_name} with {error_type}{HexStrikeColors.RESET}") 5394 result = hexstrike_client.safe_post("api/error-handling/test-recovery", data_payload) 5395 5396 if result.get("success"): 5397 recovery_strategy = result.get("recovery_strategy", {}) 5398 action = recovery_strategy.get("action", "unknown") 5399 success_prob = recovery_strategy.get("success_probability", 0) 5400 5401 logger.info(f"{HexStrikeColors.SUCCESS}โ Error recovery test completed{HexStrikeColors.RESET}") 5402 logger.info(f" ๐ง Recovery Action: {action}") 5403 logger.info(f" ๐ Success Probability: {success_prob:.2%}") 5404 5405 # Log alternative tools if available 5406 alternatives = result.get("alternative_tools", []) 5407 if alternatives: 5408 logger.info(f" ๐ Alternative Tools: {', '.join(alternatives)}") 5409 else: 5410 logger.error(f"{HexStrikeColors.ERROR}โ Error recovery test failed{HexStrikeColors.RESET}") 5411 5412 return result 5413 5414 return mcp 5415 5416 def parse_args(): 5417 """Parse command line arguments.""" 5418 parser = argparse.ArgumentParser(description="Run the HexStrike AI MCP Client") 5419 parser.add_argument("--server", type=str, default=DEFAULT_HEXSTRIKE_SERVER, 5420 help=f"HexStrike AI API server URL (default: {DEFAULT_HEXSTRIKE_SERVER})") 5421 parser.add_argument("--timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT, 5422 help=f"Request timeout in seconds (default: {DEFAULT_REQUEST_TIMEOUT})") 5423 parser.add_argument("--debug", action="store_true", help="Enable debug logging") 5424 return parser.parse_args() 5425 5426 def main(): 5427 """Main entry point for the MCP server.""" 5428 args = parse_args() 5429 5430 # Configure logging based on debug flag 5431 if args.debug: 5432 logger.setLevel(logging.DEBUG) 5433 logger.debug("๐ Debug logging enabled") 5434 5435 # MCP compatibility: No banner output to avoid JSON parsing issues 5436 logger.info(f"๐ Starting HexStrike AI MCP Client v6.0") 5437 logger.info(f"๐ Connecting to: {args.server}") 5438 5439 try: 5440 # Initialize the HexStrike AI client 5441 hexstrike_client = HexStrikeClient(args.server, args.timeout) 5442 5443 # Check server health and log the result 5444 health = hexstrike_client.check_health() 5445 if "error" in health: 5446 logger.warning(f"โ ๏ธ Unable to connect to HexStrike AI API server at {args.server}: {health['error']}") 5447 logger.warning("๐ MCP server will start, but tool execution may fail") 5448 else: 5449 logger.info(f"๐ฏ Successfully connected to HexStrike AI API server at {args.server}") 5450 logger.info(f"๐ฅ Server health status: {health['status']}") 5451 logger.info(f"๐ Version: {health.get('version', 'unknown')}") 5452 if not health.get("all_essential_tools_available", False): 5453 logger.warning("โ ๏ธ Not all essential tools are available on the HexStrike server") 5454 missing_tools = [tool for tool, available in health.get("tools_status", {}).items() if not available] 5455 if missing_tools: 5456 logger.warning(f"โ Missing tools: {', '.join(missing_tools[:5])}{'...' if len(missing_tools) > 5 else ''}") 5457 5458 # Set up and run the MCP server 5459 mcp = setup_mcp_server(hexstrike_client) 5460 logger.info("๐ Starting HexStrike AI MCP server") 5461 logger.info("๐ค Ready to serve AI agents with enhanced cybersecurity capabilities") 5462 mcp.run() 5463 except Exception as e: 5464 logger.error(f"๐ฅ Error starting MCP server: {str(e)}") 5465 import traceback 5466 logger.error(traceback.format_exc()) 5467 sys.exit(1) 5468 5469 if __name__ == "__main__": 5470 main()