# ultimate_ai_system.py
#!/usr/bin/env python3
"""
ULTIMATE AI SYSTEM
Combines: Cloud APIs + Local models + Response synthesis + Task breakdown
"""

import os
import sys
import asyncio
import json
import time
import subprocess
from datetime import datetime
from typing import List, Dict, Optional, Tuple
import re

try:
    import aiohttp
except ImportError:
    # aiohttp is only needed for cloud calls. Without it CloudProcessor
    # reports every subtask as failed, so local processing keeps working.
    aiohttp = None

# ==================== CONFIGURATION ====================
# API keys are read from the environment so that no credential lives in the
# source tree. Child processes inherit the environment on their own, so
# nothing has to be written back into os.environ for them.
API_KEYS = {
    name: os.environ.get(name, "")
    for name in ("OPENROUTER_API_KEY", "GROQ_API_KEY")
}


# ==================== TASK ANALYZER ====================
class TaskAnalyzer:
    """Sizes a query by word count and splits larger queries into subtasks."""

    @staticmethod
    def analyze_complexity(query: str) -> Tuple[int, List[str]]:
        """Analyze query complexity and break it into subtasks if needed.

        Args:
            query: The full user query.

        Returns:
            A ``(complexity, subtasks)`` tuple.

            * Under 50 words: ``(1, [query])``.
            * 50-149 words: the query is split into sentences and regrouped
              into at most 3 subtasks; the complexity is the number of
              groups. If fewer than 3 usable sentences are found the result
              is ``(2, [query])``.
            * 150 words or more: the query is split into at most 4 parts, by
              blank-line-separated paragraphs when there are several,
              otherwise on the words "and", "or" and "but"; the complexity
              is the number of parts.

            Text beyond the subtask limit is folded into the last subtask
            instead of being discarded.
        """
        word_count = len(query.split())

        # Simple: under 50 words, 1 subtask
        if word_count < 50:
            return 1, [query]

        # Medium: 50-150 words, break into 2-3 parts
        if word_count < 150:
            fragments = (s.strip() for s in re.split(r'[.!?]+', query))
            # Fragments of 10 characters or fewer are treated as noise.
            sentences = [s for s in fragments if len(s) > 10]

            if len(sentences) <= 2:
                return 2, [query]

            target_length = len(query) // min(3, len(sentences))
            groups = TaskAnalyzer._group_sentences(sentences, target_length, 3)
            return len(groups), groups

        # Complex: over 150 words, break into 3-4 parts
        paragraphs = [p.strip() for p in query.split('\n\n') if p.strip()]
        if len(paragraphs) > 1:
            parts = TaskAnalyzer._merge_overflow(paragraphs, 4, '\n\n')
        else:
            # maxsplit=3 leaves everything after the third split point in
            # the fourth part, so no clause is lost.
            parts = re.split(
                r'\s+(?:and|or|but)\s+', query, maxsplit=3, flags=re.IGNORECASE
            )
        return len(parts), parts

    @staticmethod
    def _group_sentences(
        sentences: List[str], target_length: int, max_groups: int
    ) -> List[str]:
        """Pack sentences, in order, into at most *max_groups* groups."""
        groups: List[str] = []
        current = ""

        for sentence in sentences:
            fits = len(current) + len(sentence) < target_length
            # Once only one slot remains, every remaining sentence goes into
            # it; otherwise a trailing group would be built and then dropped.
            last_slot = len(groups) >= max_groups - 1
            if fits or last_slot or not current:
                current += sentence + ". "
            else:
                groups.append(current.strip())
                current = sentence + ". "

        if current:
            groups.append(current.strip())

        return groups

    @staticmethod
    def _merge_overflow(parts: List[str], limit: int, separator: str) -> List[str]:
        """Return at most *limit* parts, joining the surplus into the last."""
        if len(parts) <= limit:
            return list(parts)
        return parts[:limit - 1] + [separator.join(parts[limit - 1:])]


