# ai_orchestrator_final.py
#!/usr/bin/env python3
"""
FINAL AI ORCHESTRATOR - Reliable, clean, handles edge cases
"""

import os
import sys
import asyncio
import aiohttp
import json
import subprocess
import time
import re
from datetime import datetime


class FinalAIOrchestrator:
    """Route a user query to cloud LLM backends.

    OpenRouter is tried first; Groq is the fallback. Prompts are lightly
    rewritten for more consistent short answers, and raw model output is
    cleaned of tags/prefixes before being returned to the caller.
    """

    def __init__(self):
        # Missing keys are tolerated: the corresponding backend simply
        # reports {"success": False, "error": "No API key"} instead of raising.
        self.openrouter_key = os.getenv('OPENROUTER_API_KEY')
        self.groq_key = os.getenv('GROQ_API_KEY')

    def improve_prompt(self, query: str) -> str:
        """Improve prompts for better responses.

        Math-looking and very short queries get a trailing '?' so the model
        treats them as direct questions; everything else passes through.
        """
        query_lower = query.lower()

        # Math queries. NOTE(review): a bare '-' also matches hyphenated
        # words, so some non-math queries take this branch — behavior kept.
        if any(pattern in query_lower for pattern in ['+', '-', '*', '/', '=', 'calculate', 'math']):
            if '?' in query:
                return query
            return f"{query}?"

        # Very short/simple queries
        if len(query.split()) <= 3 and '?' not in query:
            return f"{query}?"

        return query

    def create_system_prompt(self, query: str) -> str:
        """Create dynamic system prompt based on query type.

        Math-ish queries get a terse "calculator" persona, short queries a
        concise persona, and everything else a general assistant persona.
        """
        query_lower = query.lower()

        if any(word in query_lower for word in ['math', 'calculate', '+', '-', '*', '/', '=']):
            return """You are a precise calculator. Provide only the numerical answer or simple factual response."""
        elif len(query.split()) <= 5:
            return """You are a concise assistant. Answer questions directly with minimal words."""
        else:
            return """You are a helpful AI assistant. 
Provide clear, informative answers without unnecessary formatting."""

    async def call_openrouter(self, query: str) -> dict:
        """Call OpenRouter with optimized prompting.

        Returns a dict with ``success``; on success it also carries ``api``,
        ``response`` and ``tokens``; on failure an ``error`` string.
        Never raises — network/parse errors are folded into the result.
        """
        if not self.openrouter_key:
            return {"success": False, "error": "No API key"}

        url = "https://openrouter.ai/api/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.openrouter_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/ai-orchestrator"
        }

        improved_query = self.improve_prompt(query)

        messages = [
            {"role": "system", "content": self.create_system_prompt(query)},
            {"role": "user", "content": improved_query}
        ]

        data = {
            "model": "mistralai/mistral-7b-instruct:free",
            "messages": messages,
            "max_tokens": 300,
            "temperature": 0.3,  # Lower for more consistent answers
            "top_p": 0.9
        }

        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(url, headers=headers, json=data) as response:
                    if response.status == 200:
                        result = await response.json()
                        answer = result['choices'][0]['message']['content']

                        # Clean and validate
                        answer = self.clean_response(answer)

                        if not answer or len(answer.strip()) < 1:
                            return {"success": False, "error": "Empty response"}

                        return {
                            "success": True,
                            "api": "openrouter",
                            "response": answer,
                            "tokens": result.get('usage', {}).get('total_tokens', 0)
                        }
                    else:
                        return {"success": False, "error": f"HTTP {response.status}"}
        except Exception as e:
            # Truncate so the caller-facing error stays a short one-liner.
            return {"success": False, "error": str(e)[:100]}

    async def call_groq(self, query: str) -> dict:
        """Call Groq as backup.

        Tries each model in order and returns the first non-empty answer;
        result dict has the same shape as :meth:`call_openrouter` plus a
        ``model`` key identifying which Groq model answered.
        """
        if not self.groq_key:
            return {"success": False, "error": "No API key"}

        url = "https://api.groq.com/openai/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.groq_key}",
            "Content-Type": "application/json"
        }

        models = ["llama-3.3-70b-versatile", "gemma2-9b-it"]

        for model in models:
            messages = [
                {"role": "system", "content": "Answer questions directly and concisely."},
                {"role": "user", "content": self.improve_prompt(query)}
            ]

            data = {
                "model": model,
                "messages": messages,
                "max_tokens": 100,
                "temperature": 0.3
            }

            try:
                timeout = aiohttp.ClientTimeout(total=10)
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.post(url, headers=headers, json=data) as response:
                        if response.status == 200:
                            result = await response.json()
                            answer = result['choices'][0]['message']['content']
                            answer = self.clean_response(answer)

                            if answer and len(answer.strip()) > 0:
                                return {
                                    "success": True,
                                    "api": "groq",
                                    "model": model,
                                    "response": answer,
                                    "tokens": result.get('usage', {}).get('total_tokens', 0)
                                }
            except Exception:
                # Deliberate best-effort: fall through to the next model.
                # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
                continue

        return {"success": False, "error": "All Groq models failed"}

    def clean_response(self, text: str) -> str:
        """Clean AI responses.

        Strips model control tags and "Answer:"-style prefixes, collapses
        whitespace, and appends a period to multi-word statements that lack
        terminal punctuation. Returns "" for falsy input.
        """
        if not text:
            return ""

        # Remove tags
        text = re.sub(r'<s>|</s>|\[OUT\]|\[/OUT\]|\[/s\]|<\|.*?\|>', '', text)

        # Remove "Answer:" prefixes
        text = re.sub(r'^(Answer|Response|A|The answer is)\s*[:.-]\s*', '', text, flags=re.IGNORECASE)

        # Clean whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        # Ensure it ends with proper punctuation for statements
        if text and text[-1] not in '.!?' and len(text.split()) > 3:
            text += '.'

        return text

    async def orchestrate(self, query: str, mode: str = "auto") -> dict:
        """Main orchestration with mode selection.

        ``mode`` is one of "auto" (OpenRouter then Groq), "cloud" (same
        chain), or "local" (placeholder — currently OpenRouter only, with
        no Groq fallback). Returns a result dict with timing metadata.
        """
        print(f"\n🤖 AI ORCHESTRATOR")
        print("="*60)
        print(f"Query: {query}")
        print(f"Mode: {mode}")
        print("="*60)

        start_time = time.time()
        result = None

        if mode == "cloud":
            print("\n☁️ Cloud-only mode")
            result = await self.call_openrouter(query)
            if not result["success"]:
                result = await self.call_groq(query)

        elif mode == "local":
            print("\n🤖 Local-only mode (placeholder)")
            # For now, just use cloud since local is unreliable
            result = await self.call_openrouter(query)

        else:  # auto
            print("\n⚡ Auto mode (OpenRouter → Groq)")
            result = await self.call_openrouter(query)
            if not result["success"]:
                print("   ⚠️ OpenRouter failed, trying Groq...")
                result = await self.call_groq(query)

        elapsed = time.time() - start_time

        if result and result["success"]:
            return {
                "success": True,
                "query": query,
                "mode": mode,
                "response": result["response"],
                "source": result["api"],
                # Only Groq results carry "model"; default documents the
                # OpenRouter model used above.
                "model": result.get("model", "mistral-7b-instruct"),
                "tokens": result.get("tokens", 0),
                "time": elapsed,
                "timestamp": datetime.now().isoformat()
            }
        else:
            error_msg = result["error"] if result else "Unknown error"
            return {
                "success": False,
                "query": query,
                "mode": mode,
                "error": error_msg,
                "time": elapsed,
                "timestamp": datetime.now().isoformat()
            }


