mixture_of_agents_tool.py
#!/usr/bin/env python3
"""
Mixture-of-Agents Tool Module

This module implements the Mixture-of-Agents (MoA) methodology that leverages
the collective strengths of multiple LLMs through a layered architecture to
achieve state-of-the-art performance on complex reasoning tasks.

Based on the research paper: "Mixture-of-Agents Enhances Large Language Model Capabilities"
by Junlin Wang et al. (arXiv:2406.04692v1)

Key Features:
- Multi-layer LLM collaboration for enhanced reasoning
- Parallel processing of reference models for efficiency
- Intelligent aggregation and synthesis of diverse responses
- Specialized for extremely difficult problems requiring intense reasoning
- Optimized for coding, mathematics, and complex analytical tasks

Available Tool:
- mixture_of_agents_tool: Process complex queries using multiple frontier models

Architecture:
1. Reference models generate diverse initial responses in parallel
2. Aggregator model synthesizes responses into a high-quality output
3. Multiple layers can be used for iterative refinement (future enhancement)

Models Used (via OpenRouter):
- Reference Models: claude-opus-4.6, gemini-2.5-pro, gpt-5.4-pro, deepseek-v3.2
- Aggregator Model: claude-opus-4.6 (highest capability for synthesis)

Configuration:
To customize the MoA setup, modify the configuration constants at the top of this file:
- REFERENCE_MODELS: List of models for generating diverse initial responses
- AGGREGATOR_MODEL: Model used to synthesize the final response
- REFERENCE_TEMPERATURE/AGGREGATOR_TEMPERATURE: Sampling temperatures
- MIN_SUCCESSFUL_REFERENCES: Minimum successful models needed to proceed

Usage:
    from mixture_of_agents_tool import mixture_of_agents_tool
    import asyncio

    # Process a complex query
    result = await mixture_of_agents_tool(
        user_prompt="Solve this complex mathematical proof..."
    )
"""

import json
import logging
import os
import asyncio
import datetime
from typing import Dict, Any, List, Optional
from tools.openrouter_client import get_async_client as _get_openrouter_client, check_api_key as check_openrouter_api_key
from agent.auxiliary_client import extract_content_or_reasoning
from tools.debug_helpers import DebugSession

logger = logging.getLogger(__name__)

# Configuration for MoA processing
# Reference models - these generate diverse initial responses in parallel.
# Keep this list aligned with current top-tier OpenRouter frontier options.
REFERENCE_MODELS = [
    "anthropic/claude-opus-4.6",
    "google/gemini-2.5-pro",
    "openai/gpt-5.4-pro",
    "deepseek/deepseek-v3.2",
]

# Aggregator model - synthesizes reference responses into final output.
# Prefer the strongest synthesis model in the current OpenRouter lineup.
AGGREGATOR_MODEL = "anthropic/claude-opus-4.6"

# Temperature settings optimized for MoA performance
REFERENCE_TEMPERATURE = 0.6   # Balanced creativity for diverse perspectives
AGGREGATOR_TEMPERATURE = 0.4  # Focused synthesis for consistency

# Failure handling configuration
MIN_SUCCESSFUL_REFERENCES = 1  # Minimum successful reference models needed to proceed

# System prompt for the aggregator model (from the research paper)
AGGREGATOR_SYSTEM_PROMPT = """You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability.

Responses from models:"""

_debug = DebugSession("moa_tools", env_var="MOA_TOOLS_DEBUG")


def _is_gpt_model(model: str) -> bool:
    """
    Return True if *model* is an OpenAI GPT-family model.

    OpenRouter model IDs are provider-prefixed (e.g. "openai/gpt-5.4-pro"),
    so the check must be applied to the segment after the last "/".
    (A plain startswith("gpt-") on the full ID never matches, which would
    silently send unsupported temperature values to GPT models.)

    Args:
        model (str): OpenRouter model identifier

    Returns:
        bool: True if the model name (after the provider prefix) starts with "gpt-"
    """
    return model.lower().rsplit("/", 1)[-1].startswith("gpt-")