# ==================== CLOUD PROCESSOR ====================
class CloudProcessor:
    """Sends subtasks to the chat-completion APIs listed in ``self.apis``."""

    def __init__(self):
        # Per-API endpoint plus the models to try, in order.
        self.apis = {
            "openrouter": {
                "url": "https://openrouter.ai/api/v1/chat/completions",
                "models": ["mistralai/mistral-7b-instruct:free", "google/gemma-7b-it:free"]
            }
        }

    async def process_subtask(self, subtask: str, api_priority: List[str] = None) -> Optional[Dict]:
        """Process a subtask using cloud APIs.

        Each API in *api_priority* is tried in order, and within an API each
        configured model in order; the first non-empty completion wins.

        Args:
            subtask: Prompt text sent as a single user message.
            api_priority: Names of entries in ``self.apis`` to try. Defaults
                to ``["openrouter"]``. Unknown names and APIs without a
                configured key are skipped.

        Returns:
            A dict with the keys ``api``, ``model``, ``content`` and
            ``tokens``, or ``None`` when no attempt succeeded or aiohttp is
            not installed.
        """
        if aiohttp is None:
            return None

        if api_priority is None:
            api_priority = ["openrouter"]  # Default to OpenRouter since it works

        # One session is shared by all attempts; the 20 s budget still
        # applies to each request individually.
        timeout = aiohttp.ClientTimeout(total=20)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            for api_name in api_priority:
                config = self.apis.get(api_name)
                api_key = API_KEYS.get(f"{api_name.upper()}_API_KEY", "")
                if config is None or not api_key:
                    continue

                headers = {
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                }

                for model in config["models"]:
                    data = {
                        "model": model,
                        "messages": [{"role": "user", "content": subtask}],
                        "max_tokens": 400,
                        "temperature": 0.7
                    }

                    try:
                        async with session.post(config["url"], headers=headers, json=data) as response:
                            if response.status != 200:
                                continue
                            result = await response.json()
                        content = result["choices"][0]["message"]["content"]
                        tokens = (result.get("usage") or {}).get("total_tokens", 0)
                    except (
                        aiohttp.ClientError,
                        asyncio.TimeoutError,
                        AttributeError,
                        KeyError,
                        IndexError,
                        TypeError,
                        ValueError,
                    ):
                        # Transport failure or malformed payload: try the
                        # next model. Cancellation is not caught here.
                        continue

                    if not content:
                        continue

                    return {
                        "api": api_name,
                        "model": model,
                        "content": content,
                        "tokens": tokens
                    }

        return None


# ==================== LOCAL PROCESSOR ====================
class LocalProcessor:
    """Runs a query through a local model via ``ultimate_orchestrator.py``."""

    @staticmethod
    def process_with_model(query: str, model: str = "tinyllama") -> Optional[str]:
        """Process query using local model.

        Runs ``python3 ultimate_orchestrator.py <model> <query>`` with a
        180 second timeout and extracts the answer from its standard output.

        Args:
            query: Prompt passed to the helper script.
            model: Model name passed to the helper script.

        Returns:
            The extracted answer text, or ``None`` when the script cannot be
            started, times out, exits with a non-zero status, or prints
            nothing usable.
        """
        cmd = ["python3", "ultimate_orchestrator.py", model, query]
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=180
            )
        except (subprocess.SubprocessError, OSError, ValueError):
            # Timeout, missing interpreter/script, or undecodable output:
            # this model is simply reported as unavailable.
            return None

        if result.returncode != 0:
            return None
        return LocalProcessor._extract_answer(result.stdout)

    @staticmethod
    def _extract_answer(stdout: str) -> Optional[str]:
        """Pull the answer text out of the helper script's output."""
        lines = stdout.split('\n')

        for i, line in enumerate(lines):
            if "FINAL ANSWER" not in line:
                continue
            # Collect the first run of content lines after the marker,
            # looking at most 19 lines ahead and skipping "====" rulers.
            answer_lines = []
            for candidate in lines[i + 1:i + 20]:
                if candidate.strip() and "====" not in candidate:
                    answer_lines.append(candidate.strip())
                elif answer_lines:
                    break
            if answer_lines:
                return '\n'.join(answer_lines)

        # Fallback: last meaningful line
        for line in reversed(lines):
            stripped = line.strip()
            if len(stripped) > 20 and "====" not in line:
                return stripped
        return None


