/ tools / mixture_of_agents_tool.py
mixture_of_agents_tool.py
  1  #!/usr/bin/env python3
  2  """
  3  Mixture-of-Agents Tool Module
  4  
  5  This module implements the Mixture-of-Agents (MoA) methodology that leverages
  6  the collective strengths of multiple LLMs through a layered architecture to
  7  achieve state-of-the-art performance on complex reasoning tasks.
  8  
  9  Based on the research paper: "Mixture-of-Agents Enhances Large Language Model Capabilities"
 10  by Junlin Wang et al. (arXiv:2406.04692v1)
 11  
 12  Key Features:
 13  - Multi-layer LLM collaboration for enhanced reasoning
 14  - Parallel processing of reference models for efficiency
 15  - Intelligent aggregation and synthesis of diverse responses
 16  - Specialized for extremely difficult problems requiring intense reasoning
 17  - Optimized for coding, mathematics, and complex analytical tasks
 18  
 19  Available Tool:
 20  - mixture_of_agents_tool: Process complex queries using multiple frontier models
 21  
 22  Architecture:
 23  1. Reference models generate diverse initial responses in parallel
 24  2. Aggregator model synthesizes responses into a high-quality output
 25  3. Multiple layers can be used for iterative refinement (future enhancement)
 26  
 27  Models Used (via OpenRouter):
- Reference Models: claude-opus-4.6, gemini-2.5-pro, gpt-5.4-pro, deepseek-v3.2
 29  - Aggregator Model: claude-opus-4.6 (highest capability for synthesis)
 30  
 31  Configuration:
 32      To customize the MoA setup, modify the configuration constants at the top of this file:
 33      - REFERENCE_MODELS: List of models for generating diverse initial responses
 34      - AGGREGATOR_MODEL: Model used to synthesize the final response
 35      - REFERENCE_TEMPERATURE/AGGREGATOR_TEMPERATURE: Sampling temperatures
 36      - MIN_SUCCESSFUL_REFERENCES: Minimum successful models needed to proceed
 37  
Usage:
    from mixture_of_agents_tool import mixture_of_agents_tool
    import asyncio

    async def main():
        # Process a complex query
        result = await mixture_of_agents_tool(
            user_prompt="Solve this complex mathematical proof..."
        )
        print(result)

    asyncio.run(main())
 46  """
 47  
 48  import json
 49  import logging
 50  import os
 51  import asyncio
 52  import datetime
 53  from typing import Dict, Any, List, Optional
 54  from tools.openrouter_client import get_async_client as _get_openrouter_client, check_api_key as check_openrouter_api_key
 55  from agent.auxiliary_client import extract_content_or_reasoning
 56  from tools.debug_helpers import DebugSession
 57  
# Module-level logger; all MoA progress and failure messages go through it.
logger = logging.getLogger(__name__)

# Configuration for MoA processing
# Reference models - these generate diverse initial responses in parallel.
# Keep this list aligned with current top-tier OpenRouter frontier options.
# Used by mixture_of_agents_tool whenever no `reference_models` argument is given.
REFERENCE_MODELS = [
    "anthropic/claude-opus-4.6",
    "google/gemini-2.5-pro",
    "openai/gpt-5.4-pro",
    "deepseek/deepseek-v3.2",
]

# Aggregator model - synthesizes reference responses into final output.
# Prefer the strongest synthesis model in the current OpenRouter lineup.
AGGREGATOR_MODEL = "anthropic/claude-opus-4.6"

# Temperature settings optimized for MoA performance.
# These are the default `temperature` arguments of the reference and
# aggregator helpers, and mixture_of_agents_tool also passes them explicitly.
REFERENCE_TEMPERATURE = 0.6  # Balanced creativity for diverse perspectives
AGGREGATOR_TEMPERATURE = 0.4  # Focused synthesis for consistency

# Failure handling configuration.
# mixture_of_agents_tool reports an error instead of aggregating when fewer
# than this many reference models succeed.
MIN_SUCCESSFUL_REFERENCES = 1  # Minimum successful reference models needed to proceed

