# cloud_orchestrator_v2.py
  1  #!/usr/bin/env python3
  2  """
  3  ENHANCED CLOUD AI ORCHESTRATOR
  4  Better error handling, model fallbacks, and response synthesis
  5  """
  6  
  7  import os
  8  import sys
  9  import asyncio
 10  import aiohttp
 11  import json
 12  import time
 13  from datetime import datetime
 14  from typing import List, Dict, Optional
 15  
 16  class EnhancedCloudOrchestrator:
 17      def __init__(self):
 18          self.apis = {
 19              "openrouter": {
 20                  "url": "https://openrouter.ai/api/v1/chat/completions",
 21                  "headers": {"Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}"},
 22                  "models": [
 23                      "mistralai/mistral-7b-instruct:free",
 24                      "google/gemma-7b-it:free",
 25                      "meta-llama/llama-3.1-8b-instruct:free"
 26                  ],
 27                  "timeout": 15
 28              },
 29              "groq": {
 30                  "url": "https://api.groq.com/openai/v1/chat/completions",
 31                  "headers": {"Authorization": f"Bearer {os.getenv('GROQ_API_KEY')}"},
 32                  "models": [
 33                      "llama-3.1-8b-instant",  # Try this first
 34                      "mixtral-8x7b-32768",
 35                      "gemma-7b-it"
 36                  ],
 37                  "timeout": 15
 38              }
 39          }
 40          
 41      async def call_api_with_fallback(self, api_name: str, prompt: str) -> Optional[Dict]:
 42          """Call API with model fallback"""
 43          if api_name not in self.apis:
 44              return None
 45              
 46          config = self.apis[api_name]
 47          headers = config["headers"].copy()
 48          headers["Content-Type"] = "application/json"
 49          
 50          for model in config["models"]:
 51              data = {
 52                  "model": model,
 53                  "messages": [{"role": "user", "content": prompt}],
 54                  "max_tokens": 500,
 55                  "temperature": 0.7
 56              }
 57              
 58              try:
 59                  timeout = aiohttp.ClientTimeout(total=config["timeout"])
 60                  async with aiohttp.ClientSession(timeout=timeout) as session:
 61                      async with session.post(config["url"], headers=headers, json=data) as response:
 62                          if response.status == 200:
 63                              result = await response.json()
 64                              return {
 65                                  "api": api_name,
 66                                  "model": model,
 67                                  "response": result["choices"][0]["message"]["content"],
 68                                  "tokens": result.get("usage", {}).get("total_tokens", 0)
 69                              }
 70                          elif response.status == 400:
 71                              # Try next model
 72                              continue
 73                          else:
 74                              print(f"   āš ļø  {api_name} ({model}): Error {response.status}")
 75                              return None
 76              except Exception as e:
 77                  print(f"   āš ļø  {api_name} ({model}): {str(e)[:50]}")
 78                  continue
 79          
 80          return None
 81      
 82      def synthesize_responses(self, responses: List[Dict]) -> str:
 83          """Intelligently combine multiple API responses"""
 84          if not responses:
 85              return "No cloud APIs responded successfully."
 86          
 87          if len(responses) == 1:
 88              resp = responses[0]
 89              return f"[{resp['api'].upper()}] {resp['response']}"
 90          
 91          # Multiple responses - create a synthesized answer
 92          synthesis = "## Synthesized Answer (from multiple AI models)\n\n"
 93          
 94          # Extract key points from each response
 95          key_points = []
 96          for resp in responses:
 97              text = resp['response']
 98              # Simple extraction: take first 2-3 sentences as key points
 99              sentences = text.split('. ')
