# hybrid_orchestrator_v2.py
#!/usr/bin/env python3
"""
IMPROVED HYBRID ORCHESTRATOR
Better response cleaning and error handling
"""

import os
import sys
import asyncio
import aiohttp
import json
import subprocess
import time
import re
from datetime import datetime


def clean_ai_response(text: str) -> str:
    """Clean and normalize a raw AI model response into plain text.

    Strips known model control tokens, collapses whitespace, capitalizes
    the first letter, and ensures terminal punctuation.

    Args:
        text: Raw model output. May be ``None`` or a non-string.

    Returns:
        The cleaned answer, or a fallback message when the input is
        invalid or the cleaned result is 10 characters or shorter.
    """
    if not text or not isinstance(text, str):
        return "No valid response generated."

    # Remove common AI formatting/control tokens.
    # FIX: the previous patterns r'\[.*?\]' and r'\(.*?\)' deleted ANY
    # bracketed or parenthesized span, destroying legitimate content such
    # as "(approximately 3.14)" or "[1, 2, 3]". Only specific, known
    # control-token patterns are removed now.
    tags_to_remove = [
        r'<s>', r'</s>', r'\[OUT\]', r'\[/OUT\]', r'\[/s\]',
        r'<\|.*?\|>', r'\[INST\]', r'\[/INST\]'
    ]

    cleaned = text
    for pattern in tags_to_remove:
        cleaned = re.sub(pattern, '', cleaned)

    # Collapse every run of whitespace (including newlines) to one space.
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()

    # Capitalize the first letter if needed.
    if cleaned and cleaned[0].islower():
        cleaned = cleaned[0].upper() + cleaned[1:]

    # Ensure the answer ends with terminal punctuation.
    if cleaned and cleaned[-1] not in '.!?':
        cleaned += '.'

    return cleaned if len(cleaned) > 10 else "Response was too brief. Please try rephrasing your question."