def _construct_aggregator_prompt(system_prompt: str, responses: List[str]) -> str:
    """
    Construct the final system prompt for the aggregator including all model responses.

    Args:
        system_prompt (str): Base system prompt for aggregation
        responses (List[str]): List of responses from reference models

    Returns:
        str: Complete system prompt with enumerated responses
    """
    response_text = "\n".join([f"{i+1}. {response}" for i, response in enumerate(responses)])
    return f"{system_prompt}\n\n{response_text}"


async def _run_reference_model_safe(
    model: str,
    user_prompt: str,
    temperature: float = REFERENCE_TEMPERATURE,
    max_tokens: int = 32000,
    max_retries: int = 6
) -> tuple[str, str, bool]:
    """
    Run a single reference model with retry logic and graceful failure handling.

    Args:
        model (str): Model identifier to use
        user_prompt (str): The user's query
        temperature (float): Sampling temperature for response generation
        max_tokens (int): Maximum tokens in response
        max_retries (int): Maximum number of retry attempts

    Returns:
        tuple[str, str, bool]: (model_name, response_content_or_error, success_flag)
    """
    for attempt in range(max_retries):
        try:
            logger.info("Querying %s (attempt %s/%s)", model, attempt + 1, max_retries)

            # Build parameters for the API call
            api_params = {
                "model": model,
                "messages": [{"role": "user", "content": user_prompt}],
                "max_tokens": max_tokens,
                "extra_body": {
                    "reasoning": {
                        "enabled": True,
                        "effort": "xhigh"
                    }
                }
            }

            # GPT models don't support custom temperature values.
            # Only include temperature for non-GPT models (the check strips the
            # OpenRouter provider prefix, e.g. "openai/gpt-5.4-pro").
            if not _is_gpt_model(model):
                api_params["temperature"] = temperature

            response = await _get_openrouter_client().chat.completions.create(**api_params)

            content = extract_content_or_reasoning(response)
            if not content:
                # Reasoning-only response — let the retry loop handle it
                logger.warning("%s returned empty content (attempt %s/%s), retrying", model, attempt + 1, max_retries)
                if attempt < max_retries - 1:
                    await asyncio.sleep(min(2 ** (attempt + 1), 60))
                    continue
                # Out of retries: report failure rather than returning an
                # empty "successful" response to the aggregator.
                error_msg = f"{model} returned empty content after {max_retries} attempts"
                logger.error("%s", error_msg)
                return model, error_msg, False
            logger.info("%s responded (%s characters)", model, len(content))
            return model, content, True

        except Exception as e:
            error_str = str(e)
            # Keep retry-path logging concise; full tracebacks are reserved for
            # terminal failure paths so long-running MoA retries don't flood logs.
            if "invalid" in error_str.lower():
                logger.warning("%s invalid request error (attempt %s): %s", model, attempt + 1, error_str)
            elif "rate" in error_str.lower() or "limit" in error_str.lower():
                logger.warning("%s rate limit error (attempt %s): %s", model, attempt + 1, error_str)
            else:
                logger.warning("%s unknown error (attempt %s): %s", model, attempt + 1, error_str)

            if attempt < max_retries - 1:
                # Exponential backoff for rate limiting: 2s, 4s, 8s, 16s, 32s, 60s
                sleep_time = min(2 ** (attempt + 1), 60)
                logger.info("Retrying in %ss...", sleep_time)
                await asyncio.sleep(sleep_time)
            else:
                error_msg = f"{model} failed after {max_retries} attempts: {error_str}"
                logger.error("%s", error_msg, exc_info=True)
                return model, error_msg, False


