/ mcp_tool_context_estimator.py
mcp_tool_context_estimator.py
   1  #!/usr/bin/env python
   2  """
   3  MCP Tool Context Estimator
   4  
   5  This script connects to an already running MCP server and estimates how much
   6  of an LLM's context window would be consumed by the registered tools when
   7  they're sent to the model via the Model Context Protocol.
   8  """
   9  
  10  import argparse
  11  import asyncio
  12  import json
  13  import os
  14  import sys
  15  import traceback
  16  from typing import Any, Dict, List, Optional
  17  
  18  import aiohttp
  19  import tiktoken
  20  from mcp import ClientSession
  21  from mcp.client.sse import sse_client
  22  from mcp.client.stdio import stdio_client
  23  from rich.console import Console
  24  from rich.table import Table
  25  
# Add the project checkout (a hard-coded absolute path, not the current directory) to the Python path so ultimate_mcp_server can be imported
  27  sys.path.append("/data/projects/ultimate_mcp_server")
  28  
  29  # Import the existing decouple configuration from the project
  30  from ultimate_mcp_server.config import decouple_config
  31  
  32  # Import actual model pricing from constants
  33  from ultimate_mcp_server.constants import COST_PER_MILLION_TOKENS
  34  
  35  # Removed dependency on STANDALONE_TOOL_FUNCTIONS to avoid circular imports
  36  # from ultimate_mcp_server.tools import STANDALONE_TOOL_FUNCTIONS
  37  
  38  
  39  # Define a function to read tool names from a file generated by the server
  40  def read_tool_names_from_file(filename="tools_list.json", quiet=False):
  41      """Read tool names from a JSON file generated by the server"""
  42      console = Console()
  43      try:
  44          if os.path.exists(filename):
  45              with open(filename, "r") as f:
  46                  tool_data = json.load(f)
  47                  if not quiet:
  48                      console.print(
  49                          f"[green]Successfully loaded {len(tool_data)} tools from {filename}[/green]"
  50                      )
  51                  return tool_data
  52          else:
  53              if not quiet:
  54                  console.print(
  55                      f"[yellow]Tool list file {filename} not found. Will use server-provided tools only.[/yellow]"
  56                  )
  57              return []
  58      except Exception as e:
  59          if not quiet:
  60              console.print(f"[red]Error reading tool list: {str(e)}[/red]")
  61          return []
  62  
  63  