# System prompt for the aggregator model (from the research paper).
# _construct_aggregator_prompt appends the numbered reference responses after
# the trailing "Responses from models:" line.
AGGREGATOR_SYSTEM_PROMPT = """You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability.

Responses from models:"""

# Debug session used by mixture_of_agents_tool (log_call/save) and by the
# __main__ demo (active/session_id). Presumably switched on through the
# MOA_TOOLS_DEBUG environment variable - confirm against tools.debug_helpers.
_debug = DebugSession("moa_tools", env_var="MOA_TOOLS_DEBUG")
 87  
 88  
 89  def _construct_aggregator_prompt(system_prompt: str, responses: List[str]) -> str:
 90      """
 91      Construct the final system prompt for the aggregator including all model responses.
 92      
 93      Args:
 94          system_prompt (str): Base system prompt for aggregation
 95          responses (List[str]): List of responses from reference models
 96          
 97      Returns:
 98          str: Complete system prompt with enumerated responses
 99      """
100      response_text = "\n".join([f"{i+1}. {response}" for i, response in enumerate(responses)])
101      return f"{system_prompt}\n\n{response_text}"
102  
103  
async def _run_reference_model_safe(
    model: str,
    user_prompt: str,
    temperature: float = REFERENCE_TEMPERATURE,
    max_tokens: int = 32000,
    max_retries: int = 6
) -> tuple[str, str, bool]:
    """
    Run a single reference model with retry logic and graceful failure handling.

    Exceptions from the API call are caught, logged, and retried with
    exponential backoff, so this coroutine always returns a result tuple. An
    empty response is treated like a failed attempt: it is retried, and if the
    final attempt is still empty the model is reported as failed.

    Args:
        model (str): Model identifier to use (e.g. "openai/gpt-5.4-pro")
        user_prompt (str): The user's query
        temperature (float): Sampling temperature for response generation
        max_tokens (int): Maximum tokens in response
        max_retries (int): Maximum number of attempts

    Returns:
        tuple[str, str, bool]: (model_name, response_content_or_error, success_flag)
    """
    # GPT models are excluded from custom temperature values. Identifiers used
    # here are provider-qualified ("openai/gpt-..."), so test the bare model
    # name; checking the full id for a "gpt-" prefix would never match.
    bare_model_name = model.lower().rsplit("/", 1)[-1]
    send_temperature = not bare_model_name.startswith("gpt-")

    for attempt in range(max_retries):
        attempt_number = attempt + 1
        is_final_attempt = attempt_number >= max_retries
        # Exponential backoff capped at 60s: 2s, 4s, 8s, 16s, 32s, 60s, ...
        backoff_seconds = min(2 ** attempt_number, 60)

        try:
            logger.info("Querying %s (attempt %s/%s)", model, attempt_number, max_retries)

            # Build parameters for the API call
            api_params = {
                "model": model,
                "messages": [{"role": "user", "content": user_prompt}],
                "max_tokens": max_tokens,
                "extra_body": {
                    "reasoning": {
                        "enabled": True,
                        "effort": "xhigh"
                    }
                }
            }
            if send_temperature:
                api_params["temperature"] = temperature

            response = await _get_openrouter_client().chat.completions.create(**api_params)

            content = extract_content_or_reasoning(response)
            if content:
                logger.info("%s responded (%s characters)", model, len(content))
                return model, content, True

            # Reasoning-only / empty response: handled below like any other
            # failed attempt instead of being returned as a success.
            logger.warning("%s returned empty content (attempt %s/%s)", model, attempt_number, max_retries)
            if is_final_attempt:
                error_msg = f"{model} failed after {max_retries} attempts: empty response content"
                logger.error("%s", error_msg)
                return model, error_msg, False

        except Exception as e:
            error_str = str(e)
            lowered_error = error_str.lower()
            # Keep retry-path logging concise; full tracebacks are reserved for
            # terminal failure paths so long-running MoA retries don't flood logs.
            if "invalid" in lowered_error:
                logger.warning("%s invalid request error (attempt %s): %s", model, attempt_number, error_str)
            elif "rate" in lowered_error or "limit" in lowered_error:
                logger.warning("%s rate limit error (attempt %s): %s", model, attempt_number, error_str)
            else:
                logger.warning("%s unknown error (attempt %s): %s", model, attempt_number, error_str)

            if is_final_attempt:
                error_msg = f"{model} failed after {max_retries} attempts: {error_str}"
                logger.error("%s", error_msg, exc_info=True)
                return model, error_msg, False

        # Only reached when this attempt failed and another one remains.
        logger.info("Retrying in %ss...", backoff_seconds)
        await asyncio.sleep(backoff_seconds)

    # Only reachable when max_retries < 1 (the loop body never ran). Return a
    # failure tuple so callers that unpack the result never receive None.
    error_msg = f"{model} was not queried: max_retries must be at least 1 (got {max_retries})"
    logger.error("%s", error_msg)
    return model, error_msg, False