async def _run_aggregator_model(
    system_prompt: str,
    user_prompt: str,
    temperature: float = AGGREGATOR_TEMPERATURE,
    max_tokens: Optional[int] = None
) -> str:
    """
    Run the aggregator model to synthesize the final response.

    Args:
        system_prompt (str): System prompt with all reference responses
        user_prompt (str): Original user query
        temperature (float): Focused temperature for consistent aggregation
        max_tokens (Optional[int]): Maximum tokens in final response (None = provider default)

    Returns:
        str: Synthesized final response ("" if the model produced no content)
    """
    logger.info("Running aggregator model: %s", AGGREGATOR_MODEL)

    # Build parameters for the API call
    api_params = {
        "model": AGGREGATOR_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "max_tokens": max_tokens,
        "extra_body": {
            "reasoning": {
                "enabled": True,
                "effort": "xhigh"
            }
        }
    }

    # GPT models don't support custom temperature values.
    # Only include temperature for non-GPT models (provider prefix stripped).
    if not _is_gpt_model(AGGREGATOR_MODEL):
        api_params["temperature"] = temperature

    response = await _get_openrouter_client().chat.completions.create(**api_params)

    content = extract_content_or_reasoning(response)

    # Retry once on empty content (reasoning-only response)
    if not content:
        logger.warning("Aggregator returned empty content, retrying once")
        response = await _get_openrouter_client().chat.completions.create(**api_params)
        content = extract_content_or_reasoning(response)

    # Normalize a persistent empty/None result to "" so len()/callers are safe.
    if not content:
        logger.warning("Aggregator returned empty content after retry")
        content = ""

    logger.info("Aggregation complete (%s characters)", len(content))
    return content