def _ellipsize(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, adding "..." only when cut."""
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


# ==================== RESPONSE SYNTHESIZER ====================
class ResponseSynthesizer:
    """Merges per-subtask responses into one Markdown answer."""

    @staticmethod
    def synthesize(responses: List[Dict], original_query: str) -> str:
        """Synthesize multiple responses into a coherent answer.

        Args:
            responses: Response dicts; each must have a ``content`` string
                and may have ``api`` and ``model``.
            original_query: The query the responses answer; shown (cut to
                100 characters) in the heading.

        Returns:
            A fixed apology string when *responses* is empty, the sole
            response's content when there is exactly one, and otherwise a
            Markdown document with one section per response (content cut to
            500 characters) followed by a summary taken from the first
            response.
        """
        if not responses:
            return "Unable to generate a response. Please try again or rephrase your query."

        if len(responses) == 1:
            return responses[0]["content"]

        # Multiple responses - create a comprehensive answer
        pieces = [
            f"# Answer to: {_ellipsize(original_query, 100)}\n\n",
            f"*Generated from {len(responses)} sources*\n\n",
        ]

        # Add each response as a section
        for i, resp in enumerate(responses, 1):
            source = f"{resp.get('api', 'unknown').upper()}"
            if resp.get('model'):
                source += f" ({resp['model']})"

            pieces.append(f"## Source {i}: {source}\n\n")
            pieces.append(f"{_ellipsize(resp['content'], 500)}\n\n")

        # Add executive summary: the first three sentences of the first
        # response, or its first 300 characters when it is shorter than that.
        pieces.append("## Executive Summary\n\n")
        first_response = responses[0]["content"]
        sentences = first_response.split('. ')
        if len(sentences) > 3:
            pieces.append('. '.join(sentences[:3]) + '.')
        else:
            pieces.append(first_response[:300])

        return "".join(pieces)


# ==================== MAIN ORCHESTRATOR ====================
class UltimateAIOrchestrator:
    """Analyzes a query, routes its subtasks to cloud/local models, and
    synthesizes the responses."""

    # Model order when a local model backs up a failed cloud call.
    _FALLBACK_MODELS = ("tinyllama", "llama2", "mistral")
    # Model order for the local-only strategy: try best first.
    _PREFERRED_MODELS = ("mistral", "llama2", "tinyllama")

    def __init__(self):
        self.task_analyzer = TaskAnalyzer()
        self.cloud_processor = CloudProcessor()
        self.local_processor = LocalProcessor()
        self.synthesizer = ResponseSynthesizer()

    async def _run_local(self, subtask: str, models) -> Optional[Dict]:
        """Try *models* in order and return the first local response dict."""
        loop = asyncio.get_running_loop()
        for model in models:
            # process_with_model blocks on a subprocess, so it runs in the
            # default executor instead of stalling the event loop.
            answer = await loop.run_in_executor(
                None, self.local_processor.process_with_model, subtask, model
            )
            if answer:
                return {
                    "api": "local",
                    "model": model,
                    "content": answer,
                    "tokens": 0
                }
        return None

    async def orchestrate(self, query: str, strategy: str = "auto") -> Dict:
        """Main orchestration method.

        Args:
            query: The user query.
            strategy: ``"cloud"`` (cloud APIs only), ``"local"`` (local
                models only) or ``"auto"``. Any other value behaves like
                ``"auto"``: a single subtask of complexity 2 or less goes to
                the cloud, anything else uses the cloud with a local
                fallback per subtask.

        Returns:
            A dict with the keys ``query``, ``strategy``, ``complexity``,
            ``subtasks``, ``responses``, ``final_answer``, ``apis_used``,
            ``models_used``, ``response_count``, ``time_elapsed`` (seconds)
            and ``timestamp`` (ISO 8601). Progress is printed to stdout.
        """
        print(f"\n🤖 ULTIMATE AI ORCHESTRATION")
        print("="*60)
        print(f"Query: {_ellipsize(query, 80)}")
        print(f"Strategy: {strategy}")
        print("="*60)

        start_time = time.time()

        # Step 1: Analyze task
        print("\n🔍 Step 1: Analyzing task complexity...")
        complexity, subtasks = self.task_analyzer.analyze_complexity(query)
        print(f"   Complexity: {complexity}/4")
        print(f"   Subtasks: {len(subtasks)}")

        # Step 2: Determine processing strategy
        if strategy in ("cloud", "local"):
            processor = strategy
        elif complexity <= 2 and len(subtasks) == 1:
            processor = "cloud"  # Simple tasks to cloud
        else:
            processor = "hybrid"  # Complex tasks use both

        print(f"\n📊 Step 2: Selected processor: {processor}")

        # Step 3: Process subtasks
        print(f"\n⚡ Step 3: Processing {len(subtasks)} subtask(s)...")

        responses = []

        if processor in ("cloud", "hybrid"):
            # Cloud calls for all subtasks run concurrently.
            cloud_results = await asyncio.gather(
                *(self.cloud_processor.process_subtask(s) for s in subtasks)
            )

            for number, (subtask, cloud_result) in enumerate(zip(subtasks, cloud_results), 1):
                if cloud_result:
                    print(f"   ✅ Subtask {number}: Cloud success")
                    responses.append(cloud_result)
                    continue

                if processor != "hybrid":
                    print(f"   ⚠️ Subtask {number}: Cloud failed")
                    continue

                # Fallback to local
                print(f"   ⚠️ Subtask {number}: Cloud failed, trying local...")
                local_result = await self._run_local(subtask, self._FALLBACK_MODELS)
                if local_result:
                    responses.append(local_result)
                    print(f"   ✅ Subtask {number}: Local success ({local_result['model']})")
                else:
                    print(f"   ⚠️ Subtask {number}: Local failed")
        else:
            # Local only
            for number, subtask in enumerate(subtasks, 1):
                local_result = await self._run_local(subtask, self._PREFERRED_MODELS)
                if local_result:
                    responses.append(local_result)
                    print(f"   ✅ Subtask {number}: Local success ({local_result['model']})")
                else:
                    print(f"   ⚠️ Subtask {number}: Local failed")

        # Step 4: Synthesize
        print("\n🧩 Step 4: Synthesizing final answer...")
        final_answer = self.synthesizer.synthesize(responses, query)

        elapsed = time.time() - start_time

        # Compile results (sorted so the output is stable between runs)
        return {
            "query": query,
            "strategy": strategy,
            "complexity": complexity,
            "subtasks": subtasks,
            "responses": responses,
            "final_answer": final_answer,
            "apis_used": sorted({r["api"] for r in responses}),
            "models_used": sorted({r.get("model", "unknown") for r in responses}),
            "response_count": len(responses),
            "time_elapsed": elapsed,
            "timestamp": datetime.now().isoformat()
        }


# ==================== MAIN ====================
_STRATEGIES = ("auto", "cloud", "local")


def _print_usage() -> None:
    """Print the command-line synopsis."""
    print("Usage: python ultimate_ai_system.py \"Your query\"")
    print("       python ultimate_ai_system.py --strategy [auto|cloud|local] \"Query\"")


async def main():
    """Command-line entry point.

    Parses ``sys.argv``, runs the orchestration, prints a report, and saves
    the result as ``ultimate_result_<timestamp>.json`` and ``.txt`` in the
    current directory. Exits with status 1 on invalid arguments.
    """
    args = sys.argv[1:]
    if not args:
        _print_usage()
        sys.exit(1)

    # Parse arguments
    strategy = "auto"
    if args[0] == "--strategy":
        if len(args) < 3:
            # Strategy value or query missing.
            _print_usage()
            sys.exit(1)
        if args[1] not in _STRATEGIES:
            print(f"Unknown strategy: {args[1]}")
            _print_usage()
            sys.exit(1)
        strategy = args[1]
        args = args[2:]
    elif args[0].startswith("--"):
        print(f"Unknown option: {args[0]}")
        sys.exit(1)

    query = " ".join(args).strip()
    if not query:
        _print_usage()
        sys.exit(1)

    # Run orchestration
    orchestrator = UltimateAIOrchestrator()
    result = await orchestrator.orchestrate(query, strategy)
    apis_used = ', '.join(result['apis_used'])

    # Display results
    print("\n" + "="*60)
    print("✅ ORCHESTRATION COMPLETE")
    print("="*60)
    print(f"Query: {_ellipsize(result['query'], 80)}")
    print(f"Strategy: {result['strategy']}")
    print(f"Complexity: {result['complexity']}/4")
    print(f"Subtasks: {len(result['subtasks'])}")
    print(f"Responses: {result['response_count']}")
    print(f"APIs: {apis_used if apis_used else 'None'}")
    print(f"Time: {result['time_elapsed']:.1f}s")

    print("\n🎯 FINAL ANSWER:")
    print("="*60)
    print(result['final_answer'])
    print("="*60)

    # Save results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"ultimate_result_{timestamp}"

    # JSON. The encoding is explicit so that non-ASCII answers are written
    # identically on every platform.
    with open(f"{filename}.json", "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)

    # Text
    with open(f"{filename}.txt", "w", encoding="utf-8") as f:
        f.write(f"QUERY: {result['query']}\n")
        f.write(f"TIME: {result['time_elapsed']:.1f}s\n")
        f.write(f"STRATEGY: {result['strategy']}\n")
        f.write(f"COMPLEXITY: {result['complexity']}/4\n")
        f.write(f"RESPONSES: {result['response_count']}\n")
        f.write(f"APIS: {apis_used}\n")
        f.write("\n" + "="*60 + "\n")
        f.write("ANSWER:\n")
        f.write("="*60 + "\n")
        f.write(result['final_answer'])

    print(f"\n💾 Results saved:")
    print(f"   📄 JSON: {filename}.json")
    print(f"   📝 Text: {filename}.txt")
    print("="*60)


if __name__ == "__main__":
    asyncio.run(main())