#!/usr/bin/env python3
"""
HYBRID AI ORCHESTRATOR - Cloud first, local fallback

hybrid_orchestrator.py sends a query to every configured cloud chat API
(OpenRouter, Groq) and, when none of them answers, falls back to a local
model launched through ``ultimate_orchestrator.py``.

Usage:
    python hybrid_orchestrator.py "Your query"
    python hybrid_orchestrator.py --local "Query"
"""

import os
import sys
import asyncio
import json
import subprocess
import time
from datetime import datetime

try:
    import aiohttp
except ImportError:
    # The tool exists to degrade gracefully: without aiohttp the cloud calls
    # report a failure and the local fallback keeps working.
    aiohttp = None

_AIOHTTP_MISSING = "aiohttp is not installed; cloud APIs are unavailable"


def _describe_error(exc: BaseException) -> str:
    """Return a non-empty description of *exc*.

    Some exceptions (for example ``asyncio.TimeoutError``) stringify to an
    empty string, so the class name is used as a fallback.
    """
    return str(exc) or type(exc).__name__


class HybridOrchestrator:
    """Answer a query with a cloud chat API, or a local model as fallback.

    Attributes are set in ``__init__``:
        openrouter_key: value of ``OPENROUTER_API_KEY`` (or ``None``).
        groq_key: value of ``GROQ_API_KEY`` (or ``None``).
        cloud_apis: names of the cloud APIs that have a key configured.
    """

    def __init__(self):
        """Read the API keys from the environment and list usable cloud APIs."""
        self.openrouter_key = os.getenv('OPENROUTER_API_KEY')
        self.groq_key = os.getenv('GROQ_API_KEY')

        self.cloud_apis = []
        if self.openrouter_key:
            self.cloud_apis.append('openrouter')
        if self.groq_key:
            self.cloud_apis.append('groq')

    async def call_openrouter(self, query: str) -> dict:
        """Call the OpenRouter chat-completions endpoint.

        Args:
            query: Prompt sent as a single ``user`` message.

        Returns:
            ``{"success": True, "api": "openrouter", "response": str,
            "tokens": int}`` on HTTP 200, otherwise
            ``{"success": False, "error": str}``. Exceptions raised by the
            request or while reading the reply are reported through
            ``"error"`` instead of propagating.
        """
        if aiohttp is None:
            return {"success": False, "error": _AIOHTTP_MISSING}

        url = "https://openrouter.ai/api/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.openrouter_key}",
            "Content-Type": "application/json",
        }
        data = {
            "model": "mistralai/mistral-7b-instruct:free",
            "messages": [{"role": "user", "content": query}],
            "max_tokens": 500,
        }

        try:
            timeout = aiohttp.ClientTimeout(total=15)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status != 200:
                        return {"success": False, "error": f"HTTP {response.status}"}
                    result = await response.json()
                    return {
                        "success": True,
                        "api": "openrouter",
                        "response": result['choices'][0]['message']['content'],
                        "tokens": result.get('usage', {}).get('total_tokens', 0),
                    }
        except Exception as e:
            # Boundary handler: network, timeout and malformed-reply errors
            # all become a failure result so the caller can fall back.
            return {"success": False, "error": _describe_error(e)}

    async def call_groq(self, query: str) -> dict:
        """Call the Groq chat-completions endpoint, trying several models.

        The models are tried in order; the first HTTP 200 reply wins. Any
        other status or any request/parsing error moves on to the next model.

        Args:
            query: Prompt sent as a single ``user`` message.

        Returns:
            ``{"success": True, "api": "groq", "model": str, "response": str,
            "tokens": int}`` for the first model that answers, otherwise
            ``{"success": False, "error": "All models failed",
            "last_error": str}`` where ``last_error`` describes the most
            recent failure.
        """
        if aiohttp is None:
            return {"success": False, "error": _AIOHTTP_MISSING}

        url = "https://api.groq.com/openai/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.groq_key}",
            "Content-Type": "application/json",
        }
        # Try updated models based on deprecation info
        models_to_try = [
            "llama-3.3-70b-versatile",
            "llama-3.2-11b-vision-preview",
            "gemma2-9b-it",
        ]
        last_error = ""

        try:
            # The session timeout applies to each request, so every model
            # still gets its own 15 second budget.
            timeout = aiohttp.ClientTimeout(total=15)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                for model in models_to_try:
                    data = {
                        "model": model,
                        "messages": [{"role": "user", "content": query}],
                        "max_tokens": 500,
                        "temperature": 0.7,
                    }
                    try:
                        async with session.post(url, headers=headers, json=data) as response:
                            if response.status == 200:
                                result = await response.json()
                                return {
                                    "success": True,
                                    "api": "groq",
                                    "model": model,
                                    "response": result['choices'][0]['message']['content'],
                                    "tokens": result.get('usage', {}).get('total_tokens', 0),
                                }
                            last_error = f"{model}: HTTP {response.status}"
                    except Exception as exc:
                        # Not a bare ``except``: task cancellation and
                        # KeyboardInterrupt must still propagate.
                        last_error = f"{model}: {_describe_error(exc)}"
        except Exception as exc:
            last_error = _describe_error(exc)

        return {
            "success": False,
            "error": "All models failed",
            "last_error": last_error,
        }

    @staticmethod
    def _extract_answer(stdout: str) -> str:
        """Pull the answer text out of the local runner's stdout.

        Collects the non-blank lines that follow a line containing
        ``FINAL ANSWER`` up to the next ``====`` separator. When nothing is
        found, the last line longer than 20 characters that is not a
        separator is used. Returns ``""`` when neither yields text.
        """
        lines = stdout.split('\n')
        collected = []
        in_answer = False

        for line in lines:
            if "FINAL ANSWER" in line:
                in_answer = True
                continue
            if not in_answer:
                continue
            if "====" in line:
                if collected:
                    break  # closing separator after the answer body
                continue  # separator directly under the heading
            text = line.strip()
            if text:
                collected.append(text)

        if collected:
            return "\n".join(collected)

        # Fallback: get last meaningful content
        for line in reversed(lines):
            text = line.strip()
            if len(text) > 20 and "====" not in line:
                return text
        return ""

    def call_local(self, query: str, model: str = "tinyllama") -> dict:
        """Run a local model through ``ultimate_orchestrator.py``.

        The script is looked up relative to the current working directory
        and is started with the interpreter running this process.

        Args:
            query: Prompt passed to the script as its second argument.
            model: Local model name passed as the first argument.

        Returns:
            ``{"success": True, "api": "local", "model": str,
            "response": str, "tokens": 0}`` when the script exits with 0 and
            produced an answer, otherwise ``{"success": False,
            "error": str}`` (non-zero exit code, 120 s timeout, empty
            output, or any error starting the process).
        """
        try:
            cmd = [sys.executable or "python3", "ultimate_orchestrator.py", model, query]
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=120,
            )
        except subprocess.TimeoutExpired:
            return {"success": False, "error": "Timeout"}
        except Exception as e:
            return {"success": False, "error": _describe_error(e)}

        if result.returncode != 0:
            return {"success": False, "error": f"Exit code {result.returncode}"}

        answer = self._extract_answer(result.stdout)
        if not answer:
            # An empty answer is not a usable result; reporting a failure
            # lets the caller move on to the next model.
            return {"success": False, "error": "Empty response"}

        return {
            "success": True,
            "api": "local",
            "model": model,
            "response": answer,
            "tokens": 0,
        }

    def _first_local_success(self, query: str, models):
        """Try local *models* in order; return the first success or ``None``."""
        for model in models:
            print(f" Trying {model}...")
            result = self.call_local(query, model)
            if result["success"]:
                print(f" ✅ {model} succeeded")
                return result
            print(f" ❌ {model} failed: {result['error'][:30]}")
        return None

    async def orchestrate(self, query: str, force_local: bool = False) -> dict:
        """Main orchestration method.

        Unless *force_local* is set, all configured cloud APIs are queried
        concurrently and the first successful one (OpenRouter before Groq)
        is used. Local models are tried when no cloud API is configured or
        all of them fail.

        Args:
            query: Prompt to answer.
            force_local: Skip the cloud APIs and only use local models.

        Returns:
            On success a dict with ``success``, ``query``, ``response``,
            ``source``, ``model``, ``tokens``, ``time`` (seconds) and
            ``timestamp`` (ISO format). On failure a dict with ``success``,
            ``query``, ``error``, ``time`` and ``timestamp``.
        """
        print("\n🤖 HYBRID ORCHESTRATOR")
        print("=" * 60)
        # Only mark the preview as truncated when it really is.
        preview = query if len(query) <= 80 else query[:80] + "..."
        print(f"Query: {preview}")
        print("=" * 60)

        start_time = time.monotonic()
        results = []
        local_models = None

        if force_local:
            print("\n🔧 Forced local mode")
            # Try local models in order of quality
            local_models = ["mistral", "llama2", "tinyllama"]
        else:
            print("\n☁️ Trying cloud APIs...")
            # Try cloud APIs in parallel
            cloud_tasks = []
            if 'openrouter' in self.cloud_apis:
                cloud_tasks.append(self.call_openrouter(query))
            if 'groq' in self.cloud_apis:
                cloud_tasks.append(self.call_groq(query))

            if cloud_tasks:
                for result in await asyncio.gather(*cloud_tasks):
                    if result["success"]:
                        results.append(result)
                        print(f" ✅ {result['api']} succeeded")

                if not results:
                    print(" ⚠️ All cloud APIs failed, falling back to local...")
                    # Fallback to local, smallest model first
                    local_models = ["tinyllama", "llama2", "mistral"]
            else:
                print(" ⚠️ No cloud APIs configured, using local...")
                local_models = ["tinyllama"]

        if local_models:
            local_result = self._first_local_success(query, local_models)
            if local_result is not None:
                results.append(local_result)

        # Process results
        elapsed = time.monotonic() - start_time
        timestamp = datetime.now().isoformat()

        if not results:
            return {
                "success": False,
                "query": query,
                "error": "All orchestration methods failed",
                "time": elapsed,
                "timestamp": timestamp,
            }

        best_result = results[0]  # First successful result
        return {
            "success": True,
            "query": query,
            "response": best_result["response"],
            "source": best_result["api"],
            "model": best_result.get("model", ""),
            "tokens": best_result.get("tokens", 0),
            "time": elapsed,
            "timestamp": timestamp,
        }