async def mixture_of_agents_tool(
    user_prompt: str,
    reference_models: Optional[List[str]] = None,
    aggregator_model: Optional[str] = None
) -> str:
    """
    Process a complex query using the Mixture-of-Agents methodology.

    This tool leverages multiple frontier language models to collaboratively solve
    extremely difficult problems requiring intense reasoning. It's particularly
    effective for:
    - Complex mathematical proofs and calculations
    - Advanced coding problems and algorithm design
    - Multi-step analytical reasoning tasks
    - Problems requiring diverse domain expertise
    - Tasks where single models show limitations

    The MoA approach uses a fixed 2-layer architecture:
    1. Layer 1: Multiple reference models generate diverse responses in parallel (temp=0.6)
    2. Layer 2: Aggregator model synthesizes the best elements into final response (temp=0.4)

    Args:
        user_prompt (str): The complex query or problem to solve
        reference_models (Optional[List[str]]): Custom reference models to use
        aggregator_model (Optional[str]): Custom aggregator model to use

    Returns:
        str: JSON string containing the MoA results with the following structure:
            {
                "success": bool,
                "response": str,
                "models_used": {
                    "reference_models": List[str],
                    "aggregator_model": str
                },
                "processing_time": float
            }

    Raises:
        Exception: If MoA processing fails or API key is not set
    """
    start_time = datetime.datetime.now()

    debug_call_data = {
        "parameters": {
            "user_prompt": user_prompt[:200] + "..." if len(user_prompt) > 200 else user_prompt,
            "reference_models": reference_models or REFERENCE_MODELS,
            "aggregator_model": aggregator_model or AGGREGATOR_MODEL,
            "reference_temperature": REFERENCE_TEMPERATURE,
            "aggregator_temperature": AGGREGATOR_TEMPERATURE,
            "min_successful_references": MIN_SUCCESSFUL_REFERENCES
        },
        "error": None,
        "success": False,
        "reference_responses_count": 0,
        "failed_models_count": 0,
        "failed_models": [],
        "final_response_length": 0,
        "processing_time_seconds": 0,
        "models_used": {}
    }

    try:
        logger.info("Starting Mixture-of-Agents processing...")
        logger.info("Query: %s", user_prompt[:100])

        # Validate API key availability
        if not os.getenv("OPENROUTER_API_KEY"):
            raise ValueError("OPENROUTER_API_KEY environment variable not set")

        # Use provided models or defaults
        ref_models = reference_models or REFERENCE_MODELS
        agg_model = aggregator_model or AGGREGATOR_MODEL

        logger.info("Using %s reference models in 2-layer MoA architecture", len(ref_models))

        # Layer 1: Generate diverse responses from reference models (with failure handling)
        logger.info("Layer 1: Generating reference responses...")
        model_results = await asyncio.gather(*[
            _run_reference_model_safe(model, user_prompt, REFERENCE_TEMPERATURE)
            for model in ref_models
        ])

        # Separate successful and failed responses
        successful_responses = []
        failed_models = []

        for model_name, content, success in model_results:
            if success:
                successful_responses.append(content)
            else:
                failed_models.append(model_name)

        successful_count = len(successful_responses)
        failed_count = len(failed_models)

        logger.info("Reference model results: %s successful, %s failed", successful_count, failed_count)

        if failed_models:
            logger.warning("Failed models: %s", ', '.join(failed_models))

        # Check if we have enough successful responses to proceed
        if successful_count < MIN_SUCCESSFUL_REFERENCES:
            raise ValueError(f"Insufficient successful reference models ({successful_count}/{len(ref_models)}). Need at least {MIN_SUCCESSFUL_REFERENCES} successful responses.")

        debug_call_data["reference_responses_count"] = successful_count
        debug_call_data["failed_models_count"] = failed_count
        debug_call_data["failed_models"] = failed_models

        # Layer 2: Aggregate responses using the aggregator model
        logger.info("Layer 2: Synthesizing final response...")
        aggregator_system_prompt = _construct_aggregator_prompt(
            AGGREGATOR_SYSTEM_PROMPT,
            successful_responses
        )

        final_response = await _run_aggregator_model(
            aggregator_system_prompt,
            user_prompt,
            AGGREGATOR_TEMPERATURE
        )

        # Calculate processing time
        end_time = datetime.datetime.now()
        processing_time = (end_time - start_time).total_seconds()

        logger.info("MoA processing completed in %.2f seconds", processing_time)

        # Prepare successful response (only final aggregated result, minimal fields)
        result = {
            "success": True,
            "response": final_response,
            "models_used": {
                "reference_models": ref_models,
                "aggregator_model": agg_model
            }
        }

        debug_call_data["success"] = True
        debug_call_data["final_response_length"] = len(final_response)
        debug_call_data["processing_time_seconds"] = processing_time
        debug_call_data["models_used"] = result["models_used"]

        # Log debug information
        _debug.log_call("mixture_of_agents_tool", debug_call_data)
        _debug.save()

        return json.dumps(result, indent=2, ensure_ascii=False)

    except Exception as e:
        error_msg = f"Error in MoA processing: {str(e)}"
        logger.error("%s", error_msg, exc_info=True)

        # Calculate processing time even for errors
        end_time = datetime.datetime.now()
        processing_time = (end_time - start_time).total_seconds()

        # Prepare error response (minimal fields)
        result = {
            "success": False,
            "response": "MoA processing failed. Please try again or use a single model for this query.",
            "models_used": {
                "reference_models": reference_models or REFERENCE_MODELS,
                "aggregator_model": aggregator_model or AGGREGATOR_MODEL
            },
            "error": error_msg
        }

        debug_call_data["error"] = error_msg
        debug_call_data["processing_time_seconds"] = processing_time
        _debug.log_call("mixture_of_agents_tool", debug_call_data)
        _debug.save()

        return json.dumps(result, indent=2, ensure_ascii=False)


def check_moa_requirements() -> bool:
    """
    Check if all requirements for MoA tools are met.

    Returns:
        bool: True if requirements are met, False otherwise
    """
    return check_openrouter_api_key()


def get_moa_configuration() -> Dict[str, Any]:
    """
    Get the current MoA configuration settings.

    Returns:
        Dict[str, Any]: Dictionary containing all configuration parameters
    """
    return {
        "reference_models": REFERENCE_MODELS,
        "aggregator_model": AGGREGATOR_MODEL,
        "reference_temperature": REFERENCE_TEMPERATURE,
        "aggregator_temperature": AGGREGATOR_TEMPERATURE,
        "min_successful_references": MIN_SUCCESSFUL_REFERENCES,
        "total_reference_models": len(REFERENCE_MODELS),
        "failure_tolerance": f"{len(REFERENCE_MODELS) - MIN_SUCCESSFUL_REFERENCES}/{len(REFERENCE_MODELS)} models can fail"
    }


