# hybrid_orchestrator_v2.py
  1  #!/usr/bin/env python3
  2  """
  3  IMPROVED HYBRID ORCHESTRATOR
  4  Better response cleaning and error handling
  5  """
  6  
  7  import os
  8  import sys
  9  import asyncio
 10  import aiohttp
 11  import json
 12  import subprocess
 13  import time
 14  import re
 15  from datetime import datetime
 16  
 17  def clean_ai_response(text: str) -> str:
 18      """Clean and normalize AI responses"""
 19      if not text or not isinstance(text, str):
 20          return "No valid response generated."
 21      
 22      # Remove common AI formatting tags
 23      tags_to_remove = [
 24          r'<s>', r'</s>', r'\[OUT\]', r'\[/OUT\]', r'\[/s\]',
 25          r'<\|.*?\|>', r'\[.*?\]', r'\(.*?\)'
 26      ]
 27      
 28      cleaned = text
 29      for pattern in tags_to_remove:
 30          cleaned = re.sub(pattern, '', cleaned)
 31      
 32      # Remove extra whitespace
 33      cleaned = re.sub(r'\s+', ' ', cleaned).strip()
 34      
 35      # Capitalize first letter if needed
 36      if cleaned and cleaned[0].islower():
 37          cleaned = cleaned[0].upper() + cleaned[1:]
 38      
 39      # Ensure it ends with proper punctuation
 40      if cleaned and cleaned[-1] not in '.!?':
 41          cleaned += '.'
 42      
 43      return cleaned if len(cleaned) > 10 else "Response was too brief. Please try rephrasing your question."
 44  
 45  class ImprovedHybridOrchestrator:
 46      def __init__(self):
 47          self.openrouter_key = os.getenv('OPENROUTER_API_KEY')
 48          self.groq_key = os.getenv('GROQ_API_KEY')
 49          
 50          # Updated Groq models based on deprecation info
 51          self.groq_models = [
 52              "llama-3.3-70b-versatile",
 53              "llama-3.2-90b-vision-preview",
 54              "gemma2-9b-it",
 55              "mixtral-8x7b-32768"  # Keep as last fallback
 56          ]
 57      
 58      async def call_openrouter(self, query: str) -> dict:
 59          """Call OpenRouter with better prompting"""
 60          url = "https://openrouter.ai/api/v1/chat/completions"
 61          headers = {
 62              "Authorization": f"Bearer {self.openrouter_key}",
 63              "Content-Type": "application/json",
 64              "HTTP-Referer": "https://github.com/your-repo",  # Optional but good practice
 65              "X-Title": "AI Orchestrator"
 66          }
 67          
 68          # Better system prompt for cleaner responses
 69          messages = [
 70              {"role": "system", "content": "You are a helpful AI assistant. Provide clear, concise answers without markdown formatting, XML tags, or special tokens. Just give the plain text answer."},
 71              {"role": "user", "content": query}
 72          ]
 73          
 74          data = {
 75              "model": "mistralai/mistral-7b-instruct:free",
 76              "messages": messages,
 77              "max_tokens": 300,
 78              "temperature": 0.7
 79          }
 80          
 81          try:
 82              timeout = aiohttp.ClientTimeout(total=15)
 83              async with aiohttp.ClientSession(timeout=timeout) as session:
 84                  async with session.post(url, headers=headers, json=data) as response:
 85                      if response.status == 200:
 86                          result = await response.json()
 87                          raw_text = result['choices'][0]['message']['content']
 88                          cleaned_text = clean_ai_response(raw_text)
 89                          
 90                          return {
 91                              "success": True,
 92                              "api": "openrouter",
 93                              "raw_response": raw_text,
 94                              "response": cleaned_text,
 95                              "tokens": result.get('usage', {}).get('total_tokens', 0)
 96                          }
 97                      else:
 98                          error_text = await response.text()
 99                          return {"success": False, "error": f"HTTP {response.status}: {error_text[:100]}"}