178  
179  
async def _run_aggregator_model(
    system_prompt: str,
    user_prompt: str,
    temperature: float = AGGREGATOR_TEMPERATURE,
    max_tokens: Optional[int] = None,
    model: Optional[str] = None
) -> str:
    """
    Run the aggregator model to synthesize the final response.

    Args:
        system_prompt (str): System prompt with all reference responses
        user_prompt (str): Original user query
        temperature (float): Focused temperature for consistent aggregation
        max_tokens (Optional[int]): Maximum tokens in final response; the
            parameter is left out of the request when None
        model (Optional[str]): Aggregator model identifier; falls back to
            AGGREGATOR_MODEL when omitted

    Returns:
        str: Synthesized final response (never empty)

    Raises:
        RuntimeError: If the model returns no content on the first call and
            again on the single retry
        Exception: Errors raised by the API client call are not caught here
    """
    agg_model = model or AGGREGATOR_MODEL
    logger.info("Running aggregator model: %s", agg_model)

    # Build parameters for the API call
    api_params: Dict[str, Any] = {
        "model": agg_model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "extra_body": {
            "reasoning": {
                "enabled": True,
                "effort": "xhigh"
            }
        }
    }

    # Leave the limit out entirely rather than sending an explicit null.
    if max_tokens is not None:
        api_params["max_tokens"] = max_tokens

    # GPT models are excluded from custom temperature values. Identifiers are
    # provider-qualified ("openai/gpt-..."), so test the bare model name.
    if not agg_model.lower().rsplit("/", 1)[-1].startswith("gpt-"):
        api_params["temperature"] = temperature

    response = await _get_openrouter_client().chat.completions.create(**api_params)
    content = extract_content_or_reasoning(response)

    # Retry once on empty content (reasoning-only response)
    if not content:
        logger.warning("Aggregator returned empty content, retrying once")
        response = await _get_openrouter_client().chat.completions.create(**api_params)
        content = extract_content_or_reasoning(response)

    # An empty synthesis must not be reported as a successful result.
    if not content:
        raise RuntimeError(f"Aggregator model {agg_model} returned empty content after retry")

    logger.info("Aggregation complete (%s characters)", len(content))
    return content


