/ cloud_orchestrator_v2.py
cloud_orchestrator_v2.py
1 #!/usr/bin/env python3 2 """ 3 ENHANCED CLOUD AI ORCHESTRATOR 4 Better error handling, model fallbacks, and response synthesis 5 """ 6 7 import os 8 import sys 9 import asyncio 10 import aiohttp 11 import json 12 import time 13 from datetime import datetime 14 from typing import List, Dict, Optional 15 16 class EnhancedCloudOrchestrator: 17 def __init__(self): 18 self.apis = { 19 "openrouter": { 20 "url": "https://openrouter.ai/api/v1/chat/completions", 21 "headers": {"Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}"}, 22 "models": [ 23 "mistralai/mistral-7b-instruct:free", 24 "google/gemma-7b-it:free", 25 "meta-llama/llama-3.1-8b-instruct:free" 26 ], 27 "timeout": 15 28 }, 29 "groq": { 30 "url": "https://api.groq.com/openai/v1/chat/completions", 31 "headers": {"Authorization": f"Bearer {os.getenv('GROQ_API_KEY')}"}, 32 "models": [ 33 "llama-3.1-8b-instant", # Try this first 34 "mixtral-8x7b-32768", 35 "gemma-7b-it" 36 ], 37 "timeout": 15 38 } 39 } 40 41 async def call_api_with_fallback(self, api_name: str, prompt: str) -> Optional[Dict]: 42 """Call API with model fallback""" 43 if api_name not in self.apis: 44 return None 45 46 config = self.apis[api_name] 47 headers = config["headers"].copy() 48 headers["Content-Type"] = "application/json" 49 50 for model in config["models"]: 51 data = { 52 "model": model, 53 "messages": [{"role": "user", "content": prompt}], 54 "max_tokens": 500, 55 "temperature": 0.7 56 } 57 58 try: 59 timeout = aiohttp.ClientTimeout(total=config["timeout"]) 60 async with aiohttp.ClientSession(timeout=timeout) as session: 61 async with session.post(config["url"], headers=headers, json=data) as response: 62 if response.status == 200: 63 result = await response.json() 64 return { 65 "api": api_name, 66 "model": model, 67 "response": result["choices"][0]["message"]["content"], 68 "tokens": result.get("usage", {}).get("total_tokens", 0) 69 } 70 elif response.status == 400: 71 # Try next model 72 continue 73 else: 74 print(f" ā ļø {api_name} ({model}): Error {response.status}") 75 return None 76 except Exception as e: 77 print(f" ā ļø {api_name} ({model}): {str(e)[:50]}") 78 continue 79 80 return None 81 82 def synthesize_responses(self, responses: List[Dict]) -> str: 83 """Intelligently combine multiple API responses""" 84 if not responses: 85 return "No cloud APIs responded successfully." 86 87 if len(responses) == 1: 88 resp = responses[0] 89 return f"[{resp['api'].upper()}] {resp['response']}" 90 91 # Multiple responses - create a synthesized answer 92 synthesis = "## Synthesized Answer (from multiple AI models)\n\n" 93 94 # Extract key points from each response 95 key_points = [] 96 for resp in responses: 97 text = resp['response'] 98 # Simple extraction: take first 2-3 sentences as key points 99 sentences = text.split('. ') 100 key = '. '.join(sentences[:3]) + '.' if len(sentences) > 3 else text[:300] 101 key_points.append(f"**{resp['api'].upper()}** ({resp['model']}): {key}") 102 103 for point in key_points: 104 synthesis += f"- {point}\n\n" 105 106 # Add a combined summary 107 synthesis += "\n### Summary\n" 108 synthesis += "Based on analysis from multiple AI models, " 109 synthesis += responses[0]['response'][:200] + "..." 110 111 return synthesis 112 113 async def orchestrate(self, query: str) -> dict: 114 """Enhanced orchestration with better synthesis""" 115 print(f"\nš Processing: {query[:80]}...") 116 117 # Try all APIs in parallel 118 tasks = [] 119 for api_name in self.apis.keys(): 120 task = self.call_api_with_fallback(api_name, query) 121 tasks.append(task) 122 123 results = await asyncio.gather(*tasks) 124 successful_responses = [r for r in results if r is not None] 125 126 # Synthesize 127 final_answer = self.synthesize_responses(successful_responses) 128 129 # Generate metadata 130 metadata = { 131 "query": query, 132 "timestamp": datetime.now().isoformat(), 133 "responses": successful_responses, 134 "apis_used": [r["api"] for r in successful_responses], 135 "models_used": [r["model"] for r in successful_responses], 136 "total_tokens": sum(r.get("tokens", 0) for r in successful_responses), 137 "response_count": len(successful_responses) 138 } 139 140 return { 141 **metadata, 142 "final_answer": final_answer 143 } 144 145 async def main(): 146 if len(sys.argv) < 2: 147 print("Usage: python cloud_orchestrator_v2.py \"Your query here\"") 148 sys.exit(1) 149 150 query = " ".join(sys.argv[1:]) 151 152 print("\n" + "="*60) 153 print("š ENHANCED CLOUD AI ORCHESTRATOR") 154 print("="*60) 155 print("Features: Model fallback, Response synthesis, Better error handling") 156 print("="*60) 157 158 orchestrator = EnhancedCloudOrchestrator() 159 start_time = time.time() 160 result = await orchestrator.orchestrate(query) 161 elapsed = time.time() - start_time 162 163 print("\n" + "="*60) 164 print("ā ORCHESTRATION COMPLETE") 165 print("="*60) 166 print(f"Query: {result['query'][:80]}...") 167 print(f"Time: {elapsed:.1f}s") 168 print(f"APIs: {', '.join(result['apis_used']) if result['apis_used'] else 'None'}") 169 print(f"Models: {', '.join(result['models_used']) if result['models_used'] else 'None'}") 170 print(f"Responses synthesized: {result['response_count']}") 171 172 print("\nšÆ ANSWER:") 173 print("="*60) 174 print(result['final_answer']) 175 print("="*60) 176 177 # Save enhanced results 178 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 179 filename = f"enhanced_result_{timestamp}.json" 180 with open(filename, "w") as f: 181 json.dump(result, f, indent=2) 182 183 # Also save a readable version 184 txt_filename = f"enhanced_result_{timestamp}.txt" 185 with open(txt_filename, "w") as f: 186 f.write(f"Query: {result['query']}\n") 187 f.write(f"Time: {elapsed:.1f}s\n") 188 f.write(f"APIs used: {', '.join(result['apis_used'])}\n") 189 f.write(f"Models: {', '.join(result['models_used'])}\n") 190 f.write(f"Total tokens: {result['total_tokens']}\n") 191 f.write("\n" + "="*60 + "\n") 192 f.write("ANSWER:\n") 193 f.write("="*60 + "\n") 194 f.write(result['final_answer']) 195 196 print(f"\nš¾ Saved:") 197 print(f" š JSON: {filename}") 198 print(f" š Text: {txt_filename}") 199 200 if __name__ == "__main__": 201 asyncio.run(main())