# hybrid_orchestrator.py
  1  #!/usr/bin/env python3
  2  """
  3  HYBRID AI ORCHESTRATOR - Cloud first, local fallback
  4  """
  5  
  6  import os
  7  import sys
  8  import asyncio
  9  import aiohttp
 10  import json
 11  import subprocess
 12  import time
 13  from datetime import datetime
 14  
 15  class HybridOrchestrator:
 16      def __init__(self):
 17          # Set API keys from environment
 18          self.openrouter_key = os.getenv('OPENROUTER_API_KEY')
 19          self.groq_key = os.getenv('GROQ_API_KEY')
 20          
 21          self.cloud_apis = []
 22          if self.openrouter_key:
 23              self.cloud_apis.append('openrouter')
 24          if self.groq_key:
 25              self.cloud_apis.append('groq')
 26      
 27      async def call_openrouter(self, query: str) -> dict:
 28          """Call OpenRouter API"""
 29          url = "https://openrouter.ai/api/v1/chat/completions"
 30          headers = {
 31              "Authorization": f"Bearer {self.openrouter_key}",
 32              "Content-Type": "application/json"
 33          }
 34          data = {
 35              "model": "mistralai/mistral-7b-instruct:free",
 36              "messages": [{"role": "user", "content": query}],
 37              "max_tokens": 500
 38          }
 39          
 40          try:
 41              timeout = aiohttp.ClientTimeout(total=15)
 42              async with aiohttp.ClientSession(timeout=timeout) as session:
 43                  async with session.post(url, headers=headers, json=data) as response:
 44                      if response.status == 200:
 45                          result = await response.json()
 46                          return {
 47                              "success": True,
 48                              "api": "openrouter",
 49                              "response": result['choices'][0]['message']['content'],
 50                              "tokens": result.get('usage', {}).get('total_tokens', 0)
 51                          }
 52                      else:
 53                          return {"success": False, "error": f"HTTP {response.status}"}
 54          except Exception as e:
 55              return {"success": False, "error": str(e)}
 56      
 57      async def call_groq(self, query: str) -> dict:
 58          """Call Groq API (with updated model)"""
 59          url = "https://api.groq.com/openai/v1/chat/completions"
 60          headers = {
 61              "Authorization": f"Bearer {self.groq_key}",
 62              "Content-Type": "application/json"
 63          }
 64          # Try updated models based on deprecation info
 65          models_to_try = ["llama-3.3-70b-versatile", "llama-3.2-11b-vision-preview", "gemma2-9b-it"]
 66          
 67          for model in models_to_try:
 68              data = {
 69                  "model": model,
 70                  "messages": [{"role": "user", "content": query}],
 71                  "max_tokens": 500,
 72                  "temperature": 0.7
 73              }
 74              
 75              try:
 76                  timeout = aiohttp.ClientTimeout(total=15)
 77                  async with aiohttp.ClientSession(timeout=timeout) as session:
 78                      async with session.post(url, headers=headers, json=data) as response:
 79                          if response.status == 200:
 80                              result = await response.json()
 81                              return {
 82                                  "success": True,
 83                                  "api": "groq",
 84                                  "model": model,
 85                                  "response": result['choices'][0]['message']['content'],
 86                                  "tokens": result.get('usage', {}).get('total_tokens', 0)
 87                              }
 88                          elif response.status == 400:
 89                              # Try next model
 90                              continue
 91              except:
 92                  continue
 93          
 94          return {"success": False, "error": "All models failed"}
 95      
 96      def call_local(self, query: str, model: str = "tinyllama") -> dict:
 97          """Call local model"""
 98          try:
 99              cmd = ["python3", "ultimate_orchestrator.py", model, query]