# Module-level switches; the code that reads them is not in this part of the file.
# Run another server with --load-all-tools for comparison
RUN_LOAD_ALL_TOOLS_COMPARISON = True
# NOTE(review): presumably controls whether tool descriptions are included in the output — confirm against its usage
SHOW_DESCRIPTIONS = True
  67  
  68  
  69  async def detect_server_transport(host: str, port: str, quiet: bool = False) -> tuple[str, str]:
  70      """
  71      Detect what transport mode the server is running and return the appropriate URL and transport type.
  72  
  73      Args:
  74          host: Server hostname
  75          port: Server port
  76          quiet: If True, suppress detection messages
  77  
  78      Returns:
  79          Tuple of (url, transport_type) where transport_type is 'sse', 'streamable-http', or 'stdio'
  80      """
  81      console = Console()
  82  
  83      if not quiet:
  84          console.print(f"[blue]Detecting transport mode for server at {host}:{port}...[/blue]")
  85  
  86      # Test MCP protocol endpoints with proper requests
  87      endpoints_to_try = [
  88          (f"http://{host}:{port}/mcp/", "streamable-http"),
  89          (f"http://{host}:{port}/sse", "sse"),
  90          (f"http://{host}:{port}", "sse"),  # fallback for sse
  91      ]
  92  
  93      # Create a simple MCP initialization message for testing
  94      test_message = {
  95          "jsonrpc": "2.0",
  96          "id": 1,
  97          "method": "initialize",
  98          "params": {
  99              "protocolVersion": "2024-11-05",
 100              "capabilities": {},
 101              "clientInfo": {"name": "mcp-detector", "version": "1.0.0"},
 102          },
 103      }
 104  
 105      for url, transport in endpoints_to_try:
 106          try:
 107              timeout = aiohttp.ClientTimeout(total=5)
 108              async with aiohttp.ClientSession(timeout=timeout) as session:
 109                  if transport == "streamable-http":
 110                      # Test streamable-http with POST + MCP message
 111                      headers = {
 112                          "Content-Type": "application/json",
 113                          "Accept": "application/json, text/event-stream",
 114                      }
 115                      async with session.post(url, json=test_message, headers=headers) as response:
 116                          if response.status == 200:
 117                              # Check if response looks like MCP
 118                              try:
 119                                  data = await response.text()
 120                                  if '"jsonrpc":"2.0"' in data or '"result"' in data:
 121                                      if not quiet:
 122                                          console.print(
 123                                              f"[green]Detected {transport} transport at {url}[/green]"
 124                                          )
 125                                      return url, transport
 126                              except Exception:
 127                                  pass
 128                          elif response.status in [400, 404, 405, 406]:
 129                              # Server exists but doesn't support this transport
 130                              if not quiet:
 131                                  console.print(
 132                                      f"[dim]Endpoint {url} returned {response.status}[/dim]"
 133                                  )
 134                              continue
 135                  else:
 136                      # Test SSE endpoints - they might respond to GET or POST
 137                      # Try GET first for SSE
 138                      try:
 139                          async with session.get(url) as response:
 140                              if response.status == 200:
 141                                  content_type = response.headers.get("content-type", "").lower()
 142                                  if "text/event-stream" in content_type:
 143                                      if not quiet:
 144                                          console.print(
 145                                              f"[green]Detected {transport} transport at {url}[/green]"
 146                                          )
 147                                      return url, transport
 148                      except Exception:
 149                          pass
 150  
 151                      # If GET failed, try POST for SSE (some servers might expect it)
 152                      try:
 153                          async with session.post(url, json=test_message) as response:
 154                              if response.status == 200:
 155                                  content_type = response.headers.get("content-type", "").lower()
 156                                  if (
 157                                      "text/event-stream" in content_type
 158                                      or "application/json" in content_type
 159                                  ):
 160                                      if not quiet:
 161                                          console.print(
 162                                              f"[green]Detected {transport} transport at {url}[/green]"
 163                                          )
 164                                      return url, transport
 165                      except Exception:
 166                          pass
 167  
 168          except Exception as e:
 169              if not quiet:
 170                  console.print(f"[dim]Could not connect to {url}: {str(e)}[/dim]")
 171              continue
 172  
 173      # If HTTP detection fails, try to guess based on what we know
 174      # Check if port 8013 responds at all
 175      try:
 176          timeout = aiohttp.ClientTimeout(total=2)
 177          async with aiohttp.ClientSession(timeout=timeout) as session:
 178              async with session.get(f"http://{host}:{port}/") as response:
 179                  if response.status == 200:
 180                      # Server is running, probably streamable-http since that's the new default
 181                      default_url = f"http://{host}:{port}/mcp/"
 182                      if not quiet:
 183                          console.print(
 184                              f"[yellow]Server detected but transport unclear, defaulting to streamable-http at {default_url}[/yellow]"
 185                          )
 186                      return default_url, "streamable-http"
 187      except Exception:
 188          pass
 189  
 190      # Final fallback to SSE for backwards compatibility
 191      fallback_url = f"http://{host}:{port}/sse"
 192      if not quiet:
 193          console.print(
 194              f"[yellow]Could not detect transport mode, defaulting to SSE at {fallback_url}[/yellow]"
 195          )
 196      return fallback_url, "sse"
 197  
 198  
 199  def get_server_url_and_transport() -> tuple[str, str]:
 200      """
 201      Get the MCP server URL and transport type from .env file or environment variables
 202  
 203      Returns:
 204          Tuple of (server_url, transport_type)
 205      """
 206      # Try to get from python-decouple (.env file)
 207      try:
 208          host = decouple_config("MCP_SERVER_HOST", default="localhost")
 209          port = decouple_config("MCP_SERVER_PORT", default="8013")
 210  
 211          # Try to detect transport type - this will be resolved in the async context
 212          return host, port
 213      except Exception:
 214          # Fallback to environment variables if decouple fails
 215          if "MCP_SERVER_HOST" in os.environ and "MCP_SERVER_PORT" in os.environ:
 216              host = os.environ["MCP_SERVER_HOST"]
 217              port = os.environ["MCP_SERVER_PORT"]
 218              return host, port
 219  
 220          # Default fallback
 221          return "localhost", "8013"
 222  
 223  
 224  # Calculate token counts for different models
 225  def count_tokens(text: str) -> int:
 226      """Count tokens using tiktoken with cl100k_base encoding (used by most modern models)"""
 227      encoding = tiktoken.get_encoding("cl100k_base")
 228      return len(encoding.encode(text))
 229  
 230  
 231  # Use real pricing imported from constants.py
 232  # Convert from dollars per million tokens to dollars per 1000 tokens for our calculations
 233  MODEL_PRICES = {
 234      model: price_info["input"] / 1000  # Convert from per million to per thousand
 235      for model, price_info in COST_PER_MILLION_TOKENS.items()
 236  }
 237  
 238  
 239  def format_capabilities(capabilities):
 240      """Safely format capabilities object to string for display"""
 241      result = {}
 242      # Check for specific capabilities we know about
 243      if hasattr(capabilities, "tools"):
 244          result["tools"] = "Available" if capabilities.tools else "Not available"
 245      if hasattr(capabilities, "prompts"):
 246          result["prompts"] = "Available" if capabilities.prompts else "Not available"
 247      if hasattr(capabilities, "resources"):
 248          result["resources"] = "Available" if capabilities.resources else "Not available"
 249      if hasattr(capabilities, "logging"):
 250          result["logging"] = "Available" if capabilities.logging else "Not available"
 251      if hasattr(capabilities, "completions"):
 252          result["completions"] = "Available" if capabilities.completions else "Not available"
 253      if hasattr(capabilities, "experimental"):
 254          result["experimental"] = "Available" if capabilities.experimental else "Not available"
 255  
 256      return json.dumps(result, indent=2)
 257  
 258  
 259  async def get_mcp_server_tools_streamable_http(
 260      server_url: str,
 261      include_tools: Optional[List[str]] = None,
 262      console: Console = None,
 263      quiet: bool = False,
 264  ) -> Dict[str, Any]:
 265      """
 266      Connect to an MCP server running in streamable-http mode and fetch all registered tools.
 267  
 268      Args:
 269          server_url: The URL of the running MCP server (should be http://host:port/mcp)
 270          include_tools: Optional list of tool names to include (if None, get all tools)
 271          console: Optional console for output
 272          quiet: If True, only show most important output
 273  
 274      Returns:
 275          Dictionary with server info and tool definitions
 276      """
 277      if console is None:
 278          console = Console()
 279  
 280      if not quiet:
 281          console.print(
 282              f"[bold blue]Connecting to streamable-http MCP server at {server_url}...[/bold blue]"
 283          )
 284  
 285      try:
 286          timeout = aiohttp.ClientTimeout(total=30)
 287          async with aiohttp.ClientSession(timeout=timeout) as session:
 288              # First, try to initialize the MCP connection
 289              init_data = {
 290                  "jsonrpc": "2.0",
 291                  "id": 1,
 292                  "method": "initialize",
 293                  "params": {
 294                      "protocolVersion": "2024-11-05",
 295                      "capabilities": {"roots": {"listChanged": True}},
 296                      "clientInfo": {"name": "mcp-tool-context-estimator", "version": "1.0.0"},
 297                  },
 298              }
 299  
 300              headers = {
 301                  "Content-Type": "application/json",
 302                  "Accept": "application/json, text/event-stream",
 303              }
 304  
 305              if not quiet:
 306                  console.print(
 307                      "[bold blue]Initializing MCP protocol via streamable-http...[/bold blue]"
 308                  )
 309  
 310              async with session.post(server_url, json=init_data, headers=headers) as response:
 311                  if response.status != 200:
 312                      raise Exception(f"Failed to initialize: HTTP {response.status}")
 313  
 314                  # Capture session ID from response headers
 315                  session_id = response.headers.get("mcp-session-id")
 316                  if not session_id:
 317                      raise Exception("No session ID returned from server")
 318  
 319                  # Handle SSE-formatted response
 320                  response_text = await response.text()
 321                  if response.content_type == "text/event-stream":
 322                      # Parse SSE format
 323                      lines = response_text.strip().split("\n")
 324                      json_data = None
 325                      for line in lines:
 326                          if line.startswith("data: "):
 327                              json_data = line[6:]  # Remove 'data: ' prefix
 328                              break
 329                      if json_data:
 330                          init_result = json.loads(json_data)
 331                      else:
 332                          raise Exception("No JSON data found in SSE response")
 333                  else:
 334                      init_result = await response.json()
 335  
 336                  if "error" in init_result:
 337                      raise Exception(f"MCP initialization error: {init_result['error']}")
 338  
 339                  if "result" not in init_result:
 340                      raise Exception("Invalid MCP initialization response")
 341  
 342                  result = init_result["result"]
 343                  server_info = result.get("serverInfo", {})
 344                  server_name = server_info.get("name", "Unknown Server")
 345                  server_version = server_info.get("version", "Unknown Version")
 346  
 347                  if not quiet:
 348                      console.print(
 349                          f"[green]Connected to server:[/green] {server_name} v{server_version}"
 350                      )
 351  
 352                  # Show server capabilities
 353                  capabilities = result.get("capabilities", {})
 354                  if not quiet:
 355                      console.print("[bold blue]Server capabilities:[/bold blue]")
 356                      console.print(json.dumps(capabilities, indent=2))
 357  
 358                  # Check if tools capability is present
 359                  has_tools = capabilities.get("tools", False)
 360  
 361                  if not quiet and not has_tools:
 362                      console.print(
 363                          "[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]"
 364                      )
 365                      console.print(
 366                          "The server might not support tool listing, but we'll try anyway."
 367                      )
 368  
 369                  # Get server instructions (from server info)
 370                  server_instructions = server_info.get("instructions", "")
 371                  if server_instructions and not quiet:
 372                      console.print(
 373                          f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]"
 374                      )
 375                  elif not quiet:
 376                      console.print("[yellow]Server does not provide instructions[/yellow]")
 377  
 378              # Update headers to include session ID for subsequent requests
 379              headers["mcp-session-id"] = session_id
 380  
 381              # Send initialized notification
 382              init_notify_data = {"jsonrpc": "2.0", "method": "notifications/initialized"}
 383  
 384              async with session.post(server_url, json=init_notify_data, headers=headers) as response:
 385                  # This is a notification, so we don't expect a response
 386                  pass
 387  
 388              # Now list the tools
 389              if not quiet:
 390                  console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
 391  
 392              list_tools_data = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
 393  
 394              async with session.post(server_url, json=list_tools_data, headers=headers) as response:
 395                  if response.status != 200:
 396                      raise Exception(f"Failed to list tools: HTTP {response.status}")
 397  
 398                  # Handle SSE-formatted response for tools list
 399                  response_text = await response.text()
 400                  if response.content_type == "text/event-stream":
 401                      # Parse SSE format
 402                      lines = response_text.strip().split("\n")
 403                      json_data = None
 404                      for line in lines:
 405                          if line.startswith("data: "):
 406                              json_data = line[6:]  # Remove 'data: ' prefix
 407                              break
 408                      if json_data:
 409                          tools_result = json.loads(json_data)
 410                      else:
 411                          raise Exception("No JSON data found in SSE response")
 412                  else:
 413                      tools_result = await response.json()
 414  
 415                  if "error" in tools_result:
 416                      raise Exception(f"MCP tools/list error: {tools_result['error']}")
 417  
 418                  if "result" not in tools_result:
 419                      raise Exception("Invalid MCP tools/list response")
 420  
 421                  tools_data = tools_result["result"]
 422                  tools = tools_data.get("tools", [])
 423  
 424                  # Count tools
 425                  tool_count = len(tools) if tools else 0
 426                  if not quiet:
 427                      console.print(f"[green]Found {tool_count} tools[/green]")
 428  
 429                  if tool_count == 0:
 430                      console.print("[bold yellow]No tools found on the server.[/bold yellow]")
 431                      return {}
 432  
 433                  # Convert tools to their JSON representation (exactly as sent to LLMs)
 434                  tool_defs = []
 435  
 436                  # Add debug information about descriptions
 437                  has_descriptions = 0
 438                  total_desc_length = 0
 439  
 440                  for tool in tools:
 441                      # Convert to dict that matches the MCP protocol spec for tool definitions
 442                      tool_dict = {"name": tool.get("name"), "inputSchema": tool.get("inputSchema")}
 443  
 444                      # Debug description handling
 445                      if tool.get("description"):
 446                          desc = tool["description"]
 447                          has_descriptions += 1
 448                          total_desc_length += len(desc)
 449                          if not quiet:
 450                              console.print(
 451                                  f"[dim]Tool '{tool['name']}' has description ({len(desc):,} chars)[/dim]"
 452                              )
 453                          tool_dict["description"] = desc
 454                      elif not quiet:
 455                          console.print(
 456                              f"[dim yellow]Tool '{tool['name']}' has no description[/dim yellow]"
 457                          )
 458  
 459                      if tool.get("annotations"):
 460                          tool_dict["annotations"] = tool["annotations"]
 461  
 462                      tool_defs.append(tool_dict)
 463  
 464                  # Print description statistics
 465                  if not quiet:
 466                      console.print(
 467                          f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]"
 468                      )
 469                      if has_descriptions > 0:
 470                          console.print(
 471                              f"[green]Average description length: {total_desc_length / has_descriptions:,.1f} chars[/green]"
 472                          )
 473  
 474                  # Include server info in the result to be used for creating the complete LLM prompt
 475                  return {
 476                      "tools": tool_defs,
 477                      "server_name": server_name,
 478                      "server_version": server_version,
 479                      "server_instructions": server_instructions,
 480                  }
 481  
 482      except Exception as e:
 483          console.print(
 484              f"[bold red]Error connecting to streamable-http MCP server:[/bold red] {str(e)}"
 485          )
 486          if not quiet:
 487              console.print("[bold yellow]Stack trace:[/bold yellow]")
 488              console.print(traceback.format_exc())
 489          raise
 490  
 491  
 492  async def get_mcp_server_tools_stdio(
 493      command: str,
 494      args: Optional[List[str]] = None,
 495      include_tools: Optional[List[str]] = None,
 496      console: Console = None,
 497      quiet: bool = False,
 498  ) -> Dict[str, Any]:
 499      """
 500      Connect to an MCP server via stdio transport and fetch all registered tools.
 501  
 502      Args:
 503          command: Command to run the MCP server
 504          args: Additional arguments for the command
 505          include_tools: Optional list of tool names to include (if None, get all tools)
 506          console: Optional console for output
 507          quiet: If True, only show most important output
 508  
 509      Returns:
 510          Dictionary with server info and tool definitions
 511      """
 512      if console is None:
 513          console = Console()
 514  
 515      if not quiet:
 516          console.print(
 517              f"[bold blue]Connecting to MCP server via stdio: {command} {' '.join(args or [])}[/bold blue]"
 518          )
 519  
 520      try:
 521          # Build the command array
 522          cmd = command.split() if isinstance(command, str) else [command]
 523          if args:
 524              cmd.extend(args)
 525  
 526          async with stdio_client(cmd) as (read, write):
 527              # Create a client session
 528              async with ClientSession(read, write) as session:
 529                  # Initialize connection to server
 530                  if not quiet:
 531                      console.print("[bold blue]Initializing MCP protocol via stdio...[/bold blue]")
 532                  init_result = await session.initialize()
 533  
 534                  # Get server info
 535                  server_name = init_result.serverInfo.name
 536                  server_version = init_result.serverInfo.version
 537                  if not quiet:
 538                      console.print(
 539                          f"[green]Connected to server:[/green] {server_name} v{server_version}"
 540                      )
 541  
 542                  # Show server capabilities safely
 543                  if not quiet:
 544                      console.print("[bold blue]Server capabilities:[/bold blue]")
 545                      console.print(format_capabilities(init_result.capabilities))
 546  
 547                  # Check if tools capability is present
 548                  has_tools = False
 549                  if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools:
 550                      has_tools = True
 551  
 552                  if not quiet and not has_tools:
 553                      console.print(
 554                          "[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]"
 555                      )
 556                      console.print(
 557                          "The server might not support tool listing, but we'll try anyway."
 558                      )
 559  
 560                  # Get server instructions (will be used in the LLM prompt)
 561                  server_instructions = ""
 562                  if hasattr(init_result, "instructions") and init_result.instructions:
 563                      server_instructions = init_result.instructions
 564                      if not quiet:
 565                          console.print(
 566                              f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]"
 567                          )
 568                  elif not quiet:
 569                      console.print("[yellow]Server does not provide instructions[/yellow]")
 570  
 571                  # List available tools
 572                  if not quiet:
 573                      console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
 574                  try:
 575                      tools_result = await session.list_tools()
 576  
 577                      # Handle ListToolsResult object
 578                      tools = []
 579                      if hasattr(tools_result, "tools"):
 580                          tools = tools_result.tools
 581                      else:
 582                          if not quiet:
 583                              console.print(
 584                                  "[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]"
 585                              )
 586                          if hasattr(tools_result, "__iter__"):
 587                              tools = list(tools_result)
 588                          else:
 589                              if not quiet:
 590                                  console.print(
 591                                      f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]"
 592                                  )
 593                                  console.print(f"Tools result attributes: {dir(tools_result)}")
 594                              raise ValueError("Unable to extract tools from server response")
 595  
 596                      # Count tools
 597                      tool_count = len(tools) if tools else 0
 598                      if not quiet:
 599                          console.print(f"[green]Found {tool_count} tools[/green]")
 600  
 601                      if tool_count == 0:
 602                          console.print("[bold yellow]No tools found on the server.[/bold yellow]")
 603                          return {}
 604  
 605                      # Convert tools to their JSON representation (exactly as sent to LLMs)
 606                      tool_defs = []
 607  
 608                      # Add debug information about descriptions
 609                      has_descriptions = 0
 610                      total_desc_length = 0
 611  
 612                      for tool in tools:
 613                          # Convert to dict that matches the MCP protocol spec for tool definitions
 614                          tool_dict = {"name": tool.name, "inputSchema": tool.inputSchema}
 615  
 616                          # Debug description handling
 617                          if hasattr(tool, "description") and tool.description:
 618                              desc = tool.description
 619                              has_descriptions += 1
 620                              total_desc_length += len(desc)
 621                              if not quiet:
 622                                  console.print(
 623                                      f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]"
 624                                  )
 625                              tool_dict["description"] = desc
 626                          elif not quiet:
 627                              console.print(
 628                                  f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]"
 629                              )
 630  
 631                          if hasattr(tool, "annotations") and tool.annotations:
 632                              tool_dict["annotations"] = tool.annotations
 633  
 634                          tool_defs.append(tool_dict)
 635  
 636                      # Print description statistics
 637                      if not quiet:
 638                          console.print(
 639                              f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]"
 640                          )
 641                          if has_descriptions > 0:
 642                              console.print(
 643                                  f"[green]Average description length: {total_desc_length / has_descriptions:,.1f} chars[/green]"
 644                              )
 645  
 646                      # Include server info in the result to be used for creating the complete LLM prompt
 647                      return {
 648                          "tools": tool_defs,
 649                          "server_name": server_name,
 650                          "server_version": server_version,
 651                          "server_instructions": server_instructions,
 652                      }
 653                  except Exception as e:
 654                      console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
 655                      if not quiet:
 656                          console.print("[bold yellow]Stack trace:[/bold yellow]")
 657                          console.print(traceback.format_exc())
 658                      raise
 659      except Exception as e:
 660          console.print(f"[bold red]Error connecting to MCP server via stdio:[/bold red] {str(e)}")
 661          if not quiet:
 662              console.print("[bold yellow]Stack trace:[/bold yellow]")
 663              console.print(traceback.format_exc())
 664          raise
 665  
 666  
 667  async def get_mcp_server_tools(
 668      server_url: str,
 669      transport_type: str,
 670      include_tools: Optional[List[str]] = None,
 671      console: Console = None,
 672      quiet: bool = False,
 673      command: Optional[str] = None,
 674      args: Optional[List[str]] = None,
 675  ) -> Dict[str, Any]:
 676      """
 677      Connect to an already running MCP server and fetch all registered tools.
 678  
 679      Args:
 680          server_url: The URL of the running MCP server (ignored for stdio)
 681          transport_type: The transport type ('sse', 'streamable-http', or 'stdio')
 682          include_tools: Optional list of tool names to include (if None, get all tools)
 683          console: Optional console for output
 684          quiet: If True, only show most important output
 685          command: Command to run for stdio transport
 686          args: Additional arguments for stdio command
 687  
 688      Returns:
 689          Dictionary with server info and tool definitions
 690      """
 691      if console is None:
 692          console = Console()
 693  
 694      if transport_type == "streamable-http":
 695          return await get_mcp_server_tools_streamable_http(server_url, include_tools, console, quiet)
 696      elif transport_type == "stdio":
 697          if not command:
 698              raise ValueError("Command must be provided for stdio transport")
 699          return await get_mcp_server_tools_stdio(command, args, include_tools, console, quiet)
 700  
 701      # Original SSE implementation
 702      if not quiet:
 703          console.print(f"[bold blue]Connecting to MCP server at {server_url}...[/bold blue]")
 704  
 705      try:
 706          async with sse_client(server_url) as (read, write):
 707              # Create a client session
 708              async with ClientSession(read, write) as session:
 709                  # Initialize connection to server
 710                  if not quiet:
 711                      console.print("[bold blue]Initializing MCP protocol...[/bold blue]")
 712                  init_result = await session.initialize()
 713  
 714                  # Get server info
 715                  server_name = init_result.serverInfo.name
 716                  server_version = init_result.serverInfo.version
 717                  if not quiet:
 718                      console.print(
 719                          f"[green]Connected to server:[/green] {server_name} v{server_version}"
 720                      )
 721  
 722                  # Show server capabilities safely
 723                  if not quiet:
 724                      console.print("[bold blue]Server capabilities:[/bold blue]")
 725                      console.print(format_capabilities(init_result.capabilities))
 726  
 727                  # Check if tools capability is present
 728                  has_tools = False
 729                  if hasattr(init_result.capabilities, "tools") and init_result.capabilities.tools:
 730                      has_tools = True
 731  
 732                  if not quiet and not has_tools:
 733                      console.print(
 734                          "[bold yellow]Warning: This server does not advertise tools capability![/bold yellow]"
 735                      )
 736                      console.print(
 737                          "The server might not support tool listing, but we'll try anyway."
 738                      )
 739  
 740                  # Get server instructions (will be used in the LLM prompt)
 741                  server_instructions = ""
 742                  if hasattr(init_result, "instructions") and init_result.instructions:
 743                      server_instructions = init_result.instructions
 744                      if not quiet:
 745                          console.print(
 746                              f"[green]Server provides instructions of length {len(server_instructions):,} chars[/green]"
 747                          )
 748                  elif not quiet:
 749                      console.print("[yellow]Server does not provide instructions[/yellow]")
 750  
 751                  # List available tools
 752                  if not quiet:
 753                      console.print("[bold blue]Retrieving tool definitions...[/bold blue]")
 754                  try:
 755                      tools_result = await session.list_tools()
 756  
 757                      # Handle ListToolsResult object
 758                      # The result should have a 'tools' attribute which is the actual list
 759                      tools = []
 760                      if hasattr(tools_result, "tools"):
 761                          tools = tools_result.tools
 762                      else:
 763                          # If it doesn't have a tools attribute, try to access it as a list directly
 764                          # or check other common patterns
 765                          if not quiet:
 766                              console.print(
 767                                  "[bold yellow]Tools result doesn't have expected structure. Trying alternatives...[/bold yellow]"
 768                              )
 769                          if hasattr(tools_result, "__iter__"):
 770                              tools = list(tools_result)
 771                          else:
 772                              # Print the object to help diagnose
 773                              if not quiet:
 774                                  console.print(
 775                                      f"[bold yellow]Tools result type: {type(tools_result)}[/bold yellow]"
 776                                  )
 777                                  console.print(f"Tools result attributes: {dir(tools_result)}")
 778                              raise ValueError("Unable to extract tools from server response")
 779  
 780                      # Count tools
 781                      tool_count = len(tools) if tools else 0
 782                      if not quiet:
 783                          console.print(f"[green]Found {tool_count} tools[/green]")
 784  
 785                      if tool_count == 0:
 786                          console.print("[bold yellow]No tools found on the server.[/bold yellow]")
 787                          return {}
 788  
 789                      # Convert tools to their JSON representation (exactly as sent to LLMs)
 790                      tool_defs = []
 791  
 792                      # Add debug information about descriptions
 793                      has_descriptions = 0
 794                      total_desc_length = 0
 795  
 796                      for tool in tools:
 797                          # Convert to dict that matches the MCP protocol spec for tool definitions
 798                          tool_dict = {"name": tool.name, "inputSchema": tool.inputSchema}
 799  
 800                          # Debug description handling
 801                          if hasattr(tool, "description") and tool.description:
 802                              desc = tool.description
 803                              has_descriptions += 1
 804                              total_desc_length += len(desc)
 805                              if not quiet:
 806                                  console.print(
 807                                      f"[dim]Tool '{tool.name}' has description ({len(desc):,} chars)[/dim]"
 808                                  )
 809                              tool_dict["description"] = desc
 810                          elif not quiet:
 811                              console.print(
 812                                  f"[dim yellow]Tool '{tool.name}' has no description[/dim yellow]"
 813                              )
 814  
 815                          if hasattr(tool, "annotations") and tool.annotations:
 816                              tool_dict["annotations"] = tool.annotations
 817  
 818                          tool_defs.append(tool_dict)
 819  
 820                      # Print description statistics
 821                      if not quiet:
 822                          console.print(
 823                              f"[green]{has_descriptions} out of {tool_count} tools have descriptions[/green]"
 824                          )
 825                          if has_descriptions > 0:
 826                              console.print(
 827                                  f"[green]Average description length: {total_desc_length / has_descriptions:,.1f} chars[/green]"
 828                              )
 829  
 830                      # Include server info in the result to be used for creating the complete LLM prompt
 831                      return {
 832                          "tools": tool_defs,
 833                          "server_name": server_name,
 834                          "server_version": server_version,
 835                          "server_instructions": server_instructions,
 836                      }
 837                  except Exception as e:
 838                      console.print(f"[bold red]Error listing tools:[/bold red] {str(e)}")
 839                      if not quiet:
 840                          console.print("[bold yellow]Stack trace:[/bold yellow]")
 841                          console.print(traceback.format_exc())
 842  
 843                      # Try retrieving server details to help diagnose
 844                      if not quiet:
 845                          try:
 846                              console.print(
 847                                  "[bold blue]Getting additional server information...[/bold blue]"
 848                              )
 849                              if (
 850                                  hasattr(init_result.capabilities, "prompts")
 851                                  and init_result.capabilities.prompts
 852                              ):
 853                                  prompts_result = await session.list_prompts()
 854                                  prompt_count = 0
 855                                  if hasattr(prompts_result, "prompts"):
 856                                      prompt_count = len(prompts_result.prompts)
 857                                  console.print(f"Server has {prompt_count} prompts available")
 858                          except Exception:
 859                              pass
 860  
 861                      raise
 862      except Exception as e:
 863          console.print(f"[bold red]Error connecting to MCP server:[/bold red] {str(e)}")
 864          if not quiet:
 865              console.print("[bold yellow]Stack trace:[/bold yellow]")
 866              console.print(traceback.format_exc())
 867  
 868          # Provide guidance based on the error
 869          if "Connection refused" in str(e):
 870              console.print(
 871                  "[bold yellow]The server doesn't appear to be running at the specified URL.[/bold yellow]"
 872              )
 873              console.print(
 874                  "Make sure your MCP server is running and available at the URL you specified."
 875              )
 876          elif "401" in str(e):
 877              console.print(
 878                  "[bold yellow]Authentication error - the server requires credentials.[/bold yellow]"
 879              )
 880          elif "404" in str(e):
 881              console.print("[bold yellow]The server endpoint was not found.[/bold yellow]")
 882              console.print("Check if you need to use a different URL path (e.g., /sse or /mcp)")
 883              console.print("Try using /sse instead of just the port number.")
 884  
 885          sys.exit(1)
 886  
 887  
 888  def create_full_tool_registration_prompt(server_info, tools=None, quiet=False):
 889      """
 890      Create a full, realistic prompt as would be sent to an LLM when registering MCP tools.
 891  
 892      This generates the exact format used in the MCP client's format_tools_for_anthropic method
 893      which sends tools to the Anthropic API.
 894  
 895      Args:
 896          server_info: Dictionary with server information
 897          tools: List of tool definitions to include (if None, use all tools)
 898          quiet: If True, only show most important output
 899  
 900      Returns:
 901          String with the serialized JSON representation of tools as sent to the API
 902      """
 903      if tools is None:
 904          tools = server_info["tools"]
 905  
 906      # The actual format sent to Anthropic API is just:
 907      # {
 908      #   "name": sanitized_name,
 909      #   "input_schema": tool.input_schema,
 910      #   "description": tool.description  # only if present
 911      # }
 912      formatted_tools = []
 913  
 914      # Track description statistics
 915      desc_count = 0
 916      total_desc_len = 0
 917  
 918      console = Console()
 919  
 920      for tool in tools:
 921          # Create the tool dict exactly as in format_tools_for_anthropic
 922          tool_dict_for_api = {"name": tool["name"], "input_schema": tool["inputSchema"]}
 923          if SHOW_DESCRIPTIONS:
 924              # Add description only if it exists and is not empty
 925              if "description" in tool and tool["description"]:
 926                  desc = tool["description"]
 927                  tool_dict_for_api["description"] = desc
 928                  desc_count += 1
 929                  total_desc_len += len(desc)
 930                  if not quiet and len(desc) > 100:
 931                      # Show abbreviated version for long descriptions
 932                      abbrev = desc[:50] + "..." + desc[-50:]
 933                      console.print(f"[dim]Including description for {tool['name']}: {abbrev}[/dim]")
 934                  elif not quiet:
 935                      console.print(f"[dim]Including description for {tool['name']}: {desc}[/dim]")
 936              elif not quiet:
 937                  console.print(f"[dim yellow]No description for {tool['name']}[/dim yellow]")
 938  
 939          formatted_tools.append(tool_dict_for_api)
 940  
 941      # Final description statistics - ALWAYS show these since they're part of the requested output
 942      console.print(
 943          f"[green]Included {desc_count} descriptions out of {len(tools)} tools in final output[/green]"
 944      )
 945      if desc_count > 0:
 946          console.print(
 947              f"[green]Average description length in final output: {total_desc_len / desc_count:,.1f} chars[/green]"
 948          )
 949  
 950      # Return the serialized JSON that would be sent to the API
 951      return json.dumps(formatted_tools, indent=2)
 952  
 953  
 954  def format_tool_for_llm(tool: Dict[str, Any]) -> str:
 955      """
 956      Format a tool definition exactly as it would be presented to an LLM.
 957      This should match the format used in actual LLM prompt construction.
 958      """
 959      # This is how tools are typically formatted for LLMs in the JSON format
 960      return json.dumps(tool, indent=2)
 961  
 962  
 963  def analyze_tools_token_usage(
 964      current_tools: Dict[str, Any], all_tools: Dict[str, Any], quiet: bool = False
 965  ):
 966      """
 967      Analyze token usage for a complete MCP tool registration prompt
 968  
 969      Args:
 970          current_tools: Current active toolset info
 971          all_tools: Complete toolset info (with --load-all-tools)
 972          quiet: If True, only show most important output
 973      """
 974      console = Console()
 975  
 976      # Format tools as they would be sent to an LLM
 977      current_tools_subset = current_tools["tools"]
 978      all_tools_subset = all_tools["tools"]
 979  
 980      # Determine if we're likely comparing the same set vs different sets
 981      same_toolsets = len(current_tools_subset) == len(all_tools_subset)
 982      if same_toolsets and not quiet:
 983          console.print("[yellow]Warning: Current tool count equals all tools count.[/yellow]")
 984          console.print(
 985              "[yellow]This suggests the server is already running with --load-all-tools[/yellow]"
 986          )
 987  
 988      # Adjust column labels based on what we're comparing
 989      current_label = "Current Tools"
 990      all_label = "All Tools"
 991  
 992      # Get JSON representations
 993      current_tools_json = "\n".join(format_tool_for_llm(tool) for tool in current_tools_subset)
 994      all_tools_json = "\n".join(format_tool_for_llm(tool) for tool in all_tools_subset)
 995  
 996      # Create the full prompts
 997      current_tools_prompt = create_full_tool_registration_prompt(
 998          current_tools, current_tools_subset, quiet
 999      )