class ImprovedHybridOrchestrator:
    """Route a query to cloud LLM APIs with a local-model fallback.

    Strategies handled by :meth:`orchestrate`:
      * ``"cloud"``: query OpenRouter and Groq concurrently.
      * ``"local"``: shell out to ``ultimate_orchestrator.py`` with local models.
      * ``"auto"`` (default): cloud first, local fallback on total failure.
    """

    def __init__(self):
        # Keys come from the environment. A missing key is not an error
        # here: the corresponding request simply fails with an auth error
        # and the orchestrator falls through to the next option.
        self.openrouter_key = os.getenv('OPENROUTER_API_KEY')
        self.groq_key = os.getenv('GROQ_API_KEY')

        # Updated Groq models based on deprecation info, tried in order.
        self.groq_models = [
            "llama-3.3-70b-versatile",
            "llama-3.2-90b-vision-preview",
            "gemma2-9b-it",
            "mixtral-8x7b-32768"  # Keep as last fallback
        ]

    async def call_openrouter(self, query: str) -> dict:
        """Call OpenRouter with better prompting.

        Args:
            query: The user question.

        Returns:
            On success: ``{"success": True, "api", "raw_response",
            "response", "tokens"}``. On failure: ``{"success": False,
            "error": <truncated message>}``. Never raises.
        """
        url = "https://openrouter.ai/api/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.openrouter_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/your-repo",  # Optional but good practice
            "X-Title": "AI Orchestrator"
        }

        # Better system prompt for cleaner responses
        messages = [
            {"role": "system", "content": "You are a helpful AI assistant. Provide clear, concise answers without markdown formatting, XML tags, or special tokens. Just give the plain text answer."},
            {"role": "user", "content": query}
        ]

        data = {
            "model": "mistralai/mistral-7b-instruct:free",
            "messages": messages,
            "max_tokens": 300,
            "temperature": 0.7
        }

        try:
            timeout = aiohttp.ClientTimeout(total=15)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status == 200:
                        result = await response.json()
                        raw_text = result['choices'][0]['message']['content']
                        cleaned_text = clean_ai_response(raw_text)

                        return {
                            "success": True,
                            "api": "openrouter",
                            "raw_response": raw_text,
                            "response": cleaned_text,
                            "tokens": result.get('usage', {}).get('total_tokens', 0)
                        }
                    else:
                        error_text = await response.text()
                        return {"success": False, "error": f"HTTP {response.status}: {error_text[:100]}"}
        except Exception as e:
            return {"success": False, "error": str(e)[:100]}

    async def call_groq(self, query: str) -> dict:
        """Call the Groq API, trying each configured model in order.

        Args:
            query: The user question.

        Returns:
            The first successful result dict (includes the winning
            ``"model"``); a failure dict when every model fails.
        """
        url = "https://api.groq.com/openai/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.groq_key}",
            "Content-Type": "application/json"
        }

        for model in self.groq_models:
            messages = [
                {"role": "system", "content": "Provide clear, direct answers without special formatting or tokens."},
                {"role": "user", "content": query}
            ]

            data = {
                "model": model,
                "messages": messages,
                "max_tokens": 300,
                "temperature": 0.7
            }

            try:
                timeout = aiohttp.ClientTimeout(total=15)
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.post(url, headers=headers, json=data) as response:
                        if response.status == 200:
                            result = await response.json()
                            raw_text = result['choices'][0]['message']['content']
                            cleaned_text = clean_ai_response(raw_text)

                            return {
                                "success": True,
                                "api": "groq",
                                "model": model,
                                "raw_response": raw_text,
                                "response": cleaned_text,
                                "tokens": result.get('usage', {}).get('total_tokens', 0)
                            }
                        elif response.status == 400:
                            # Model rejected (e.g. deprecated) -- try the next one.
                            continue
                        # Any other HTTP status: fall through and try the next model.
            except Exception:
                # FIX: was a bare `except:`, which also swallowed
                # KeyboardInterrupt and asyncio.CancelledError; catching
                # Exception lets cancellation propagate correctly.
                continue

        return {"success": False, "error": "All Groq models failed"}

    def call_local(self, query: str, model: str = "tinyllama") -> dict:
        """Call a local model via ``ultimate_orchestrator.py`` and parse its output.

        Args:
            query: The user question.
            model: Name of the local model passed to the subprocess.

        Returns:
            A result dict mirroring the cloud callers (``tokens`` is always
            0 for local runs), or a failure dict on non-zero exit / timeout.
        """
        try:
            cmd = ["python3", "ultimate_orchestrator.py", model, query]
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=180
            )

            if result.returncode == 0:
                # Extract the text between the subprocess's "FINAL ANSWER"
                # banner and its closing separator.
                lines = result.stdout.split('\n')
                answer_lines = []
                capture = False

                for line in lines:
                    # FIX: the second test for the (mojibake-encoded) banner
                    # variant was redundant -- that banner also contains the
                    # substring "FINAL ANSWER", so one check covers both.
                    if "FINAL ANSWER" in line:
                        capture = True
                        continue
                    if capture and ("====" in line or "ORCHESTRATION COMPLETE" in line):
                        break
                    if capture and line.strip():
                        answer_lines.append(line.strip())

                # Fall back to the tail of stdout when no banner was found.
                raw_text = ' '.join(answer_lines) if answer_lines else result.stdout[-500:]
                cleaned_text = clean_ai_response(raw_text)

                return {
                    "success": True,
                    "api": "local",
                    "model": model,
                    "raw_response": raw_text,
                    "response": cleaned_text,
                    "tokens": 0
                }
            else:
                return {"success": False, "error": f"Exit code {result.returncode}"}
        except subprocess.TimeoutExpired:
            return {"success": False, "error": "Timeout (3 minutes)"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def orchestrate(self, query: str, strategy: str = "auto") -> dict:
        """Run the selected orchestration strategy and return a summary.

        Args:
            query: The user question.
            strategy: ``"local"``, ``"cloud"``, or ``"auto"`` (anything
                else is treated as ``"auto"``).

        Returns:
            On success, a dict with the first successful response plus
            provenance (source, model, tokens, timing); on failure, a dict
            with an ``"error"`` message. Never raises.

        NOTE(review): the emoji in the banner strings below appear
        mojibake-encoded (UTF-8 read as a single-byte codepage); they are
        preserved byte-for-byte so existing output consumers keep working.
        """
        print(f"\nš¤ IMPROVED HYBRID ORCHESTRATOR")
        print("="*60)
        print(f"Query: {query[:80]}...")
        print(f"Strategy: {strategy}")
        print("="*60)

        start_time = time.time()
        results = []

        if strategy == "local":
            print("\nš§ Local-only strategy")
            for model in ["mistral", "llama2", "tinyllama"]:
                print(f" Trying {model}...")
                result = self.call_local(query, model)
                if result["success"]:
                    results.append(result)
                    print(f" ā {model}: Success")
                    break
                else:
                    print(f" ā {model}: {result['error'][:30]}")

        elif strategy == "cloud":
            print("\nāļø Cloud-only strategy")
            # Fire both cloud providers concurrently; keep every success.
            cloud_tasks = [self.call_openrouter(query), self.call_groq(query)]
            cloud_results = await asyncio.gather(*cloud_tasks)

            for result in cloud_results:
                if result["success"]:
                    results.append(result)
                    print(f" ā {result['api']}: Success")

            if not results:
                print(" ā ļø All cloud APIs failed")

        else:  # auto (hybrid)
            print("\nš¤ Auto strategy (Cloud first ā Local fallback)")

            # Try cloud first
            cloud_tasks = [self.call_openrouter(query), self.call_groq(query)]
            cloud_results = await asyncio.gather(*cloud_tasks)
            cloud_success = False

            for result in cloud_results:
                if result["success"]:
                    results.append(result)
                    cloud_success = True
                    print(f" āļø {result['api']}: Success")

            if not cloud_success:
                print(" ā ļø Cloud APIs failed, trying local...")
                # Smallest/fastest local model first.
                for model in ["tinyllama", "llama2", "mistral"]:
                    print(f" š¤ Trying {model}...")
                    result = self.call_local(query, model)
                    if result["success"]:
                        results.append(result)
                        print(f" ā {model}: Success")
                        break

        # Process results
        elapsed = time.time() - start_time

        if results:
            best_result = results[0]  # first successful responder wins
            return {
                "success": True,
                "query": query,
                "strategy": strategy,
                "response": best_result["response"],
                "raw_response": best_result.get("raw_response", ""),
                "source": best_result["api"],
                "model": best_result.get("model", ""),
                "tokens": best_result.get("tokens", 0),
                "time": elapsed,
                "responses_tried": len(results),
                "timestamp": datetime.now().isoformat()
            }
        else:
            return {
                "success": False,
                "query": query,
                "strategy": strategy,
                "error": "All orchestration methods failed",
                "time": elapsed,
                "timestamp": datetime.now().isoformat()
            }


async def main():
    """CLI entry point: parse argv, orchestrate, print and save the result."""
    if len(sys.argv) < 2:
        print("Usage: python hybrid_orchestrator_v2.py \"Your query\"")
        print(" python hybrid_orchestrator_v2.py --strategy [auto|cloud|local] \"Query\"")
        sys.exit(1)

    # Parse arguments
    strategy = "auto"
    query_start = 1

    if sys.argv[1] == "--strategy" and len(sys.argv) > 3:
        strategy = sys.argv[2]
        query_start = 3

    query = " ".join(sys.argv[query_start:])

    orchestrator = ImprovedHybridOrchestrator()
    result = await orchestrator.orchestrate(query, strategy)

    print("\n" + "="*60)
    if result["success"]:
        print("ā ORCHESTRATION SUCCESSFUL")
        print("="*60)
        print(f"Strategy: {result['strategy']}")
        print(f"Source: {result['source']}")
        if result.get('model'):
            print(f"Model: {result['model']}")
        print(f"Time: {result['time']:.1f}s")
        print(f"Responses tried: {result['responses_tried']}")
        if result.get('tokens', 0) > 0:
            print(f"Tokens: {result['tokens']}")

        print("\nšÆ ANSWER:")
        print("="*60)
        print(result['response'])
        print("="*60)

        # Save both raw and cleaned
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"improved_result_{timestamp}.json"
        with open(filename, "w") as f:
            json.dump(result, f, indent=2)
        # FIX: the message previously printed a literal placeholder instead
        # of interpolating the computed filename, so users were never told
        # where the result was saved.
        print(f"\nš¾ Saved to: {filename}")
    else:
        print("ā ORCHESTRATION FAILED")
        print("="*60)
        print(f"Error: {result['error']}")
        print(f"Strategy: {result['strategy']}")
        print(f"Time: {result['time']:.1f}s")


if __name__ == "__main__":
    asyncio.run(main())