async def main():
    """CLI entry point: parse args, run the orchestrator, print and save."""
    if len(sys.argv) < 2:
        print("Usage: python ai_orchestrator_final.py \"Your query\"")
        print("       python ai_orchestrator_final.py --mode [auto|cloud|local] \"Query\"")
        sys.exit(1)

    # Parse arguments
    mode = "auto"
    query_start = 1

    if sys.argv[1] == "--mode" and len(sys.argv) > 3:
        mode = sys.argv[2]
        query_start = 3

    query = " ".join(sys.argv[query_start:])

    orchestrator = FinalAIOrchestrator()
    result = await orchestrator.orchestrate(query, mode)

    print("\n" + "="*60)
    if result["success"]:
        print("✅ SUCCESS")
        print("="*60)
        print(f"Source: {result['source']}")
        print(f"Time: {result['time']:.1f}s")

        print("\n🎯 ANSWER:")
        print("="*60)
        print(result['response'])
        print("="*60)

        # Save result
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"ai_result_{timestamp}.json"
        with open(filename, "w") as f:
            json.dump(result, f, indent=2)
        # BUGFIX: previously printed a literal placeholder instead of the
        # actual path the JSON was written to.
        print(f"\n💾 Saved to: {filename}")
    else:
        print("❌ FAILED")
        print("="*60)
        print(f"Error: {result['error']}")
        print(f"Mode: {result['mode']}")
        print(f"Time: {result['time']:.1f}s")


if __name__ == "__main__":
    asyncio.run(main())