def _print_usage() -> None:
    """Print the command-line usage."""
    print("Usage: python hybrid_orchestrator.py \"Your query\"")
    print(" python hybrid_orchestrator.py --local \"Query\"")


async def main():
    """Command-line entry point: run one query and print/save the result.

    Exits with status 1 when no query is given. A successful result is also
    written to ``hybrid_result_<timestamp>.json`` in the current directory.
    """
    args = sys.argv[1:]
    force_local = bool(args) and args[0] == "--local"
    if force_local:
        args = args[1:]

    query = " ".join(args)
    if not query.strip():
        # Covers both "no arguments" and "--local" without a query.
        _print_usage()
        sys.exit(1)

    orchestrator = HybridOrchestrator()
    result = await orchestrator.orchestrate(query, force_local)

    print("\n" + "=" * 60)
    if not result["success"]:
        print("❌ ORCHESTRATION FAILED")
        print("=" * 60)
        print(f"Error: {result['error']}")
        return

    print("✅ ORCHESTRATION SUCCESSFUL")
    print("=" * 60)
    print(f"Source: {result['source']}")
    if result.get('model'):
        print(f"Model: {result['model']}")
    print(f"Time: {result['time']:.1f}s")
    if result.get('tokens', 0) > 0:
        print(f"Tokens: {result['tokens']}")

    print("\n🎯 ANSWER:")
    print("=" * 60)
    print(result['response'])
    print("=" * 60)

    # Save result
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"hybrid_result_{timestamp}.json"
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    print(f"\n💾 Saved to: {filename}")


if __name__ == "__main__":
    asyncio.run(main())