100              key = '. '.join(sentences[:3]) + '.' if len(sentences) > 3 else text[:300]
101              key_points.append(f"**{resp['api'].upper()}** ({resp['model']}): {key}")
102          
103          for point in key_points:
104              synthesis += f"- {point}\n\n"
105          
106          # Add a combined summary
107          synthesis += "\n### Summary\n"
108          synthesis += "Based on analysis from multiple AI models, "
109          synthesis += responses[0]['response'][:200] + "..."
110          
111          return synthesis
112      
113      async def orchestrate(self, query: str) -> dict:
114          """Enhanced orchestration with better synthesis"""
115          print(f"\nšŸš€ Processing: {query[:80]}...")
116          
117          # Try all APIs in parallel
118          tasks = []
119          for api_name in self.apis.keys():
120              task = self.call_api_with_fallback(api_name, query)
121              tasks.append(task)
122          
123          results = await asyncio.gather(*tasks)
124          successful_responses = [r for r in results if r is not None]
125          
126          # Synthesize
127          final_answer = self.synthesize_responses(successful_responses)
128          
129          # Generate metadata
130          metadata = {
131              "query": query,
132              "timestamp": datetime.now().isoformat(),
133              "responses": successful_responses,
134              "apis_used": [r["api"] for r in successful_responses],
135              "models_used": [r["model"] for r in successful_responses],
136              "total_tokens": sum(r.get("tokens", 0) for r in successful_responses),
137              "response_count": len(successful_responses)
138          }
139          
140          return {
141              **metadata,
142              "final_answer": final_answer
143          }
144  
async def main():
    """CLI entry point.

    Reads the query from argv, runs the orchestrator, prints the
    synthesized answer with stats, and persists the result as both JSON
    and plain text (timestamped filenames in the working directory).
    """
    if len(sys.argv) < 2:
        print("Usage: python cloud_orchestrator_v2.py \"Your query here\"")
        sys.exit(1)

    query = " ".join(sys.argv[1:])

    print("\n" + "="*60)
    print("🌐 ENHANCED CLOUD AI ORCHESTRATOR")
    print("="*60)
    print("Features: Model fallback, Response synthesis, Better error handling")
    print("="*60)

    orchestrator = EnhancedCloudOrchestrator()
    start_time = time.time()
    result = await orchestrator.orchestrate(query)
    elapsed = time.time() - start_time

    print("\n" + "="*60)
    print("āœ… ORCHESTRATION COMPLETE")
    print("="*60)
    print(f"Query: {result['query'][:80]}...")
    print(f"Time: {elapsed:.1f}s")
    print(f"APIs: {', '.join(result['apis_used']) if result['apis_used'] else 'None'}")
    print(f"Models: {', '.join(result['models_used']) if result['models_used'] else 'None'}")
    print(f"Responses synthesized: {result['response_count']}")

    print("\nšŸŽÆ ANSWER:")
    print("="*60)
    print(result['final_answer'])
    print("="*60)

    # Save enhanced results. Explicit UTF-8 so emoji / non-ASCII answer
    # text doesn't crash on platforms with a non-UTF-8 locale encoding.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"enhanced_result_{timestamp}.json"
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    # Also save a readable version
    txt_filename = f"enhanced_result_{timestamp}.txt"
    with open(txt_filename, "w", encoding="utf-8") as f:
        f.write(f"Query: {result['query']}\n")
        f.write(f"Time: {elapsed:.1f}s\n")
        f.write(f"APIs used: {', '.join(result['apis_used'])}\n")
        f.write(f"Models: {', '.join(result['models_used'])}\n")
        f.write(f"Total tokens: {result['total_tokens']}\n")
        f.write("\n" + "="*60 + "\n")
        f.write("ANSWER:\n")
        f.write("="*60 + "\n")
        f.write(result['final_answer'])

    print(f"\nšŸ’¾ Saved:")
    # Bug fix: the JSON line previously printed a placeholder instead of
    # the actual filename the results were written to.
    print(f"  šŸ“Š JSON: {filename}")
    print(f"  šŸ“ Text: {txt_filename}")
199  
# Script entry point: run the async main() under a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())