async def mixture_of_agents_tool(
    user_prompt: str,
    reference_models: Optional[List[str]] = None,
    aggregator_model: Optional[str] = None
) -> str:
    """
    Process a complex query using the Mixture-of-Agents methodology.

    This tool leverages multiple frontier language models to collaboratively solve
    extremely difficult problems requiring intense reasoning. It's particularly
    effective for:
    - Complex mathematical proofs and calculations
    - Advanced coding problems and algorithm design
    - Multi-step analytical reasoning tasks
    - Problems requiring diverse domain expertise
    - Tasks where single models show limitations

    The MoA approach uses a fixed 2-layer architecture:
    1. Layer 1: Multiple reference models generate diverse responses in parallel (temp=0.6)
    2. Layer 2: Aggregator model synthesizes the best elements into final response (temp=0.4)

    Args:
        user_prompt (str): The complex query or problem to solve
        reference_models (Optional[List[str]]): Custom reference models to use;
            defaults to REFERENCE_MODELS
        aggregator_model (Optional[str]): Custom aggregator model to use;
            defaults to AGGREGATOR_MODEL

    Returns:
        str: JSON string containing the MoA results with the following structure:
             {
                 "success": bool,
                 "response": str,
                 "models_used": {
                     "reference_models": List[str],
                     "aggregator_model": str
                 },
                 "error": str   # present only when "success" is false
             }

        Failures (missing API key, invalid prompt, too few successful reference
        models, aggregator errors) are caught and reported through this JSON
        payload rather than raised to the caller.
    """
    start_time = datetime.datetime.now()

    # Resolve the effective models once so the request, the result payload,
    # and the debug record all agree.
    ref_models = reference_models or REFERENCE_MODELS
    agg_model = aggregator_model or AGGREGATOR_MODEL

    # Build the debug preview defensively: a non-string prompt is rejected
    # inside the try block below and must not crash before it.
    preview_source = user_prompt if isinstance(user_prompt, str) else repr(user_prompt)
    prompt_preview = preview_source[:200] + "..." if len(preview_source) > 200 else preview_source

    debug_call_data = {
        "parameters": {
            "user_prompt": prompt_preview,
            "reference_models": ref_models,
            "aggregator_model": agg_model,
            "reference_temperature": REFERENCE_TEMPERATURE,
            "aggregator_temperature": AGGREGATOR_TEMPERATURE,
            "min_successful_references": MIN_SUCCESSFUL_REFERENCES
        },
        "error": None,
        "success": False,
        "reference_responses_count": 0,
        "failed_models_count": 0,
        "failed_models": [],
        "final_response_length": 0,
        "processing_time_seconds": 0,
        "models_used": {}
    }

    try:
        # Reject unusable prompts before spending any API calls.
        if not isinstance(user_prompt, str) or not user_prompt.strip():
            raise ValueError("user_prompt must be a non-empty string")

        logger.info("Starting Mixture-of-Agents processing...")
        logger.info("Query: %s", user_prompt[:100])

        # Validate API key availability
        if not os.getenv("OPENROUTER_API_KEY"):
            raise ValueError("OPENROUTER_API_KEY environment variable not set")

        logger.info("Using %s reference models in 2-layer MoA architecture", len(ref_models))

        # Layer 1: Generate diverse responses from reference models (with failure handling)
        logger.info("Layer 1: Generating reference responses...")
        model_results = await asyncio.gather(*[
            _run_reference_model_safe(model, user_prompt, REFERENCE_TEMPERATURE)
            for model in ref_models
        ])

        # Separate successful and failed responses. A "successful" call that
        # produced no text gives the aggregator nothing, so count it as failed.
        successful_responses = []
        failed_models = []

        for model_name, content, success in model_results:
            if success and content:
                successful_responses.append(content)
            else:
                failed_models.append(model_name)

        successful_count = len(successful_responses)
        failed_count = len(failed_models)

        logger.info("Reference model results: %s successful, %s failed", successful_count, failed_count)

        if failed_models:
            logger.warning("Failed models: %s", ', '.join(failed_models))

        # Record the outcome before the threshold check so the debug log also
        # shows which models failed when the run is aborted.
        debug_call_data["reference_responses_count"] = successful_count
        debug_call_data["failed_models_count"] = failed_count
        debug_call_data["failed_models"] = failed_models

        # Check if we have enough successful responses to proceed
        if successful_count < MIN_SUCCESSFUL_REFERENCES:
            raise ValueError(f"Insufficient successful reference models ({successful_count}/{len(ref_models)}). Need at least {MIN_SUCCESSFUL_REFERENCES} successful responses.")

        # Layer 2: Aggregate responses using the aggregator model
        logger.info("Layer 2: Synthesizing final response...")
        aggregator_system_prompt = _construct_aggregator_prompt(
            AGGREGATOR_SYSTEM_PROMPT,
            successful_responses
        )

        # Pass the resolved model so a caller-supplied aggregator is really used.
        final_response = await _run_aggregator_model(
            aggregator_system_prompt,
            user_prompt,
            AGGREGATOR_TEMPERATURE,
            model=agg_model
        )

        # Calculate processing time
        end_time = datetime.datetime.now()
        processing_time = (end_time - start_time).total_seconds()

        logger.info("MoA processing completed in %.2f seconds", processing_time)

        # Prepare successful response (only final aggregated result, minimal fields)
        result = {
            "success": True,
            "response": final_response,
            "models_used": {
                "reference_models": ref_models,
                "aggregator_model": agg_model
            }
        }

        debug_call_data["success"] = True
        debug_call_data["final_response_length"] = len(final_response)
        debug_call_data["processing_time_seconds"] = processing_time
        debug_call_data["models_used"] = result["models_used"]

        # Log debug information
        _debug.log_call("mixture_of_agents_tool", debug_call_data)
        _debug.save()

        return json.dumps(result, indent=2, ensure_ascii=False)

    except Exception as e:
        # Top-level tool boundary: every failure becomes a JSON error payload.
        error_msg = f"Error in MoA processing: {str(e)}"
        logger.error("%s", error_msg, exc_info=True)

        # Calculate processing time even for errors
        end_time = datetime.datetime.now()
        processing_time = (end_time - start_time).total_seconds()

        # Prepare error response (minimal fields)
        result = {
            "success": False,
            "response": "MoA processing failed. Please try again or use a single model for this query.",
            "models_used": {
                "reference_models": ref_models,
                "aggregator_model": agg_model
            },
            "error": error_msg
        }

        debug_call_data["error"] = error_msg
        debug_call_data["processing_time_seconds"] = processing_time
        _debug.log_call("mixture_of_agents_tool", debug_call_data)
        _debug.save()

        return json.dumps(result, indent=2, ensure_ascii=False)