if __name__ == "__main__":
    """
    Simple test/demo when run directly
    """
    print("🤖 Mixture-of-Agents Tool Module")
    print("=" * 50)

    # Check if API key is available
    api_available = check_openrouter_api_key()

    if not api_available:
        print("❌ OPENROUTER_API_KEY environment variable not set")
        print("Please set your API key: export OPENROUTER_API_KEY='your-key-here'")
        print("Get API key at: https://openrouter.ai/")
        exit(1)
    else:
        print("✅ OpenRouter API key found")

    print("🛠️ MoA tools ready for use!")

    # Show current configuration
    config = get_moa_configuration()
    print("\n⚙️ Current Configuration:")
    print(f"   🤖 Reference models ({len(config['reference_models'])}): {', '.join(config['reference_models'])}")
    print(f"   🧠 Aggregator model: {config['aggregator_model']}")
    print(f"   🌡️ Reference temperature: {config['reference_temperature']}")
    print(f"   🌡️ Aggregator temperature: {config['aggregator_temperature']}")
    print(f"   🛡️ Failure tolerance: {config['failure_tolerance']}")
    print(f"   📊 Minimum successful models: {config['min_successful_references']}")

    # Show debug mode status
    if _debug.active:
        print(f"\n🐛 Debug mode ENABLED - Session ID: {_debug.session_id}")
        print(f"   Debug logs will be saved to: ./logs/moa_tools_debug_{_debug.session_id}.json")
    else:
        print("\n🐛 Debug mode disabled (set MOA_TOOLS_DEBUG=true to enable)")

    print("\nBasic usage:")
    print("  from mixture_of_agents_tool import mixture_of_agents_tool")
    print("  import asyncio")
    print("")
    print("  async def main():")
    print("      result = await mixture_of_agents_tool(")
    print("          user_prompt='Solve this complex mathematical proof...'")
    print("      )")
    print("      print(result)")
    print("  asyncio.run(main())")

    print("\nBest use cases:")
    print("  - Complex mathematical proofs and calculations")
    print("  - Advanced coding problems and algorithm design")
    print("  - Multi-step analytical reasoning tasks")
    print("  - Problems requiring diverse domain expertise")
    print("  - Tasks where single models show limitations")

    print("\nPerformance characteristics:")
    print("  - Higher latency due to multiple model calls")
    print("  - Significantly improved quality for complex tasks")
    print("  - Parallel processing for efficiency")
    print(f"  - Optimized temperatures: {REFERENCE_TEMPERATURE} for reference models, {AGGREGATOR_TEMPERATURE} for aggregation")
    print("  - Token-efficient: only returns final aggregated response")
    print("  - Resilient: continues with partial model failures")
    print("  - Configurable: easy to modify models and settings at top of file")
    print("  - State-of-the-art results on challenging benchmarks")

    print("\nDebug mode:")
    print("  # Enable debug logging")
    print("  export MOA_TOOLS_DEBUG=true")
    print("  # Debug logs capture all MoA processing steps and metrics")
    print("  # Logs saved to: ./logs/moa_tools_debug_UUID.json")


# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
from tools.registry import registry

MOA_SCHEMA = {
    "name": "mixture_of_agents",
    "description": "Route a hard problem through multiple frontier LLMs collaboratively. Makes 5 API calls (4 reference models + 1 aggregator) with maximum reasoning effort — use sparingly for genuinely difficult problems. Best for: complex math, advanced algorithms, multi-step analytical reasoning, problems benefiting from diverse perspectives.",
    "parameters": {
        "type": "object",
        "properties": {
            "user_prompt": {
                "type": "string",
                "description": "The complex query or problem to solve using multiple AI models. Should be a challenging problem that benefits from diverse perspectives and collaborative reasoning."
            }
        },
        "required": ["user_prompt"]
    }
}

registry.register(
    name="mixture_of_agents",
    toolset="moa",
    schema=MOA_SCHEMA,
    handler=lambda args, **kw: mixture_of_agents_tool(user_prompt=args.get("user_prompt", "")),
    check_fn=check_moa_requirements,
    requires_env=["OPENROUTER_API_KEY"],
    is_async=True,
    emoji="🧠",
)