100          except Exception as e:
101              return {"success": False, "error": str(e)[:100]}
102      
103      async def call_groq(self, query: str) -> dict:
104          """Call Groq API with updated models"""
105          url = "https://api.groq.com/openai/v1/chat/completions"
106          headers = {
107              "Authorization": f"Bearer {self.groq_key}",
108              "Content-Type": "application/json"
109          }
110          
111          for model in self.groq_models:
112              messages = [
113                  {"role": "system", "content": "Provide clear, direct answers without special formatting or tokens."},
114                  {"role": "user", "content": query}
115              ]
116              
117              data = {
118                  "model": model,
119                  "messages": messages,
120                  "max_tokens": 300,
121                  "temperature": 0.7
122              }
123              
124              try:
125                  timeout = aiohttp.ClientTimeout(total=15)
126                  async with aiohttp.ClientSession(timeout=timeout) as session:
127                      async with session.post(url, headers=headers, json=data) as response:
128                          if response.status == 200:
129                              result = await response.json()
130                              raw_text = result['choices'][0]['message']['content']
131                              cleaned_text = clean_ai_response(raw_text)
132                              
133                              return {
134                                  "success": True,
135                                  "api": "groq",
136                                  "model": model,
137                                  "raw_response": raw_text,
138                                  "response": cleaned_text,
139                                  "tokens": result.get('usage', {}).get('total_tokens', 0)
140                              }
141                          elif response.status == 400:
142                              # Try next model
143                              continue
144              except:
145                  continue
146          
147          return {"success": False, "error": "All Groq models failed"}
148      
149      def call_local(self, query: str, model: str = "tinyllama") -> dict:
150          """Call local model with improved parsing"""
151          try:
152              cmd = ["python3", "ultimate_orchestrator.py", model, query]
153              result = subprocess.run(
154                  cmd,
155                  capture_output=True,
156                  text=True,
157                  timeout=180
158              )
159              
160              if result.returncode == 0:
161                  # Improved answer extraction
162                  lines = result.stdout.split('\n')
163                  answer_lines = []
164                  capture = False
165                  
166                  for line in lines:
167                      if "FINAL ANSWER" in line or "šŸŽÆ FINAL ANSWER" in line:
168                          capture = True
169                          continue
170                      if capture and ("====" in line or "ORCHESTRATION COMPLETE" in line):
171                          break
172                      if capture and line.strip():
173                          answer_lines.append(line.strip())
174                  
175                  raw_text = ' '.join(answer_lines) if answer_lines else result.stdout[-500:]
176                  cleaned_text = clean_ai_response(raw_text)
177                  
178                  return {
179                      "success": True,
180                      "api": "local",
181                      "model": model,
182                      "raw_response": raw_text,
183                      "response": cleaned_text,
184                      "tokens": 0
185                  }
186              else:
187                  return {"success": False, "error": f"Exit code {result.returncode}"}
188          except subprocess.TimeoutExpired:
189              return {"success": False, "error": "Timeout (3 minutes)"}
190          except Exception as e:
191              return {"success": False, "error": str(e)}
192      
193      async def orchestrate(self, query: str, strategy: str = "auto") -> dict:
194          """Improved orchestration with strategy selection"""
195          print(f"\nšŸ¤– IMPROVED HYBRID ORCHESTRATOR")
196          print("="*60)
197          print(f"Query: {query[:80]}...")
198          print(f"Strategy: {strategy}")
199          print("="*60)
200          
201          start_time = time.time()
202          results = []
203          
204          if strategy == "local":
205              print("\nšŸ”§ Local-only strategy")
206              for model in ["mistral", "llama2", "tinyllama"]:
207                  print(f"  Trying {model}...")
208                  result = self.call_local(query, model)
209                  if result["success"]:
210                      results.append(result)
211                      print(f"  āœ… {model}: Success")
212                      break
213                  else:
214                      print(f"  āŒ {model}: {result['error'][:30]}")
215          
216          elif strategy == "cloud":
217              print("\nā˜ļø  Cloud-only strategy")
218              cloud_tasks = []
219              cloud_tasks.append(self.call_openrouter(query))
220              cloud_tasks.append(self.call_groq(query))
221              
222              cloud_results = await asyncio.gather(*cloud_tasks)
223              
224              for result in cloud_results:
225                  if result["success"]:
226                      results.append(result)
227                      print(f"  āœ… {result['api']}: Success")
228              
229              if not results:
230                  print("  āš ļø  All cloud APIs failed")
231          
232          else:  # auto (hybrid)
233              print("\nšŸ¤ Auto strategy (Cloud first → Local fallback)")
234              
235              # Try cloud first
236              cloud_tasks = []
237              cloud_tasks.append(self.call_openrouter(query))
238              cloud_tasks.append(self.call_groq(query))
239              
240              cloud_results = await asyncio.gather(*cloud_tasks)
241              cloud_success = False
242              
243              for result in cloud_results:
244                  if result["success"]:
245                      results.append(result)
246                      cloud_success = True
247                      print(f"  ā˜ļø  {result['api']}: Success")
248              
249              if not cloud_success:
250                  print("  āš ļø  Cloud APIs failed, trying local...")
251                  for model in ["tinyllama", "llama2", "mistral"]:
252                      print(f"    šŸ¤– Trying {model}...")
253                      result = self.call_local(query, model)
254                      if result["success"]:
255                          results.append(result)
256                          print(f"    āœ… {model}: Success")
257                          break
258          
259          # Process results
260          elapsed = time.time() - start_time
261          
262          if results:
263              best_result = results[0]
264              return {
265                  "success": True,
266                  "query": query,
267                  "strategy": strategy,
268                  "response": best_result["response"],
269                  "raw_response": best_result.get("raw_response", ""),
270                  "source": best_result["api"],
271                  "model": best_result.get("model", ""),
272                  "tokens": best_result.get("tokens", 0),
273                  "time": elapsed,
274                  "responses_tried": len(results),
275                  "timestamp": datetime.now().isoformat()
276              }
277          else:
278              return {
279                  "success": False,
280                  "query": query,
281                  "strategy": strategy,
282                  "error": "All orchestration methods failed",
283                  "time": elapsed,
284                  "timestamp": datetime.now().isoformat()
285              }
286  
async def main():
    """CLI entry point: parse arguments, run the orchestrator, print the
    outcome, and persist successful results to a timestamped JSON file.

    Usage:
        python hybrid_orchestrator_v2.py "Your query"
        python hybrid_orchestrator_v2.py --strategy [auto|cloud|local] "Query"
    """
    if len(sys.argv) < 2:
        print("Usage: python hybrid_orchestrator_v2.py \"Your query\"")
        print("       python hybrid_orchestrator_v2.py --strategy [auto|cloud|local] \"Query\"")
        sys.exit(1)

    # Parse arguments: optional --strategy flag, then the query text
    # (which may span several argv entries and is re-joined with spaces).
    strategy = "auto"
    query_start = 1

    if sys.argv[1] == "--strategy" and len(sys.argv) > 3:
        strategy = sys.argv[2]
        query_start = 3

    query = " ".join(sys.argv[query_start:])

    orchestrator = ImprovedHybridOrchestrator()
    result = await orchestrator.orchestrate(query, strategy)

    print("\n" + "="*60)
    if result["success"]:
        print("āœ… ORCHESTRATION SUCCESSFUL")
        print("="*60)
        print(f"Strategy: {result['strategy']}")
        print(f"Source: {result['source']}")
        if result.get('model'):
            print(f"Model: {result['model']}")
        print(f"Time: {result['time']:.1f}s")
        print(f"Responses tried: {result['responses_tried']}")
        if result.get('tokens', 0) > 0:
            print(f"Tokens: {result['tokens']}")

        print("\nšŸŽÆ ANSWER:")
        print("="*60)
        print(result['response'])
        print("="*60)

        # Save both raw and cleaned responses for later inspection.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"improved_result_{timestamp}.json"
        with open(filename, "w") as f:
            json.dump(result, f, indent=2)
        # BUGFIX: previously printed a hard-coded placeholder string
        # instead of the actual output file name.
        print(f"\nšŸ’¾ Saved to: {filename}")
    else:
        print("āŒ ORCHESTRATION FAILED")
        print("="*60)
        print(f"Error: {result['error']}")
        print(f"Strategy: {result['strategy']}")
        print(f"Time: {result['time']:.1f}s")
# Script entry point: drive the async CLI via a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())