409  
410  
def check_moa_requirements() -> bool:
    """
    Report whether the prerequisites for the MoA tool are satisfied.

    The only prerequisite checked is the OpenRouter API key check.

    Returns:
        bool: Result of check_openrouter_api_key()
    """
    requirements_met = check_openrouter_api_key()
    return requirements_met
419  
420  
421  
def get_moa_configuration() -> Dict[str, Any]:
    """
    Snapshot the module-level MoA configuration constants.

    Returns:
        Dict[str, Any]: The reference and aggregator models, both
            temperatures, the minimum number of successful references, the
            reference-model count, and a human-readable failure-tolerance
            summary
    """
    total_models = len(REFERENCE_MODELS)
    tolerated_failures = total_models - MIN_SUCCESSFUL_REFERENCES

    configuration: Dict[str, Any] = {}
    configuration["reference_models"] = REFERENCE_MODELS
    configuration["aggregator_model"] = AGGREGATOR_MODEL
    configuration["reference_temperature"] = REFERENCE_TEMPERATURE
    configuration["aggregator_temperature"] = AGGREGATOR_TEMPERATURE
    configuration["min_successful_references"] = MIN_SUCCESSFUL_REFERENCES
    configuration["total_reference_models"] = total_models
    configuration["failure_tolerance"] = f"{tolerated_failures}/{total_models} models can fail"
    return configuration