1000      all_tools_prompt = create_full_tool_registration_prompt(all_tools, all_tools_subset, quiet)
1001  
1002      # Calculate sizes for raw JSON
1003      current_tools_size_kb = len(current_tools_json.encode("utf-8")) / 1024
1004      all_tools_size_kb = len(all_tools_json.encode("utf-8")) / 1024
1005  
1006      # Calculate sizes for full prompts
1007      current_tools_prompt_size_kb = len(current_tools_prompt.encode("utf-8")) / 1024
1008      all_tools_prompt_size_kb = len(all_tools_prompt.encode("utf-8")) / 1024
1009  
1010      # Count tokens for raw JSON
1011      current_tools_tokens = count_tokens(current_tools_json)
1012      all_tools_tokens = count_tokens(all_tools_json)
1013  
1014      # Count tokens for full prompts
1015      current_tools_prompt_tokens = count_tokens(current_tools_prompt)
1016      all_tools_prompt_tokens = count_tokens(all_tools_prompt)
1017  
1018      # Calculate costs for different models (using full prompt tokens)
1019      current_tools_costs = {
1020          model: (price * current_tools_prompt_tokens / 1000) for model, price in MODEL_PRICES.items()
1021      }
1022      all_tools_costs = {
1023          model: (price * all_tools_prompt_tokens / 1000) for model, price in MODEL_PRICES.items()
1024      }
1025  
1026      # Save the complete, untruncated text to files
1027      with open("current_tools_sent_to_llm.json", "w", encoding="utf-8") as f:
1028          f.write(current_tools_prompt)
1029      console.print("[green]Saved current tools JSON to current_tools_sent_to_llm.json[/green]")
1030  
1031      with open("all_tools_sent_to_llm.json", "w", encoding="utf-8") as f:
1032          f.write(all_tools_prompt)
1033      console.print("[green]Saved all tools JSON to all_tools_sent_to_llm.json[/green]\n\n")
1034  
1035      # Create data for display - ensure the data is correct and consistent
1036      data = {
1037          "current_tools": {
1038              "count": len(current_tools_subset),
1039              "raw_size_kb": current_tools_size_kb,
1040              "raw_tokens": current_tools_tokens,
1041              "full_size_kb": current_tools_prompt_size_kb,
1042              "full_tokens": current_tools_prompt_tokens,
1043              "costs": current_tools_costs,
1044          },
1045          "all_tools": {
1046              "count": len(all_tools_subset),
1047              "raw_size_kb": all_tools_size_kb,
1048              "raw_tokens": all_tools_tokens,
1049              "full_size_kb": all_tools_prompt_size_kb,
1050              "full_tokens": all_tools_prompt_tokens,
1051              "costs": all_tools_costs,
1052          },
1053      }
1054  
1055      # Create comparison table
1056      table = Table(title="Tool Registration Token Usage")
1057  
1058      # Add columns - including percentage column
1059      table.add_column("Metric", style="white")
1060      table.add_column(current_label, style="cyan")
1061      table.add_column(all_label, style="magenta")
1062      table.add_column("Difference", style="yellow")
1063      table.add_column(f"{current_label} as % of {all_label}", style="green")
1064  
1065      # SECTION 1: Number of Tools
1066      # Calculate percentage for count
1067      count_percentage = (
1068          (data["current_tools"]["count"] / data["all_tools"]["count"]) * 100
1069          if data["all_tools"]["count"] > 0
1070          else 100
1071      )
1072  
1073      # Add rows - keep consistent format with other rows for the number of tools
1074      table.add_row(
1075          "Number of Tools",
1076          str(data["current_tools"]["count"]),
1077          str(data["all_tools"]["count"]),
1078          str(data["current_tools"]["count"] - data["all_tools"]["count"]),
1079          f"{count_percentage:.2f}%",
1080      )
1081  
1082      # Add a divider after Number of Tools
1083      table.add_section()
1084  
1085      # SECTION 2: Full Prompt stats
1086      # Calculate percentage for full prompt size
1087      full_size_percentage = (
1088          (data["current_tools"]["full_size_kb"] / data["all_tools"]["full_size_kb"]) * 100
1089          if data["all_tools"]["full_size_kb"] > 0
1090          else 100
1091      )
1092  
1093      table.add_row(
1094          "Full Prompt Size (KB)",
1095          f"{data['current_tools']['full_size_kb']:,.2f}",
1096          f"{data['all_tools']['full_size_kb']:,.2f}",
1097          f"{data['current_tools']['full_size_kb'] - data['all_tools']['full_size_kb']:,.2f}",
1098          f"{full_size_percentage:.2f}%",
1099      )
1100  
1101      # Calculate percentage for full tokens
1102      full_tokens_percentage = (
1103          (data["current_tools"]["full_tokens"] / data["all_tools"]["full_tokens"]) * 100
1104          if data["all_tools"]["full_tokens"] > 0
1105          else 100
1106      )
1107  
1108      table.add_row(
1109          "Full Prompt Token Count",
1110          f"{data['current_tools']['full_tokens']:,}",
1111          f"{data['all_tools']['full_tokens']:,}",
1112          f"{data['current_tools']['full_tokens'] - data['all_tools']['full_tokens']:,}",
1113          f"{full_tokens_percentage:.2f}%",
1114      )
1115  
1116      # Add a divider after Full Prompt stats
1117      table.add_section()
1118  
1119      # SECTION 3: Model costs
1120      # Specify the models to include and their order
1121      models_to_include = [
1122          "claude-3-7-sonnet-20250219",
1123          "gpt-4.1",
1124          "gemini-2.5-pro-preview-03-25",
1125          "grok-3-latest",
1126      ]
1127  
1128      # Add cost rows for selected models only, in specified order
1129      for model in models_to_include:
1130          if model in MODEL_PRICES:
1131              current_cost = data["current_tools"]["costs"][model]
1132              all_cost = data["all_tools"]["costs"][model]
1133              diff_cost = current_cost - all_cost
1134  
1135              # Calculate percentage
1136              cost_percentage = (current_cost / all_cost) * 100 if all_cost > 0 else 100
1137  
1138              table.add_row(
1139                  f"Cost ({model})",
1140                  f"${current_cost:.4f}",
1141                  f"${all_cost:.4f}",
1142                  f"${diff_cost:.4f}",
1143                  f"{cost_percentage:.2f}%",
1144              )
1145  
1146      # Print table
1147      console.print(table)
1148  
1149      # Print raw data as JSON (only if not in quiet mode)
1150      if not quiet:
1151          console.print("\nRaw token usage data:")
1152          console.print(json.dumps(data, indent=2))
1153  
1154      return data
1155  
1156  
async def get_complete_toolset(quiet: bool = False) -> Dict[str, Any]:
    """
    Generate the complete toolset that would be available with --load-all-tools

    Tool names come from the file read by ``read_tool_names_from_file``. If that
    yields no names but the running server reported tools, the server's tool
    names plus a hard-coded list of additional names are used instead. Tools
    known to the running server keep their real definitions; all others get a
    placeholder definition.

    Args:
        quiet: If True, only show most important output

    Returns:
        Dictionary with server info and simulated complete toolset
    """
    console = Console()
    if not quiet:
        console.print("[bold blue]Analyzing complete toolset (--load-all-tools)[/bold blue]")

    # First get the current server's tools to extract real descriptions where possible
    # NOTE(review): get_mcp_server_tools calls sys.exit(1) on SSE connection errors;
    # SystemExit is not an Exception subclass, so that failure ends the process
    # instead of reaching the fallback below.
    try:
        host, port = get_server_url_and_transport()
        server_url, transport_type = await detect_server_transport(host, port, quiet=quiet)
        current_tools_info = await get_mcp_server_tools(
            server_url, transport_type, quiet=quiet, command=None, args=None
        )
        # An empty result (no tools on the server) is treated as "nothing known"
        current_tools = (
            {tool["name"]: tool for tool in current_tools_info["tools"]}
            if current_tools_info
            else {}
        )
        if not quiet:
            console.print(
                f"[green]Retrieved {len(current_tools)} tools from current server to use their real descriptions[/green]"
            )
    except Exception as e:
        if not quiet:
            console.print(f"[yellow]Could not get current tools: {str(e)}[/yellow]")
        current_tools = {}

    # Read tool names from file created by the server; normalize a missing/empty
    # result to a list so the length check and loop below cannot fail on None
    all_tool_names = read_tool_names_from_file(quiet=quiet) or []

    # If no tools found in file, use the tools we got from the server
    if not all_tool_names and current_tools:
        if not quiet:
            console.print(
                "[yellow]No tools found in file. Using current server tools and adding some common ones.[/yellow]"
            )
        all_tool_names = list(current_tools)

        # Add some common tool names that might not be in the current server
        additional_tools = [
            "excel_create_workbook",
            "excel_open_workbook",
            "excel_add_worksheet",
            "excel_set_cell_value",
            "excel_get_cell_value",
            "excel_save_workbook",
            "excel_get_worksheet_names",
            "excel_create_chart",
            "excel_set_range_format",
            "smart_browser.autopilot",
            "smart_browser.parallel",
            "smart_browser.download_site_pdfs",
            "generate_image",
            "analyze_image",
            "transcribe_audio",
        ]

        # Add them if not already present (names are keys of current_tools at this point)
        all_tool_names.extend(name for name in additional_tools if name not in current_tools)

    if not quiet:
        console.print(
            f"[green]Using complete list of {len(all_tool_names)} tools for all-tools mode[/green]"
        )

    # Create tool entries based on real data
    tool_defs = []

    for tool_name in all_tool_names:
        if tool_name in current_tools:
            # Use the actual tool definition from the server
            tool_def = current_tools[tool_name]
            if not quiet:
                console.print(f"[dim]Using real definition for tool '{tool_name}'[/dim]")
        else:
            # Placeholder: a generic description derived from the name plus a two-parameter schema
            tool_desc = (
                f"The {tool_name} tool provides functionality for {tool_name.replace('_', ' ')}. "
                + "This would be the actual docstring from the function when loaded with --load-all-tools."
            )
            tool_def = {
                "name": tool_name,
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "param1": {"type": "string", "description": "First parameter"},
                        "param2": {"type": "string", "description": "Second parameter"},
                    },
                    "required": ["param1"],
                },
                "description": tool_desc,
            }
            if not quiet:
                console.print(
                    f"[dim yellow]Created placeholder for tool '{tool_name}'[/dim yellow]"
                )

        tool_defs.append(tool_def)

    # Return a similar structure to what get_mcp_server_tools returns
    return {
        "tools": tool_defs,
        "server_name": "Ultimate MCP Server (with --load-all-tools)",
        "server_version": "1.6.0",
        "server_instructions": """This server provides access to the complete set of tools available in the Ultimate MCP Server.
When running with --load-all-tools, all tools from all categories are available, including:
- Completion tools for text generation
- Provider tools for model management
- Filesystem tools for file operations
- Optimization tools for cost and performance
- Text processing tools for manipulating text
- Meta tools for accessing tool information
- Search tools for querying databases
- Browser automation tools
- Web research tools
- HTML processing tools
- Extraction tools
- SQL database tools
- Document processing tools
- Audio transcription tools
- Excel spreadsheet tools
- OCR tools
- Sentiment analysis tools
""",
    }