100              result = subprocess.run(
101                  cmd,
102                  capture_output=True,
103                  text=True,
104                  timeout=120
105              )
106              
107              if result.returncode == 0:
108                  # Extract answer
109                  lines = result.stdout.split('\n')
110                  answer = ""
111                  in_answer = False
112                  
113                  for line in lines:
114                      if "FINAL ANSWER" in line or "šŸŽÆ FINAL ANSWER" in line:
115                          in_answer = True
116                          continue
117                      if in_answer and line.strip() and "====" not in line:
118                          answer += line.strip() + "\n"
119                      if in_answer and "====" in line and answer:
120                          break
121                  
122                  if not answer:
123                      # Fallback: get last meaningful content
124                      for line in reversed(lines):
125                          if line.strip() and len(line.strip()) > 20 and "====" not in line:
126                              answer = line.strip()
127                              break
128                  
129                  return {
130                      "success": True,
131                      "api": "local",
132                      "model": model,
133                      "response": answer.strip(),
134                      "tokens": 0
135                  }
136              else:
137                  return {"success": False, "error": f"Exit code {result.returncode}"}
138          except subprocess.TimeoutExpired:
139              return {"success": False, "error": "Timeout"}
140          except Exception as e:
141              return {"success": False, "error": str(e)}
142      
143      async def orchestrate(self, query: str, force_local: bool = False) -> dict:
144          """Main orchestration method"""
145          print(f"\nšŸ¤– HYBRID ORCHESTRATOR")
146          print("="*60)
147          print(f"Query: {query[:80]}...")
148          print("="*60)
149          
150          start_time = time.time()
151          results = []
152          
153          if force_local:
154              print("\nšŸ”§ Forced local mode")
155              # Try local models in order of quality
156              for model in ["mistral", "llama2", "tinyllama"]:
157                  print(f"  Trying {model}...")
158                  result = self.call_local(query, model)
159                  if result["success"]:
160                      results.append(result)
161                      print(f"  āœ… {model} succeeded")
162                      break
163                  else:
164                      print(f"  āŒ {model} failed: {result['error'][:30]}")
165          else:
166              print("\nā˜ļø  Trying cloud APIs...")
167              # Try cloud APIs in parallel
168              cloud_tasks = []
169              if 'openrouter' in self.cloud_apis:
170                  cloud_tasks.append(self.call_openrouter(query))
171              if 'groq' in self.cloud_apis:
172                  cloud_tasks.append(self.call_groq(query))
173              
174              if cloud_tasks:
175                  cloud_results = await asyncio.gather(*cloud_tasks)
176                  cloud_success = False
177                  
178                  for result in cloud_results:
179                      if result["success"]:
180                          results.append(result)
181                          cloud_success = True
182                          print(f"  āœ… {result['api']} succeeded")
183                  
184                  if not cloud_success:
185                      print("  āš ļø  All cloud APIs failed, falling back to local...")
186                      # Fallback to local
187                      for model in ["tinyllama", "llama2", "mistral"]:
188                          print(f"    Trying {model}...")
189                          result = self.call_local(query, model)
190                          if result["success"]:
191                              results.append(result)
192                              print(f"    āœ… {model} succeeded")
193                              break
194              else:
195                  print("  āš ļø  No cloud APIs configured, using local...")
196                  result = self.call_local(query, "tinyllama")
197                  if result["success"]:
198                      results.append(result)
199          
200          # Process results
201          elapsed = time.time() - start_time
202          
203          if results:
204              best_result = results[0]  # First successful result
205              return {
206                  "success": True,
207                  "query": query,
208                  "response": best_result["response"],
209                  "source": best_result["api"],
210                  "model": best_result.get("model", ""),
211                  "tokens": best_result.get("tokens", 0),
212                  "time": elapsed,
213                  "timestamp": datetime.now().isoformat()
214              }
215          else:
216              return {
217                  "success": False,
218                  "query": query,
219                  "error": "All orchestration methods failed",
220                  "time": elapsed,
221                  "timestamp": datetime.now().isoformat()
222              }
223  
async def main():
    """CLI entry point: parse argv, run the orchestrator, print and save the result.

    Usage: hybrid_orchestrator.py [--local] "query words...". On success the
    full result dict is written to a timestamped JSON file in the current
    directory. Exits with status 1 when no query is given.
    """
    if len(sys.argv) < 2:
        print("Usage: python hybrid_orchestrator.py \"Your query\"")
        print("       python hybrid_orchestrator.py --local \"Query\"")
        sys.exit(1)

    # Optional leading --local flag forces the local-model-only path.
    force_local = False
    query_start = 1
    if sys.argv[1] == "--local":
        force_local = True
        query_start = 2

    # Everything after the flag(s) is joined into one query string.
    query = " ".join(sys.argv[query_start:])

    orchestrator = HybridOrchestrator()
    result = await orchestrator.orchestrate(query, force_local)

    print("\n" + "="*60)
    if result["success"]:
        print("āœ… ORCHESTRATION SUCCESSFUL")
        print("="*60)
        print(f"Source: {result['source']}")
        if result.get('model'):
            print(f"Model: {result['model']}")
        print(f"Time: {result['time']:.1f}s")
        if result.get('tokens', 0) > 0:
            print(f"Tokens: {result['tokens']}")

        print("\nšŸŽÆ ANSWER:")
        print("="*60)
        print(result['response'])
        print("="*60)

        # Persist the full result for later inspection.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"hybrid_result_{timestamp}.json"
        with open(filename, "w") as f:
            json.dump(result, f, indent=2)
        # BUG FIX: previously printed a literal placeholder instead of the
        # actual path the JSON was saved to.
        print(f"\nšŸ’¾ Saved to: {filename}")
    else:
        print("āŒ ORCHESTRATION FAILED")
        print("="*60)
        print(f"Error: {result['error']}")

if __name__ == "__main__":
    asyncio.run(main())