438  
439  
if __name__ == "__main__":
    # Simple test/demo when run directly: prints the API-key status, the
    # current configuration, the debug-mode status, and usage hints.
    print("🤖 Mixture-of-Agents Tool Module")
    print("=" * 50)

    # Check if API key is available
    api_available = check_openrouter_api_key()

    if not api_available:
        print("❌ OPENROUTER_API_KEY environment variable not set")
        print("Please set your API key: export OPENROUTER_API_KEY='your-key-here'")
        print("Get API key at: https://openrouter.ai/")
        # The bare `exit` helper is injected by the `site` module for
        # interactive use; raise SystemExit so the exit code does not depend on it.
        raise SystemExit(1)

    print("✅ OpenRouter API key found")

    print("🛠️  MoA tools ready for use!")

    # Show current configuration
    config = get_moa_configuration()
    print("\n⚙️  Current Configuration:")
    print(f"  🤖 Reference models ({len(config['reference_models'])}): {', '.join(config['reference_models'])}")
    print(f"  🧠 Aggregator model: {config['aggregator_model']}")
    print(f"  🌡️  Reference temperature: {config['reference_temperature']}")
    print(f"  🌡️  Aggregator temperature: {config['aggregator_temperature']}")
    print(f"  🛡️  Failure tolerance: {config['failure_tolerance']}")
    print(f"  📊 Minimum successful models: {config['min_successful_references']}")

    # Show debug mode status
    if _debug.active:
        print(f"\n🐛 Debug mode ENABLED - Session ID: {_debug.session_id}")
        print(f"   Debug logs will be saved to: ./logs/moa_tools_debug_{_debug.session_id}.json")
    else:
        print("\n🐛 Debug mode disabled (set MOA_TOOLS_DEBUG=true to enable)")

    print("\nBasic usage:")
    print("  from mixture_of_agents_tool import mixture_of_agents_tool")
    print("  import asyncio")
    print("")
    print("  async def main():")
    print("      result = await mixture_of_agents_tool(")
    print("          user_prompt='Solve this complex mathematical proof...'")
    print("      )")
    print("      print(result)")
    print("  asyncio.run(main())")

    print("\nBest use cases:")
    print("  - Complex mathematical proofs and calculations")
    print("  - Advanced coding problems and algorithm design")
    print("  - Multi-step analytical reasoning tasks")
    print("  - Problems requiring diverse domain expertise")
    print("  - Tasks where single models show limitations")

    print("\nPerformance characteristics:")
    print("  - Higher latency due to multiple model calls")
    print("  - Significantly improved quality for complex tasks")
    print("  - Parallel processing for efficiency")
    print(f"  - Optimized temperatures: {REFERENCE_TEMPERATURE} for reference models, {AGGREGATOR_TEMPERATURE} for aggregation")
    print("  - Token-efficient: only returns final aggregated response")
    print("  - Resilient: continues with partial model failures")
    print("  - Configurable: easy to modify models and settings at top of file")
    print("  - State-of-the-art results on challenging benchmarks")

    print("\nDebug mode:")
    print("  # Enable debug logging")
    print("  export MOA_TOOLS_DEBUG=true")
    print("  # Debug logs capture all MoA processing steps and metrics")
    print("  # Logs saved to: ./logs/moa_tools_debug_UUID.json")
510  
511  
512  # ---------------------------------------------------------------------------
513  # Registry
514  # ---------------------------------------------------------------------------
515  from tools.registry import registry
516  
# Tool schema advertised for the "mixture_of_agents" tool. Only `user_prompt`
# is exposed, so calls made through the registry handler below never override
# the reference or aggregator models.
# NOTE: the call count in the description (4 reference models + 1 aggregator)
# mirrors the current length of REFERENCE_MODELS and does not count retries;
# update the text if that list changes.
MOA_SCHEMA = {
    "name": "mixture_of_agents",
    "description": "Route a hard problem through multiple frontier LLMs collaboratively. Makes 5 API calls (4 reference models + 1 aggregator) with maximum reasoning effort — use sparingly for genuinely difficult problems. Best for: complex math, advanced algorithms, multi-step analytical reasoning, problems benefiting from diverse perspectives.",
    "parameters": {
        "type": "object",
        "properties": {
            "user_prompt": {
                "type": "string",
                "description": "The complex query or problem to solve using multiple AI models. Should be a challenging problem that benefits from diverse perspectives and collaborative reasoning."
            }
        },
        "required": ["user_prompt"]
    }
}

# Register the tool at import time. The handler forwards only `user_prompt`
# (an empty string when the key is missing) and returns the coroutine created
# by mixture_of_agents_tool; presumably `is_async=True` tells the registry to
# await it - confirm against tools.registry.
registry.register(
    name="mixture_of_agents",
    toolset="moa",
    schema=MOA_SCHEMA,
    handler=lambda args, **kw: mixture_of_agents_tool(user_prompt=args.get("user_prompt", "")),
    check_fn=check_moa_requirements,
    requires_env=["OPENROUTER_API_KEY"],
    is_async=True,
    emoji="🧠",
)