1299  
1300  
def parse_args():
    """
    Build the command line parser and parse ``sys.argv``.

    Returns:
        The parsed ``argparse.Namespace`` with ``url``, ``transport``, ``command``,
        ``args``, ``no_all_tools`` and ``quiet`` attributes
    """
    # Each entry: (option strings, add_argument keyword settings), in help order
    option_specs = (
        (
            ("--url",),
            {"default": None, "help": "URL of the MCP server (default: auto-detected)"},
        ),
        (
            ("--transport",),
            {
                "default": None,
                "choices": ["sse", "streamable-http", "stdio"],
                "help": "Force specific transport type (default: auto-detect)",
            },
        ),
        (
            ("--command",),
            {
                "default": None,
                "help": "Command to run for stdio transport (e.g., 'python -m ultimate_mcp_server')",
            },
        ),
        (
            ("--args",),
            {"default": None, "nargs": "*", "help": "Additional arguments for stdio command"},
        ),
        (
            ("--no-all-tools",),
            {"action": "store_true", "help": "Skip comparison with all tools"},
        ),
        (
            ("--quiet", "-q"),
            {
                "action": "store_true",
                "help": "Only show most important information and final table",
            },
        ),
    )

    cli = argparse.ArgumentParser(description="MCP Tool Context Estimator")
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    return cli.parse_args()
1331  
1332  
def _build_server_url(host, port, transport_type):
    """Return the endpoint URL for an explicitly requested HTTP transport."""
    if transport_type == "sse":
        return f"http://{host}:{port}/sse"
    # Anything else reaching this point is treated as streamable-http.
    return f"http://{host}:{port}/mcp/"


