/ mcp_tool_context_estimator.py
mcp_tool_context_estimator.py
1 #!/usr/bin/env python 2 """ 3 MCP Tool Context Estimator 4 5 This script connects to an already running MCP server and estimates how much 6 of an LLM's context window would be consumed by the registered tools when 7 they're sent to the model via the Model Context Protocol. 8 """ 9 10 import argparse 11 import asyncio 12 import json 13 import os 14 import sys 15 import traceback 16 from typing import Any, Dict, List, Optional 17 18 import aiohttp 19 import tiktoken 20 from mcp import ClientSession 21 from mcp.client.sse import sse_client 22 from mcp.client.stdio import stdio_client 23 from rich.console import Console 24 from rich.table import Table 25 26 # Add the current directory to the Python path to ensure we can import modules 27 sys.path.append("/data/projects/ultimate_mcp_server") 28 29 # Import the existing decouple configuration from the project 30 from ultimate_mcp_server.config import decouple_config 31 32 # Import actual model pricing from constants 33 from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS 34 35 # Removed dependency on STANDALONE_TOOL_FUNCTIONS to avoid circular imports 36 # from ultimate_mcp_server.tools import STANDALONE_TOOL_FUNCTIONS 37 38 39 # Define a function to read tool names from a file generated by the server 40 def read_tool_names_from_file(filename="tools_list.json", quiet=False): 41 """Read tool names from a JSON file generated by the server""" 42 console = Console() 43 try: 44 if os.path.exists(filename): 45 with open(filename, "r") as f: 46 tool_data = json.load(f) 47 if not quiet: 48 console.print( 49 f"[green]Successfully loaded {len(tool_data)} tools from {filename}[/green]" 50 ) 51 return tool_data 52 else: 53 if not quiet: 54 console.print( 55 f"[yellow]Tool list file {filename} not found. Will use server-provided tools only.[/yellow]" 56 ) 57 return [] 58 except Exception as e: 59 if not quiet: 60 console.print(f"[red]Error reading tool list: {str(e)}[/red]") 61 return [] 62 63 64 # Run another server with --load-all-tools for comparison 65 RUN_LOAD_ALL_TOOLS_COMPARISON = True 66 SHOW_DESCRIPTIONS = True 67 68 69 async def detect_server_transport(host: str, port: str, quiet: bool = False) -> tuple[str, str]: 70 """ 71 Detect what transport mode the server is running and return the appropriate URL and transport type. 72 73 Args: 74 host: Server hostname 75 port: Server port 76 quiet: If True, suppress detection messages 77 78 Returns: 79 Tuple of (url, transport_type) where transport_type is 'sse', 'streamable-http', or 'stdio' 80 """ 81 console = Console() 82 83 if not quiet: 84 console.print(f"[blue]Detecting transport mode for server at {host}:{port}...[/blue]") 85 86 # Test MCP protocol endpoints with proper requests 87 endpoints_to_try = [ 88 (f"http://{host}:{port}/mcp/", "streamable-http"), 89 (f"http://{host}:{port}/sse", "sse"), 90 (f"http://{host}:{port}", "sse"), # fallback for sse 91 ] 92 93 # Create a simple MCP initialization message for testing 94 test_message = { 95 "jsonrpc": "2.0", 96 "id": 1, 97 "method": "initialize", 98 "params": { 99 "protocolVersion": "2024-11-05", 100 "capabilities": {}, 101 "clientInfo": {"name": "mcp-detector", "version": "1.0.0"}, 102 }, 103 } 104 105 for url, transport in endpoints_to_try: 106 try: 107 timeout = aiohttp.ClientTimeout(total=5) 108 async with aiohttp.ClientSession(timeout=timeout) as session: 109 if transport == "streamable-http": 110 # Test streamable-http with POST + MCP message 111 headers = { 112 "Content-Type": "application/json", 113 "Accept": "application/json, text/event-stream", 114 } 115 async with session.post(url, json=test_message, headers=headers) as response: 116 if response.status == 200: 117 # Check if response looks like MCP 118 try: 119 data = await response.text() 120 if '"jsonrpc":"2.0"' in data or '"result"' in data: 121 if not quiet: 122 console.print( 123 f"[green]Detected {transport} transport at {url}[/green]" 124 ) 125 return url, transport 126 except Exception: 127 pass 128 elif response.status in [400, 404, 405, 406]: 129 # Server exists but doesn't support this transport 130 if not quiet: 131 console.print( 132 f"[dim]Endpoint {url} returned {response.status}[/dim]" 133 ) 134 continue 135 else: 136 # Test SSE endpoints - they might respond to GET or POST 137 # Try GET first for SSE 138 try: 139 async with session.get(url) as response: 140 if response.status == 200: 141 content_type = response.headers.get("content-type", "").lower() 142 if "text/event-stream" in content_type: 143 if not quiet: 144 console.print( 145 f"[green]Detected {transport} transport at {url}[/green]" 146 ) 147 return url, transport 148 except Exception: 149 pass 150 151 # If GET failed, try POST for SSE (some servers might expect it) 152 try: 153 async with session.post(url, json=test_message) as response: 154 if response.status == 200: 155 content_type = response.headers.get("content-type", "").lower() 156 if ( 157 "text/event-stream" in content_type 158 or "application/json" in content_type 159 ): 160 if not quiet: 161 console.print( 162 f"[green]Detected {transport} transport at {url}[/green]" 163 ) 164 return url, transport 165 except Exception: 166 pass 167 168 except Exception as e: 169 if not quiet: 170 console.print(f"[dim]Could not connect to {url}: {str(e)}[/dim]") 171 continue 172 173 # If HTTP detection fails, try to guess based on what we know 174 # Check if port 8013 responds at all 175 try: 176 timeout = aiohttp.ClientTimeout(total=2) 177 async with aiohttp.ClientSession(timeout=timeout) as session: 178 async with session.get(f"http://{host}:{port}/") as response: 179 if response.status == 200: 180 # Server is running, probably streamable-http since that's the new default 181 default_url = f"http://{host}:{port}/mcp/" 182 if not quiet: 183 console.print( 184 f"[yellow]Server detected but transport unclear, defaulting to streamable-http at {default_url}[/yellow]" 185 ) 186 return default_url, "streamable-http" 187 except Exception: 188 pass 189 190 # Final fallback to SSE for backwards compatibility 191 fallback_url = f"http://{host}:{port}/sse" 192 if not quiet: 193 console.print( 194 f"[yellow]Could not detect transport mode, defaulting to SSE at {fallback_url}[/yellow]" 195 ) 196 return fallback_url, "sse" 197 198 199 def get_server_url_and_transport() -> tuple[str, str]: 200 """ 201 Get the MCP server URL and transport type from .env file or environment variables 202 203 Returns: 204 Tuple of (server_url, transport_type) 205 """ 206 # Try to get from python-decouple (.env file) 207 try: 208 host = decouple_config("MCP_SERVER_HOST", default="localhost") 209 port = decouple_config("MCP_SERVER_PORT", default="8013") 210 211 # Try to detect transport type - this will be resolved in the async context 212 return host, port 213 except Exception: 214 # Fallback to environment variables if decouple fails 215 if "MCP_SERVER_HOST" in os.environ and "MCP_SERVER_PORT" in os.environ: 216 host = os.environ["MCP_SERVER_HOST"] 217 port = os.environ["MCP_SERVER_PORT"] 218 return host, port 219 220 # Default fallback 221 return "localhost", "8013" 222 223 224 # Calculate token counts for different models 225 def count_tokens(text: str) -> int: 226 """Count tokens using tiktoken with cl100k_base encoding (used by most modern models)""" 227 encoding = tiktoken.get_encoding("cl100k_base") 228 return len(encoding.encode(text)) 229 230 231 # Use real pricing imported from constants.py 232 # Convert from dollars per million tokens to dollars per 1000 tokens for our calculations 233 MODEL_PRICES = { 234 model: price_info["input"] / 1000 # Convert from per million to per thousand 235 for model, price_info in COST_PER_MILLION_TOKENS.items() 236 } 237 238 239 def format_capabilities(capabilities): 240 """Safely format capabilities object to string for display""" 241 result = {} 242 # Check for specific capabilities we know about 243 if hasattr(capabilities, "tools"): 244 result["tools"] = "Available" if capabilities.tools else "Not available" 245 if hasattr(capabilities, "prompts"): 246 result["prompts"] = "Available" if capabilities.prompts else "Not available" 247 if hasattr(capabilities, "resources"): 248 result["resources"] = "Available" if capabilities.resources else "Not available" 249 if hasattr(capabilities, "logging"): 250 result["logging"] = "Available" if capabilities.logging else "Not available" 251 if hasattr(capabilities, "completions"): 252 result["completions"] = "Available" if capabilities.completions else "Not available" 253 if hasattr(capabilities, "experimental"): 254 result["experimental"] = "Available" if capabilities.experimental else "Not available" 255 256 return json.dumps(result, indent=2) 257 258 259 async def get_mcp_server_tools_streamable_http( 260 server_url: str, 261 include_tools: Optional[List[str]] = None, 262 console: Console = None, 263 quiet: bool = False, 264 ) -> Dict[str, Any]: 265 """ 266 Connect to an MCP server running in streamable-http mode and fetch all registered tools. 267 268 Args: 269 server_url: The URL of the running MCP server (should be http://host:port/mcp) 270 include_tools: Optional list of tool names to include (if None, get all tools) 271 console: Optional console for output 272 quiet: If True, only show most important output 273 274 Returns: 275 Dictionary with server info and tool definitions 276 """ 277 if console is None: 278 console = Console() 279 280 if not quiet: 281 console.print( 282 f"[bold blue]Connecting to streamable-http MCP server at {server_url}...[/bold blue]" 283 ) 284 285 try: 286 timeout = aiohttp.ClientTimeout(total=30) 287 async with aiohttp.ClientSession(timeout=timeout) as session: 288 # First, try to initialize the MCP connection 289 init_data = { 290 "jsonrpc": "2.0", 291 "id": 1, 292 "method": "initialize", 293 "params": { 294 "protocolVersion": "2024-11-05", 295 "capabilities": {"roots": {"listChanged": True}}, 296 "clientInfo": {"name": "mcp-tool-context-estimator", "version": "1.0.0"}, 297 }, 298 } 299 300 headers = { 301 "Content-Type": "application/json", 302 "Accept": "application/json, text/event-stream", 303 } 304 305 if not quiet: 306 console.print( 307 "[bold blue]Initializing MCP protocol via streamable-http...[/bold blue]" 308 ) 309 310 async with session.post(server_url, json=init_data, headers=headers) as response: 311 if response.status != 200: 312 raise Exception(f"Failed to initialize: HTTP {response.status}") 313 314 # Capture session ID from response headers 315 session_id = response.headers.get("mcp-session-id") 316 if not session_id: 317 raise Exception("No session ID returned from server") 318 319 # Handle SSE-formatted response 320 response_text = await response.text() 321 if response.content_type == "text/event-stream": 322 # Parse SSE format 323 lines = response_text.strip().split("\n") 324 json_data = None 325 for line in lines: 326 if line.startswith("data: "): 327 json_data = line[6:] # Remove 'data: ' prefix 328 break 329 if json_data: 330 init_result = json.loads(json_data) 331 else: 332 raise Exception("No JSON data found in SSE response") 333 else: 334 init_result = await response.json() 335 336 if "error" in init_result: 337 raise Exception(f"MCP initialization error: {init_result['error']}") 338 339 if "result" not in init_result: 340 raise Exception("Invalid MCP initialization response") 341 342 result = init_result["result"] 343 server_info = result.get("serverInfo", {}) 344 server_name = server_info.get("name", "Unknown Server") 345 server_version = server_info.get("version", "Unknown Version") 346 347 if not quiet: 348 console.print( 349 f"[green]Connected to server:[/green] {server_name} v{server_version}" 350 ) 351 352 # Show server capabilities 353 capabilities = result.get("capabilities", {}) 354 if not quiet: 355 console.print("[bold blue]Server capabilities:[/bold blue]") 356 console.print(json.dumps(capabilities, indent=2)) 357 358 # Check if tools capability is present 359 has_tools = capabilities.get("tools", False) 360 361 if not quiet and not has_tools: 362 console.print( 363 "[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]" 364 ) 365 console.print( 366 "The server might not support tool listing, but we'll try anyway." 367 ) 368 369 # Get server instructions (from server info) 370 server_instructions = server_info.get("instructions", "") 371 if server_instructions and not quiet: 372 console.print( 373 f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]" 374 ) 375 elif not quiet: 376 console.print("[yellow]Server does not provide instructions[/yellow]") 377 378 # Update headers to include session ID for subsequent requests 379 headers["mcp-session-id"] = session_id 380 381 # Send initialized notification 382 init_notify_data = {"jsonrpc": "2.0", "method": "notifications/initialized"} 383 384 async with session.post(server_url, json=init_notify_data, headers=headers) as response: 385 # This is a notification, so we don't expect a response 386 pass 387 388 # Now list the tools 389 if not quiet: 390 console.print("[bold blue]Retrieving tool definitions...[/bold blue]") 391 392 list_tools_data = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"} 393 394 async with session.post(server_url, json=list_tools_data, headers=headers) as response: 395 if response.status != 200: 396 raise Exception(f"Failed to list tools: HTTP {response.status}") 397 398 # Handle SSE-formatted response for tools list 399 response_text = await response.text() 400 if response.content_type == "text/event-stream": 401 # Parse SSE format 402 lines = response_text.strip().split("\n") 403 json_data = None 404 for line in lines: 405 if line.startswith("data: "): 406 json_data = line[6:] # Remove 'data: ' prefix 407 break 408 if json_data: 409 tools_result = json.loads(json_data) 410 else: 411 raise Exception("No JSON data found in SSE response") 412 else: 413 tools_result = await response.json() 414 415 if "error" in tools_result: 416 raise Exception(f"MCP tools/list error: {tools_result['error']}") 417 418 if "result" not in tools_result: 419 raise Exception("Invalid MCP tools/list response") 420 421 tools_data = tools_result["result"] 422 tools = tools_data.get("tools", []) 423 424 # Count tools 425 tool_count = len(tools) if tools else 0 426 if not quiet: 427 console.print(f"[green]Found {tool_count} tools[/green]") 428 429 if tool_count == 0: 430 console.print("[bold yellow]No tools found on the server.[/bold yellow]") 431 return {} 432 433 # Convert tools to their JSON representation (exactly as sent to LLMs) 434 tool_defs = [] 435 436 # Add debug information about descriptions 437 has_descriptions = 0 438 total_desc_length = 0 439 440 for tool in tools: 441 # Convert to dict that matches the MCP protocol spec for tool definitions 442 tool_dict = {"name": tool.get("name"), "inputSchema": tool.get("inputSchema")} 443 444 # Debug description handling 445 if tool.get("description"): 446 desc = tool["description"] 447 has_descriptions += 1 448 total_desc_length += len(desc) 449 if not quiet: 450 console.print( 451 f"[dim]Tool '{tool['name']}' has description ({len(desc):,} chars)[/dim]" 452 ) 453 tool_dict["description"] = desc 454 elif not quiet: 455 console.print( 456 f"[dim yellow]Tool '{tool['name']}' has no description[/dim yellow]" 457 ) 458 459 if tool.get("annotations"): 460 tool_dict["annotations"] = tool["annotations"] 461 462 tool_defs.append(tool_dict) 463 464 # Print description statistics 465 if not quiet: 466 console.print( 467 f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]" 468 ) 469 if has_descriptions > 0: 470 console.print( 471 f"[green]Average description length: {total_desc_length / has_descriptions:,.1f} chars[/green]" 472 ) 473 474 # Include server info in the result to be used for creating the complete LLM prompt 475 return { 476 "tools": tool_defs, 477 "server_name": server_name, 478 "server_version": server_version, 479 "server_instructions": server_instructions, 480 } 481 482 except Exception as e: 483 console.print( 484 f"[bold red]Error connecting to streamable-http MCP server:[/bold red] {str(e)}" 485 ) 486 if not quiet: 487 console.print("[bold yellow]Stack trace:[/bold yellow]") 488 console.print(traceback.format_exc()) 489 raise 490 491 492 async def get_mcp_server_tools_stdio( 493 command: str, 494 args: Optional[List[str]] = None, 495 include_tools: Optional[List[str]] = None, 496 console: Console = None, 497 quiet: bool = False, 498 ) -> Dict[str, Any]: 499 """ 500 Connect to an MCP server via stdio transport and fetch all registered tools. 501 502 Args: 503 command: Command to run the MCP server 504 args: Additional arguments for the command 505 include_tools: Optional list of tool names to include (if None, get all tools) 506 console: Optional console for output 507 quiet: If True, only show most important output 508 509 Returns: 510 Dictionary with server info and tool definitions 511 """ 512 if console is None: 513 console = Console() 514 515 if not quiet: 516 console.print( 517 f"[bold blue]Connecting to MCP server via stdio: {command} {' '.join(args or [])}[/bold blue]" 518 ) 519 520 try: 521 # Build the command array 522 cmd = command.split() if isinstance(command, str) else [command] 523 if args: 524 cmd.extend(args) 525 526 async with stdio_client(cmd) as (read, write): 527 # Create a client session 528 async with ClientSession(read, write) as session: 529 # Initialize connection to server 530 if not quiet: 531 console.print("[bold blue]Initializing MCP protocol via stdio...[/bold blue]") 532 init_result = await session.initialize() 533 534 # Get server info 535 server_name = init_result.serverInfo.name 536 server_version = init_result.serverInfo.version 537 if not quiet: 538 console.print( 539 f"[green]Connected to server:[/green] {server_name} v{server_version}" 540 ) 541 542 # Show server capabilities safely 543 if not quiet: 544 console.print("[bold blue]Server capabilities:[/bold blue]") 545 console.print(format_capabilities(init_result.capabilities)) 546 547 # Check if tools capability is present 548 has_tools = False 549 if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools: 550 has_tools = True 551 552 if not quiet and not has_tools: 553 console.print( 554 "[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]" 555 ) 556 console.print( 557 "The server might not support tool listing, but we'll try anyway." 558 ) 559 560 # Get server instructions (will be used in the LLM prompt) 561 server_instructions = "" 562 if hasattr(init_result, "instructions") and init_result.instructions: 563 server_instructions = init_result.instructions 564 if not quiet: 565 console.print( 566 f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]" 567 ) 568 elif not quiet: 569 console.print("[yellow]Server does not provide instructions[/yellow]") 570 571 # List available tools 572 if not quiet: 573 console.print("[bold blue]Retrieving tool definitions...[/bold blue]") 574 try: 575 tools_result = await session.list_tools() 576 577 # Handle ListToolsResult object 578 tools = [] 579 if hasattr(tools_result, "tools"): 580 tools = tools_result.tools 581 else: 582 if not quiet: 583 console.print( 584 "[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]" 585 ) 586 if hasattr(tools_result, "__iter__"): 587 tools = list(tools_result) 588 else: 589 if not quiet: 590 console.print( 591 f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]" 592 ) 593 console.print(f"Tools result attributes: {dir(tools_result)}") 594 raise ValueError("Unable to extract tools from server response") 595 596 # Count tools 597 tool_count = len(tools) if tools else 0 598 if not quiet: 599 console.print(f"[green]Found {tool_count} tools[/green]") 600 601 if tool_count == 0: 602 console.print("[bold yellow]No tools found on the server.[/bold yellow]") 603 return {} 604 605 # Convert tools to their JSON representation (exactly as sent to LLMs) 606 tool_defs = [] 607 608 # Add debug information about descriptions 609 has_descriptions = 0 610 total_desc_length = 0 611 612 for tool in tools: 613 # Convert to dict that matches the MCP protocol spec for tool definitions 614 tool_dict = {"name": tool.name, "inputSchema": tool.inputSchema} 615 616 # Debug description handling 617 if hasattr(tool, "description") and tool.description: 618 desc = tool.description 619 has_descriptions += 1 620 total_desc_length += len(desc) 621 if not quiet: 622 console.print( 623 f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]" 624 ) 625 tool_dict["description"] = desc 626 elif not quiet: 627 console.print( 628 f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]" 629 ) 630 631 if hasattr(tool, "annotations") and tool.annotations: 632 tool_dict["annotations"] = tool.annotations 633 634 tool_defs.append(tool_dict) 635 636 # Print description statistics 637 if not quiet: 638 console.print( 639 f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]" 640 ) 641 if has_descriptions > 0: 642 console.print( 643 f"[green]Average description length: {total_desc_length / has_descriptions:,.1f} chars[/green]" 644 ) 645 646 # Include server info in the result to be used for creating the complete LLM prompt 647 return { 648 "tools": tool_defs, 649 "server_name": server_name, 650 "server_version": server_version, 651 "server_instructions": server_instructions, 652 } 653 except Exception as e: 654 console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}") 655 if not quiet: 656 console.print("[bold yellow]Stack trace:[/bold yellow]") 657 console.print(traceback.format_exc()) 658 raise 659 except Exception as e: 660 console.print(f"[bold red]Error connecting to MCP server via stdio:[/bold red] {str(e)}") 661 if not quiet: 662 console.print("[bold yellow]Stack trace:[/bold yellow]") 663 console.print(traceback.format_exc()) 664 raise 665 666 667 async def get_mcp_server_tools( 668 server_url: str, 669 transport_type: str, 670 include_tools: Optional[List[str]] = None, 671 console: Console = None, 672 quiet: bool = False, 673 command: Optional[str] = None, 674 args: Optional[List[str]] = None, 675 ) -> Dict[str, Any]: 676 """ 677 Connect to an already running MCP server and fetch all registered tools. 678 679 Args: 680 server_url: The URL of the running MCP server (ignored for stdio) 681 transport_type: The transport type ('sse', 'streamable-http', or 'stdio') 682 include_tools: Optional list of tool names to include (if None, get all tools) 683 console: Optional console for output 684 quiet: If True, only show most important output 685 command: Command to run for stdio transport 686 args: Additional arguments for stdio command 687 688 Returns: 689 Dictionary with server info and tool definitions 690 """ 691 if console is None: 692 console = Console() 693 694 if transport_type == "streamable-http": 695 return await get_mcp_server_tools_streamable_http(server_url, include_tools, console, quiet) 696 elif transport_type == "stdio": 697 if not command: 698 raise ValueError("Command must be provided for stdio transport") 699 return await get_mcp_server_tools_stdio(command, args, include_tools, console, quiet) 700 701 # Original SSE implementation 702 if not quiet: 703 console.print(f"[bold blue]Connecting to MCP server at {server_url}...[/bold blue]") 704 705 try: 706 async with sse_client(server_url) as (read, write): 707 # Create a client session 708 async with ClientSession(read, write) as session: 709 # Initialize connection to server 710 if not quiet: 711 console.print("[bold blue]Initializing MCP protocol...[/bold blue]") 712 init_result = await session.initialize() 713 714 # Get server info 715 server_name = init_result.serverInfo.name 716 server_version = init_result.serverInfo.version 717 if not quiet: 718 console.print( 719 f"[green]Connected to server:[/green] {server_name} v{server_version}" 720 ) 721 722 # Show server capabilities safely 723 if not quiet: 724 console.print("[bold blue]Server capabilities:[/bold blue]") 725 console.print(format_capabilities(init_result.capabilities)) 726 727 # Check if tools capability is present 728 has_tools = False 729 if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools: 730 has_tools = True 731 732 if not quiet and not has_tools: 733 console.print( 734 "[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]" 735 ) 736 console.print( 737 "The server might not support tool listing, but we'll try anyway." 738 ) 739 740 # Get server instructions (will be used in the LLM prompt) 741 server_instructions = "" 742 if hasattr(init_result, "instructions") and init_result.instructions: 743 server_instructions = init_result.instructions 744 if not quiet: 745 console.print( 746 f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]" 747 ) 748 elif not quiet: 749 console.print("[yellow]Server does not provide instructions[/yellow]") 750 751 # List available tools 752 if not quiet: 753 console.print("[bold blue]Retrieving tool definitions...[/bold blue]") 754 try: 755 tools_result = await session.list_tools() 756 757 # Handle ListToolsResult object 758 # The result should have a 'tools' attribute which is the actual list 759 tools = [] 760 if hasattr(tools_result, "tools"): 761 tools = tools_result.tools 762 else: 763 # If it doesn't have a tools attribute, try to access it as a list directly 764 # or check other common patterns 765 if not quiet: 766 console.print( 767 "[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]" 768 ) 769 if hasattr(tools_result, "__iter__"): 770 tools = list(tools_result) 771 else: 772 # Print the object to help diagnose 773 if not quiet: 774 console.print( 775 f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]" 776 ) 777 console.print(f"Tools result attributes: {dir(tools_result)}") 778 raise ValueError("Unable to extract tools from server response") 779 780 # Count tools 781 tool_count = len(tools) if tools else 0 782 if not quiet: 783 console.print(f"[green]Found {tool_count} tools[/green]") 784 785 if tool_count == 0: 786 console.print("[bold yellow]No tools found on the server.[/bold yellow]") 787 return {} 788 789 # Convert tools to their JSON representation (exactly as sent to LLMs) 790 tool_defs = [] 791 792 # Add debug information about descriptions 793 has_descriptions = 0 794 total_desc_length = 0 795 796 for tool in tools: 797 # Convert to dict that matches the MCP protocol spec for tool definitions 798 tool_dict = {"name": tool.name, "inputSchema": tool.inputSchema} 799 800 # Debug description handling 801 if hasattr(tool, "description") and tool.description: 802 desc = tool.description 803 has_descriptions += 1 804 total_desc_length += len(desc) 805 if not quiet: 806 console.print( 807 f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]" 808 ) 809 tool_dict["description"] = desc 810 elif not quiet: 811 console.print( 812 f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]" 813 ) 814 815 if hasattr(tool, "annotations") and tool.annotations: 816 tool_dict["annotations"] = tool.annotations 817 818 tool_defs.append(tool_dict) 819 820 # Print description statistics 821 if not quiet: 822 console.print( 823 f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]" 824 ) 825 if has_descriptions > 0: 826 console.print( 827 f"[green]Average description length: {total_desc_length / has_descriptions:,.1f} chars[/green]" 828 ) 829 830 # Include server info in the result to be used for creating the complete LLM prompt 831 return { 832 "tools": tool_defs, 833 "server_name": server_name, 834 "server_version": server_version, 835 "server_instructions": server_instructions, 836 } 837 except Exception as e: 838 console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}") 839 if not quiet: 840 console.print("[bold yellow]Stack trace:[/bold yellow]") 841 console.print(traceback.format_exc()) 842 843 # Try retrieving server details to help diagnose 844 if not quiet: 845 try: 846 console.print( 847 "[bold blue]Getting additional server information...[/bold blue]" 848 ) 849 if ( 850 hasattr(init_result.capabilities, "prompts") 851 and init_result.capabilities.prompts 852 ): 853 prompts_result = await session.list_prompts() 854 prompt_count = 0 855 if hasattr(prompts_result, "prompts"): 856 prompt_count = len(prompts_result.prompts) 857 console.print(f"Server has {prompt_count} prompts available") 858 except Exception: 859 pass 860 861 raise 862 except Exception as e: 863 console.print(f"[bold red]Error connecting to MCP server:[/bold red] {str(e)}") 864 if not quiet: 865 console.print("[bold yellow]Stack trace:[/bold yellow]") 866 console.print(traceback.format_exc()) 867 868 # Provide guidance based on the error 869 if "Connection refused" in str(e): 870 console.print( 871 "[bold yellow]The server doesn't appear to be running at the specified URL.[/bold yellow]" 872 ) 873 console.print( 874 "Make sure your MCP server is running and available at the URL you specified." 875 ) 876 elif "401" in str(e): 877 console.print( 878 "[bold yellow]Authentication error - the server requires credentials.[/bold yellow]" 879 ) 880 elif "404" in str(e): 881 console.print("[bold yellow]The server endpoint was not found.[/bold yellow]") 882 console.print("Check if you need to use a different URL path (e.g., /sse or /mcp)") 883 console.print("Try using /sse instead of just the port number.") 884 885 sys.exit(1) 886 887 888 def create_full_tool_registration_prompt(server_info, tools=None, quiet=False): 889 """ 890 Create a full, realistic prompt as would be sent to an LLM when registering MCP tools. 891 892 This generates the exact format used in the MCP client's format_tools_for_anthropic method 893 which sends tools to the Anthropic API. 894 895 Args: 896 server_info: Dictionary with server information 897 tools: List of tool definitions to include (if None, use all tools) 898 quiet: If True, only show most important output 899 900 Returns: 901 String with the serialized JSON representation of tools as sent to the API 902 """ 903 if tools is None: 904 tools = server_info["tools"] 905 906 # The actual format sent to Anthropic API is just: 907 # { 908 # "name": sanitized_name, 909 # "input_schema": tool.input_schema, 910 # "description": tool.description # only if present 911 # } 912 formatted_tools = [] 913 914 # Track description statistics 915 desc_count = 0 916 total_desc_len = 0 917 918 console = Console() 919 920 for tool in tools: 921 # Create the tool dict exactly as in format_tools_for_anthropic 922 tool_dict_for_api = {"name": tool["name"], "input_schema": tool["inputSchema"]} 923 if SHOW_DESCRIPTIONS: 924 # Add description only if it exists and is not empty 925 if "description" in tool and tool["description"]: 926 desc = tool["description"] 927 tool_dict_for_api["description"] = desc 928 desc_count += 1 929 total_desc_len += len(desc) 930 if not quiet and len(desc) > 100: 931 # Show abbreviated version for long descriptions 932 abbrev = desc[:50] + "..." + desc[-50:] 933 console.print(f"[dim]Including description for {tool['name']}: {abbrev}[/dim]") 934 elif not quiet: 935 console.print(f"[dim]Including description for {tool['name']}: {desc}[/dim]") 936 elif not quiet: 937 console.print(f"[dim yellow]No description for {tool['name']}[/dim yellow]") 938 939 formatted_tools.append(tool_dict_for_api) 940 941 # Final description statistics - ALWAYS show these since they're part of the requested output 942 console.print( 943 f"[green]Included {desc_count} descriptions out of {len(tools)} tools in final output[/green]" 944 ) 945 if desc_count > 0: 946 console.print( 947 f"[green]Average description length in final output: {total_desc_len / desc_count:,.1f} chars[/green]" 948 ) 949 950 # Return the serialized JSON that would be sent to the API 951 return json.dumps(formatted_tools, indent=2) 952 953 954 def format_tool_for_llm(tool: Dict[str, Any]) -> str: 955 """ 956 Format a tool definition exactly as it would be presented to an LLM. 957 This should match the format used in actual LLM prompt construction. 958 """ 959 # This is how tools are typically formatted for LLMs in the JSON format 960 return json.dumps(tool, indent=2) 961 962 963 def analyze_tools_token_usage( 964 current_tools: Dict[str, Any], all_tools: Dict[str, Any], quiet: bool = False 965 ): 966 """ 967 Analyze token usage for a complete MCP tool registration prompt 968 969 Args: 970 current_tools: Current active toolset info 971 all_tools: Complete toolset info (with --load-all-tools) 972 quiet: If True, only show most important output 973 """ 974 console = Console() 975 976 # Format tools as they would be sent to an LLM 977 current_tools_subset = current_tools["tools"] 978 all_tools_subset = all_tools["tools"] 979 980 # Determine if we're likely comparing the same set vs different sets 981 same_toolsets = len(current_tools_subset) == len(all_tools_subset) 982 if same_toolsets and not quiet: 983 console.print("[yellow]Warning: Current tool count equals all tools count.[/yellow]") 984 console.print( 985 "[yellow]This suggests the server is already running with --load-all-tools[/yellow]" 986 ) 987 988 # Adjust column labels based on what we're comparing 989 current_label = "Current Tools" 990 all_label = "All Tools" 991 992 # Get JSON representations 993 current_tools_json = "\n".join(format_tool_for_llm(tool) for tool in current_tools_subset) 994 all_tools_json = "\n".join(format_tool_for_llm(tool) for tool in all_tools_subset) 995 996 # Create the full prompts 997 current_tools_prompt = create_full_tool_registration_prompt( 998 current_tools, current_tools_subset, quiet 999 ) 1000 all_tools_prompt = create_full_tool_registration_prompt(all_tools, all_tools_subset, quiet) 1001 1002 # Calculate sizes for raw JSON 1003 current_tools_size_kb = len(current_tools_json.encode("utf-8")) / 1024 1004 all_tools_size_kb = len(all_tools_json.encode("utf-8")) / 1024 1005 1006 # Calculate sizes for full prompts 1007 current_tools_prompt_size_kb = len(current_tools_prompt.encode("utf-8")) / 1024 1008 all_tools_prompt_size_kb = len(all_tools_prompt.encode("utf-8")) / 1024 1009 1010 # Count tokens for raw JSON 1011 current_tools_tokens = count_tokens(current_tools_json) 1012 all_tools_tokens = count_tokens(all_tools_json) 1013 1014 # Count tokens for full prompts 1015 current_tools_prompt_tokens = count_tokens(current_tools_prompt) 1016 all_tools_prompt_tokens = count_tokens(all_tools_prompt) 1017 1018 # Calculate costs for different models (using full prompt tokens) 1019 current_tools_costs = { 1020 model: (price * current_tools_prompt_tokens / 1000) for model, price in MODEL_PRICES.items() 1021 } 1022 all_tools_costs = { 1023 model: (price * all_tools_prompt_tokens / 1000) for model, price in MODEL_PRICES.items() 1024 } 1025 1026 # Save the complete, untruncated text to files 1027 with open("current_tools_sent_to_llm.json", "w", encoding="utf-8") as f: 1028 f.write(current_tools_prompt) 1029 console.print("[green]Saved current tools JSON to current_tools_sent_to_llm.json[/green]") 1030 1031 with open("all_tools_sent_to_llm.json", "w", encoding="utf-8") as f: 1032 f.write(all_tools_prompt) 1033 console.print("[green]Saved all tools JSON to all_tools_sent_to_llm.json[/green]\n\n") 1034 1035 # Create data for display - ensure the data is correct and consistent 1036 data = { 1037 "current_tools": { 1038 "count": len(current_tools_subset), 1039 "raw_size_kb": current_tools_size_kb, 1040 "raw_tokens": current_tools_tokens, 1041 "full_size_kb": current_tools_prompt_size_kb, 1042 "full_tokens": current_tools_prompt_tokens, 1043 "costs": current_tools_costs, 1044 }, 1045 "all_tools": { 1046 "count": len(all_tools_subset), 1047 "raw_size_kb": all_tools_size_kb, 1048 "raw_tokens": all_tools_tokens, 1049 "full_size_kb": all_tools_prompt_size_kb, 1050 "full_tokens": all_tools_prompt_tokens, 1051 "costs": all_tools_costs, 1052 }, 1053 } 1054 1055 # Create comparison table 1056 table = Table(title="Tool Registration Token Usage") 1057 1058 # Add columns - including percentage column 1059 table.add_column("Metric", style="white") 1060 table.add_column(current_label, style="cyan") 1061 table.add_column(all_label, style="magenta") 1062 table.add_column("Difference", style="yellow") 1063 table.add_column(f"{current_label} as % of {all_label}", style="green") 1064 1065 # SECTION 1: Number of Tools 1066 # Calculate percentage for count 1067 count_percentage = ( 1068 (data["current_tools"]["count"] / data["all_tools"]["count"]) * 100 1069 if data["all_tools"]["count"] > 0 1070 else 100 1071 ) 1072 1073 # Add rows - keep consistent format with other rows for the number of tools 1074 table.add_row( 1075 "Number of Tools", 1076 str(data["current_tools"]["count"]), 1077 str(data["all_tools"]["count"]), 1078 str(data["current_tools"]["count"] - data["all_tools"]["count"]), 1079 f"{count_percentage:.2f}%", 1080 ) 1081 1082 # Add a divider after Number of Tools 1083 table.add_section() 1084 1085 # SECTION 2: Full Prompt stats 1086 # Calculate percentage for full prompt size 1087 full_size_percentage = ( 1088 (data["current_tools"]["full_size_kb"] / data["all_tools"]["full_size_kb"]) * 100 1089 if data["all_tools"]["full_size_kb"] > 0 1090 else 100 1091 ) 1092 1093 table.add_row( 1094 "Full Prompt Size (KB)", 1095 f"{data['current_tools']['full_size_kb']:,.2f}", 1096 f"{data['all_tools']['full_size_kb']:,.2f}", 1097 f"{data['current_tools']['full_size_kb'] - data['all_tools']['full_size_kb']:,.2f}", 1098 f"{full_size_percentage:.2f}%", 1099 ) 1100 1101 # Calculate percentage for full tokens 1102 full_tokens_percentage = ( 1103 (data["current_tools"]["full_tokens"] / data["all_tools"]["full_tokens"]) * 100 1104 if data["all_tools"]["full_tokens"] > 0 1105 else 100 1106 ) 1107 1108 table.add_row( 1109 "Full Prompt Token Count", 1110 f"{data['current_tools']['full_tokens']:,}", 1111 f"{data['all_tools']['full_tokens']:,}", 1112 f"{data['current_tools']['full_tokens'] - data['all_tools']['full_tokens']:,}", 1113 f"{full_tokens_percentage:.2f}%", 1114 ) 1115 1116 # Add a divider after Full Prompt stats 1117 table.add_section() 1118 1119 # SECTION 3: Model costs 1120 # Specify the models to include and their order 1121 models_to_include = [ 1122 "claude-3-7-sonnet-20250219", 1123 "gpt-4.1", 1124 "gemini-2.5-pro-preview-03-25", 1125 "grok-3-latest", 1126 ] 1127 1128 # Add cost rows for selected models only, in specified order 1129 for model in models_to_include: 1130 if model in MODEL_PRICES: 1131 current_cost = data["current_tools"]["costs"][model] 1132 all_cost = data["all_tools"]["costs"][model] 1133 diff_cost = current_cost - all_cost 1134 1135 # Calculate percentage 1136 cost_percentage = (current_cost / all_cost) * 100 if all_cost > 0 else 100 1137 1138 table.add_row( 1139 f"Cost ({model})", 1140 f"${current_cost:.4f}", 1141 f"${all_cost:.4f}", 1142 f"${diff_cost:.4f}", 1143 f"{cost_percentage:.2f}%", 1144 ) 1145 1146 # Print table 1147 console.print(table) 1148 1149 # Print raw data as JSON (only if not in quiet mode) 1150 if not quiet: 1151 console.print("\nRaw token usage data:") 1152 console.print(json.dumps(data, indent=2)) 1153 1154 return data 1155 1156 1157 async def get_complete_toolset(quiet: bool = False) -> List[Dict[str, Any]]: 1158 """ 1159 Generate the complete toolset that would be available with --load-all-tools 1160 1161 This uses a list of tool names read from a file generated by the server. 1162 If the file doesn't exist, it will use a list of common tools from the current server. 1163 1164 Args: 1165 quiet: If True, only show most important output 1166 1167 Returns: 1168 Dictionary with server info and simulated complete toolset 1169 """ 1170 console = Console() 1171 if not quiet: 1172 console.print("[bold blue]Analyzing complete toolset (--load-all-tools)[/bold blue]") 1173 1174 # First get the current server's tools to extract real descriptions where possible 1175 try: 1176 # Get server connection details 1177 host, port = get_server_url_and_transport() 1178 server_url, transport_type = await detect_server_transport(host, port, quiet=quiet) 1179 current_tools_info = await get_mcp_server_tools( 1180 server_url, transport_type, quiet=quiet, command=None, args=None 1181 ) 1182 current_tools = ( 1183 {tool["name"]: tool for tool in current_tools_info["tools"]} 1184 if current_tools_info 1185 else {} 1186 ) 1187 if not quiet: 1188 console.print( 1189 f"[green]Retrieved {len(current_tools)} tools from current server to use their real descriptions[/green]" 1190 ) 1191 except Exception as e: 1192 if not quiet: 1193 console.print(f"[yellow]Could not get current tools: {str(e)}[/yellow]") 1194 current_tools = {} 1195 1196 # Read tool names from file created by the server 1197 all_tool_names = read_tool_names_from_file(quiet=quiet) 1198 1199 # If no tools found in file, use the tools we got from the server 1200 if not all_tool_names and current_tools: 1201 if not quiet: 1202 console.print( 1203 "[yellow]No tools found in file. Using current server tools and adding some common ones.[/yellow]" 1204 ) 1205 all_tool_names = list(current_tools.keys()) 1206 1207 # Add some common tool names that might not be in the current server 1208 additional_tools = [ 1209 "excel_create_workbook", 1210 "excel_open_workbook", 1211 "excel_add_worksheet", 1212 "excel_set_cell_value", 1213 "excel_get_cell_value", 1214 "excel_save_workbook", 1215 "excel_get_worksheet_names", 1216 "excel_create_chart", 1217 "excel_set_range_format", 1218 "smart_browser.autopilot", 1219 "smart_browser.parallel", 1220 "smart_browser.download_site_pdfs", 1221 "generate_image", 1222 "analyze_image", 1223 "transcribe_audio", 1224 ] 1225 1226 # Add them if not already present 1227 for tool in additional_tools: 1228 if tool not in all_tool_names: 1229 all_tool_names.append(tool) 1230 1231 if not quiet: 1232 console.print( 1233 f"[green]Using complete list of {len(all_tool_names)} tools for all-tools mode[/green]" 1234 ) 1235 1236 # Create tool entries based on real data 1237 tool_defs = [] 1238 1239 for tool_name in all_tool_names: 1240 # First check if we have real data for this tool 1241 if tool_name in current_tools: 1242 # Use the actual tool definition from the server 1243 tool_def = current_tools[tool_name] 1244 if not quiet: 1245 console.print(f"[dim]Using real definition for tool '{tool_name}'[/dim]") 1246 else: 1247 # Create a definition with a realistic description based on the tool name 1248 tool_desc = ( 1249 f"The {tool_name} tool provides functionality for {tool_name.replace('_', ' ')}. " 1250 + "This would be the actual docstring from the function when loaded with --load-all-tools." 1251 ) 1252 1253 # Create a basic definition 1254 tool_def = { 1255 "name": tool_name, 1256 "inputSchema": { 1257 "type": "object", 1258 "properties": { 1259 "param1": {"type": "string", "description": "First parameter"}, 1260 "param2": {"type": "string", "description": "Second parameter"}, 1261 }, 1262 "required": ["param1"], 1263 }, 1264 "description": tool_desc, 1265 } 1266 if not quiet: 1267 console.print( 1268 f"[dim yellow]Created placeholder for tool '{tool_name}'[/dim yellow]" 1269 ) 1270 1271 tool_defs.append(tool_def) 1272 1273 # Return a similar structure to what get_mcp_server_tools returns 1274 return { 1275 "tools": tool_defs, 1276 "server_name": "Ultimate MCP Server (with --load-all-tools)", 1277 "server_version": "1.6.0", 1278 "server_instructions": """This server provides access to the complete set of tools available in the Ultimate MCP Server. 1279 When running with --load-all-tools, all tools from all categories are available, including: 1280 - Completion tools for text generation 1281 - Provider tools for model management 1282 - Filesystem tools for file operations 1283 - Optimization tools for cost and performance 1284 - Text processing tools for manipulating text 1285 - Meta tools for accessing tool information 1286 - Search tools for querying databases 1287 - Browser automation tools 1288 - Web research tools 1289 - HTML processing tools 1290 - Extraction tools 1291 - SQL database tools 1292 - Document processing tools 1293 - Audio transcription tools 1294 - Excel spreadsheet tools 1295 - OCR tools 1296 - Sentiment analysis tools 1297 """, 1298 } 1299 1300 1301 def parse_args(): 1302 """Parse command line arguments""" 1303 parser = argparse.ArgumentParser(description="MCP Tool Context Estimator") 1304 parser.add_argument( 1305 "--url", default=None, help="URL of the MCP server (default: auto-detected)" 1306 ) 1307 parser.add_argument( 1308 "--transport", 1309 default=None, 1310 choices=["sse", "streamable-http", "stdio"], 1311 help="Force specific transport type (default: auto-detect)", 1312 ) 1313 parser.add_argument( 1314 "--command", 1315 default=None, 1316 help="Command to run for stdio transport (e.g., 'python -m ultimate_mcp_server')", 1317 ) 1318 parser.add_argument( 1319 "--args", default=None, nargs="*", help="Additional arguments for stdio command" 1320 ) 1321 parser.add_argument( 1322 "--no-all-tools", action="store_true", help="Skip comparison with all tools" 1323 ) 1324 parser.add_argument( 1325 "--quiet", 1326 "-q", 1327 action="store_true", 1328 help="Only show most important information and final table", 1329 ) 1330 return parser.parse_args() 1331 1332 1333 async def main(): 1334 """Main function""" 1335 console = Console() 1336 args = parse_args() 1337 1338 # Handle stdio transport 1339 if args.transport == "stdio": 1340 if not args.command: 1341 console.print("[bold red]Error: --command is required for stdio transport[/bold red]") 1342 console.print("Example: --transport stdio --command 'python -m ultimate_mcp_server'") 1343 sys.exit(1) 1344 1345 server_url = None # Not used for stdio 1346 transport_type = "stdio" 1347 command = args.command 1348 stdio_args = args.args or [] 1349 1350 if not args.quiet: 1351 console.print( 1352 f"[blue]Using stdio transport with command: {command} {' '.join(stdio_args)}[/blue]" 1353 ) 1354 else: 1355 # Get server connection details for HTTP transports 1356 if args.url: 1357 # Parse URL to extract host and port 1358 import urllib.parse 1359 1360 parsed = urllib.parse.urlparse(args.url) 1361 host = parsed.hostname or "localhost" 1362 port = str(parsed.port or 8013) 1363 if args.transport: 1364 transport_type = args.transport 1365 if transport_type == "sse": 1366 server_url = f"http://{host}:{port}/sse" 1367 else: # streamable-http 1368 server_url = f"http://{host}:{port}/mcp/" 1369 else: 1370 # Auto-detect transport for manually specified URL 1371 server_url, transport_type = await detect_server_transport( 1372 host, port, quiet=args.quiet 1373 ) 1374 else: 1375 # Auto-detect everything 1376 host, port = get_server_url_and_transport() 1377 if args.transport: 1378 transport_type = args.transport 1379 if transport_type == "sse": 1380 server_url = f"http://{host}:{port}/sse" 1381 else: # streamable-http 1382 server_url = f"http://{host}:{port}/mcp/" 1383 else: 1384 server_url, transport_type = await detect_server_transport( 1385 host, port, quiet=args.quiet 1386 ) 1387 1388 command = None 1389 stdio_args = None 1390 1391 quiet_mode = args.quiet 1392 1393 try: 1394 # Get the active toolset from the running server 1395 current_tools = await get_mcp_server_tools( 1396 server_url, transport_type, quiet=quiet_mode, command=command, args=stdio_args 1397 ) 1398 1399 if not current_tools or "tools" not in current_tools or not current_tools["tools"]: 1400 console.print("[bold yellow]No tools found on the server.[/bold yellow]") 1401 return 1402 1403 if args.no_all_tools: 1404 # If we're not doing the comparison, create a meaningful subset for comparison 1405 if not quiet_mode: 1406 console.print("[yellow]Skipping comparison with full --load-all-tools[/yellow]") 1407 console.print( 1408 "[green]Creating an artificial subset of current tools for comparison[/green]" 1409 ) 1410 1411 # Create a more meaningful subset by taking half the tools 1412 # If we have 1-4 tools, use all of them to avoid empty subset 1413 total_tools = len(current_tools["tools"]) 1414 subset_size = max(total_tools // 2, min(total_tools, 4)) 1415 subset_tools = current_tools["tools"][:subset_size] 1416 1417 if not quiet_mode: 1418 console.print( 1419 f"[green]Created subset with {subset_size} tools out of {total_tools} total[/green]" 1420 ) 1421 1422 # Create subset version 1423 subset_data = { 1424 "tools": subset_tools, 1425 "server_name": current_tools["server_name"] + " (Subset)", 1426 "server_version": current_tools["server_version"], 1427 "server_instructions": current_tools["server_instructions"], 1428 } 1429 1430 # Analyze token usage with the artificial subset vs full 1431 analyze_tools_token_usage(subset_data, current_tools, quiet=quiet_mode) 1432 else: 1433 # Get the complete toolset that would be available with --load-all-tools 1434 all_tools = await get_complete_toolset(quiet=quiet_mode) 1435 1436 # Check if current server is likely already running with all tools 1437 current_tool_count = len(current_tools["tools"]) 1438 all_tool_count = len(all_tools["tools"]) 1439 1440 if abs(current_tool_count - all_tool_count) <= 2: # Allow small difference 1441 if not quiet_mode: 1442 console.print( 1443 f"[yellow]Warning: Current server has {current_tool_count} tools, " 1444 f"which is very close to the expected all-tools count of {all_tool_count}[/yellow]" 1445 ) 1446 console.print( 1447 "[yellow]This suggests the server is already running with --load-all-tools[/yellow]" 1448 ) 1449 1450 # For accurate comparison when counts are the same, we should just use the same data for both 1451 # to ensure metrics are consistent 1452 same_tools_data = { # noqa: F841 1453 "tools": current_tools["tools"].copy(), 1454 "server_name": "Current Server", 1455 "server_version": current_tools["server_version"], 1456 "server_instructions": current_tools["server_instructions"], 1457 } 1458 1459 # Create a deep copy to ensure they're exactly the same 1460 all_tools = { 1461 "tools": current_tools["tools"].copy(), 1462 "server_name": "All Tools", 1463 "server_version": current_tools["server_version"], 1464 "server_instructions": current_tools["server_instructions"], 1465 } 1466 1467 # Analyze token usage with full prompt simulation 1468 analyze_tools_token_usage(current_tools, all_tools, quiet=quiet_mode) 1469 except KeyboardInterrupt: 1470 console.print("[bold yellow]Operation cancelled by user[/bold yellow]") 1471 except Exception as e: 1472 console.print(f"[bold red]Unexpected error:[/bold red] {str(e)}") 1473 if not quiet_mode: 1474 console.print(traceback.format_exc()) 1475 1476 1477 if __name__ == "__main__": 1478 asyncio.run(main())