async def _resolve_http_endpoint(args, console):
    """Determine ``(server_url, transport_type)`` for the HTTP-based transports."""
    if args.url:
        # Only the host and port of --url are used; the endpoint path is
        # chosen from the transport type.
        import urllib.parse

        parsed = urllib.parse.urlparse(args.url)
        host = parsed.hostname or "localhost"
        try:
            # Accessing .port raises ValueError for a non-numeric or
            # out-of-range port; report it instead of dumping a traceback.
            port = str(parsed.port or 8013)
        except ValueError:
            console.print("[bold red]Error: --url has an invalid port number[/bold red]")
            sys.exit(1)
    else:
        # No URL given: take host and port from the project's own lookup.
        host, port = get_server_url_and_transport()

    if args.transport:
        return _build_server_url(host, port, args.transport), args.transport

    # No transport forced: probe the server to find out which one it speaks.
    server_url, transport_type = await detect_server_transport(host, port, quiet=args.quiet)
    return server_url, transport_type


async def main():
    """Run the estimator: connect to the server, fetch tools, report token usage.

    Reads options via ``parse_args()``. For ``--transport stdio`` a
    ``--command`` is mandatory (the process exits with status 1 otherwise);
    for the HTTP transports the endpoint is taken from ``--url`` or
    auto-detected. The current toolset is then compared either against an
    artificial subset of itself (``--no-all-tools``) or against the complete
    toolset, and the result is passed to ``analyze_tools_token_usage``.

    Exceptions raised while talking to the server or analysing the tools are
    printed to the console (with a traceback unless ``--quiet``) and are not
    re-raised.
    """
    console = Console()
    args = parse_args()
    quiet_mode = args.quiet

    if args.transport == "stdio":
        if not args.command:
            console.print("[bold red]Error: --command is required for stdio transport[/bold red]")
            console.print("Example: --transport stdio --command 'python -m ultimate_mcp_server'")
            sys.exit(1)

        server_url = None  # Not used for stdio
        transport_type = "stdio"
        command = args.command
        stdio_args = args.args or []

        if not quiet_mode:
            console.print(
                f"[blue]Using stdio transport with command: {command} {' '.join(stdio_args)}[/blue]"
            )
    else:
        server_url, transport_type = await _resolve_http_endpoint(args, console)
        command = None
        stdio_args = None

    try:
        # Get the active toolset from the running server
        current_tools = await get_mcp_server_tools(
            server_url, transport_type, quiet=quiet_mode, command=command, args=stdio_args
        )

        if not current_tools or "tools" not in current_tools or not current_tools["tools"]:
            console.print("[bold yellow]No tools found on the server.[/bold yellow]")
            return

        if args.no_all_tools:
            # No full-toolset comparison requested: compare an artificial
            # subset of the current tools against the current tools instead.
            if not quiet_mode:
                console.print("[yellow]Skipping comparison with full --load-all-tools[/yellow]")
                console.print(
                    "[green]Creating an artificial subset of current tools for comparison[/green]"
                )

            # Take half of the tools, but never fewer than min(total, 4) so
            # that small toolsets do not produce an empty subset.
            total_tools = len(current_tools["tools"])
            subset_size = max(total_tools // 2, min(total_tools, 4))
            subset_tools = current_tools["tools"][:subset_size]

            if not quiet_mode:
                console.print(
                    f"[green]Created subset with {subset_size} tools out of {total_tools} total[/green]"
                )

            subset_data = {
                "tools": subset_tools,
                "server_name": current_tools["server_name"] + " (Subset)",
                "server_version": current_tools["server_version"],
                "server_instructions": current_tools["server_instructions"],
            }

            # Analyze token usage with the artificial subset vs full
            analyze_tools_token_usage(subset_data, current_tools, quiet=quiet_mode)
        else:
            # Get the complete toolset that would be available with --load-all-tools
            all_tools = await get_complete_toolset(quiet=quiet_mode)

            # Check if current server is likely already running with all tools
            current_tool_count = len(current_tools["tools"])
            all_tool_count = len(all_tools["tools"])

            if abs(current_tool_count - all_tool_count) <= 2:  # Allow small difference
                if not quiet_mode:
                    console.print(
                        f"[yellow]Warning: Current server has {current_tool_count} tools, "
                        f"which is very close to the expected all-tools count of {all_tool_count}[/yellow]"
                    )
                    console.print(
                        "[yellow]This suggests the server is already running with --load-all-tools[/yellow]"
                    )

                # Use the server's own tool list for the "all tools" side so
                # both sides of the comparison report consistent metrics.
                # Note: list.copy() is shallow; the tool entries are shared.
                all_tools = {
                    "tools": current_tools["tools"].copy(),
                    "server_name": "All Tools",
                    "server_version": current_tools["server_version"],
                    "server_instructions": current_tools["server_instructions"],
                }

            # Analyze token usage with full prompt simulation
            analyze_tools_token_usage(current_tools, all_tools, quiet=quiet_mode)
    except KeyboardInterrupt:
        console.print("[bold yellow]Operation cancelled by user[/bold yellow]")
    except Exception as e:
        console.print(f"[bold red]Unexpected error:[/bold red] {str(e)}")
        if not quiet_mode:
            console.print(traceback.format_exc())
1475  
1476  
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Python 3.11+ asyncio.run() handles Ctrl-C by cancelling the main
        # task and then raising KeyboardInterrupt from asyncio.run() itself,
        # so the interrupt has to be caught here to avoid a traceback.
        Console().print("[bold yellow]Operation cancelled by user[/bold yellow]")
        sys.exit(130)  # conventional exit status for termination by SIGINT