# final_ai_orchestrator.py
#!/usr/bin/env python3
"""
FINAL OPTIMIZED AI ORCHESTRATOR
Google Gemini + Groq

Masks common PII patterns in a user task, derives three sub-prompts from it,
sends them concurrently to Google Gemini and Groq, and combines the answers
into a text report plus a JSON dump.

API keys are read from the environment:
    GOOGLE_API_KEY  - Google Generative Language API key
    GROQ_API_KEY    - Groq API key
"""

import json
import urllib.request
import urllib.error
import asyncio
import os
import sys
import re
from datetime import datetime

# Seconds to wait for a provider before giving up on a single request.
_HTTP_TIMEOUT_SECONDS = 20

# Task-type keywords. They are anchored at the start of a word so that e.g.
# "vs" does not fire inside "canvas" and "define" does not fire inside
# "undefined"; suffixes ("explaining", "builds") are still accepted.
_EXPLANATORY_RE = re.compile(r"\b(?:explain|what is|describe|define)", re.IGNORECASE)
_INSTRUCTIONAL_RE = re.compile(r"\b(?:how to|create|build|make|write)", re.IGNORECASE)
_COMPARATIVE_RE = re.compile(r"\b(?:compare|difference|versus|vs\b)", re.IGNORECASE)


def _preview(text, limit):
    """Return *text* cut to *limit* characters, with "..." only if it was cut."""
    return text if len(text) <= limit else text[:limit] + "..."


# ========== API CONFIGURATION ==========
class APIConfig:
    """Static configuration shared by all provider calls."""

    # SECURITY: keys must never be committed to source control. Any key that
    # was previously embedded in this file should be treated as leaked and
    # revoked. Keys are read once, at import time, from the environment.
    GOOGLE_KEY = os.environ.get("GOOGLE_API_KEY", "")
    GROQ_KEY = os.environ.get("GROQ_API_KEY", "")

    # Model identifiers sent to each provider.
    GOOGLE_MODEL = "gemini-2.5-flash"
    GROQ_MODEL = "llama-3.3-70b-versatile"  # earlier note: llama3-8b-8192 is deprecated

    @staticmethod
    def validate_keys():
        """Check that both API keys are present and have the expected prefix.

        Returns:
            A list of human-readable problems; empty when both keys look fine.
            Only the prefix is checked - no request is made.
        """
        issues = []
        if not APIConfig.GOOGLE_KEY:
            issues.append("Google key missing (set GOOGLE_API_KEY)")
        elif not APIConfig.GOOGLE_KEY.startswith("AIza"):
            issues.append("Google key format incorrect")
        if not APIConfig.GROQ_KEY:
            issues.append("Groq key missing (set GROQ_API_KEY)")
        elif not APIConfig.GROQ_KEY.startswith("gsk_"):
            issues.append("Groq key format incorrect")
        return issues


class PrivacyEngine:
    """Regex-based masking of common PII patterns."""

    def __init__(self):
        # (pattern, replacement) pairs, applied in order and case-insensitively.
        self.masking_patterns = [
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]'),
            (r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '[IP_ADDRESS]'),
            (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]'),
            (r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]'),
            (r'\b[A-Z]{2,10}-\d{3,5}\b', '[ID_NUMBER]'),
        ]

    def mask_sensitive_data(self, text):
        """Replace every match of the masking patterns with its placeholder.

        Args:
            text: The text to sanitise. Empty or ``None`` is returned as-is.

        Returns:
            The text with emails, IPv4-looking addresses, phone numbers,
            SSN-shaped numbers and ``ABC-1234``-shaped identifiers replaced.
        """
        if not text:
            return text
        masked = text
        for pattern, replacement in self.masking_patterns:
            masked = re.sub(pattern, replacement, masked, flags=re.IGNORECASE)
        return masked

    def unmask_data(self, text, original_context=None):
        """Return *text* unchanged.

        No placeholder-to-value mapping is stored during masking, so nothing
        can be restored; ``original_context`` is accepted but unused.
        """
        return text


class AIOrchestrator:
    """Fan a task out to Gemini and Groq and merge the answers.

    Failed calls are reported as result dictionaries rather than raised.
    There is no automatic retry.
    """

    def __init__(self):
        self.privacy = PrivacyEngine()
        # Running counters; only mutated from the event-loop thread.
        self.stats = {
            "google_calls": 0,
            "groq_calls": 0,
            "successful_calls": 0,
            "failed_calls": 0,
        }

        print("=" * 60)
        print("🤖 FINAL AI ORCHESTRATOR v4.0")
        print("=" * 60)
        print("✅ APIs Configured:")
        print("   • Google Gemini 2.5 Flash (1M tokens free)")
        print("   • Groq Llama 3.3 70B (fast & free)")
        print("=" * 60)

        # Validate keys
        key_issues = APIConfig.validate_keys()
        if key_issues:
            print("⚠️  Key warnings:")
            for issue in key_issues:
                print(f"   • {issue}")

    # ------------------------------------------------------------------
    # Shared HTTP plumbing
    # ------------------------------------------------------------------
    @staticmethod
    def _post_json(url, payload, headers):
        """POST *payload* as JSON (blocking) and return the decoded JSON reply."""
        request_headers = {
            "Content-Type": "application/json",
            # Explicit client identifier instead of urllib's default one.
            "User-Agent": "final-ai-orchestrator/4.0",
        }
        request_headers.update(headers)
        request = urllib.request.Request(
            url,
            data=json.dumps(payload).encode("utf-8"),
            headers=request_headers,
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=_HTTP_TIMEOUT_SECONDS) as response:
            return json.loads(response.read().decode("utf-8"))

    @staticmethod
    def _describe_http_error(err):
        """Format an HTTPError, appending the API's own message when the body has one."""
        summary = f"HTTP {err.code}: {err.reason}"
        try:
            body = json.loads(err.read().decode("utf-8", errors="replace"))
            detail = body.get("error", {}).get("message", "")
        except (ValueError, AttributeError, OSError):
            # Body missing, not JSON, or not shaped like {"error": {"message": ...}}.
            detail = ""
        if isinstance(detail, str) and detail:
            return f"{summary} - {detail[:200]}"
        return summary

    def _failure(self, provider, message):
        """Count a failed call and build its result dictionary."""
        self.stats["failed_calls"] += 1
        return {"success": False, "provider": provider, "error": message}

    def _success(self, provider, model, text):
        """Count a successful call and build its result dictionary."""
        self.stats["successful_calls"] += 1
        return {
            "success": True,
            "provider": provider,
            "model": model,
            "response": text,
            # Whitespace-separated word count - an approximation, not the
            # provider's own token accounting.
            "tokens": len(text.split()),
        }

    async def _call_provider(self, provider, model, url, payload, headers, extract_text):
        """Send one request off the event loop and turn the outcome into a result dict."""
        loop = asyncio.get_running_loop()
        try:
            # urlopen blocks; running it in the default executor is what lets
            # asyncio.gather overlap several provider calls.
            reply = await loop.run_in_executor(None, self._post_json, url, payload, headers)
        except urllib.error.HTTPError as e:
            return self._failure(provider, self._describe_http_error(e))
        except Exception as e:
            return self._failure(provider, str(e)[:100])

        try:
            text = extract_text(reply)
        except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
            return self._failure(provider, str(e)[:100] or "Unknown error")
        return self._success(provider, model, text)

    @staticmethod
    def _extract_gemini_text(reply):
        """Pull the answer text out of a generateContent reply or raise ValueError."""
        candidates = reply.get("candidates")
        if not candidates:
            message = reply.get("error", {}).get("message")
            if not message:
                block_reason = reply.get("promptFeedback", {}).get("blockReason")
                message = f"Prompt blocked ({block_reason})" if block_reason else "Unknown error"
            raise ValueError(message)
        candidate = candidates[0]
        parts = candidate.get("content", {}).get("parts") or []
        text = "".join(part.get("text", "") for part in parts)
        if not text:
            # A candidate can come back without text parts; report the
            # finish reason instead of a bare KeyError.
            reason = candidate.get("finishReason", "unknown")
            raise ValueError(f"No text returned (finishReason: {reason})")
        return text

    @staticmethod
    def _extract_groq_text(reply):
        """Pull the answer text out of a chat-completions reply or raise ValueError."""
        choices = reply.get("choices")
        if not choices:
            raise ValueError(reply.get("error", {}).get("message", "Unknown error"))
        text = choices[0]["message"]["content"]
        if not isinstance(text, str):
            raise ValueError("No text returned")
        return text

    # ------------------------------------------------------------------
    # Providers
    # ------------------------------------------------------------------
    async def query_google_gemini(self, prompt, max_tokens=400):
        """Send *prompt* to Google Gemini.

        Args:
            prompt: Text to send.
            max_tokens: Upper bound passed as ``maxOutputTokens``.

        Returns:
            On success ``{"success": True, "provider", "model", "response",
            "tokens"}``; on failure ``{"success": False, "provider",
            "error"}``. Never raises for network or API errors.
        """
        self.stats["google_calls"] += 1
        provider = "Google Gemini"
        if not APIConfig.GOOGLE_KEY:
            return self._failure(provider, "API key not configured (set GOOGLE_API_KEY)")

        url = (
            "https://generativelanguage.googleapis.com/v1beta/models/"
            f"{APIConfig.GOOGLE_MODEL}:generateContent"
        )
        payload = {
            "contents": [{"parts": [{"text": prompt}]}],
            "generationConfig": {
                "maxOutputTokens": max_tokens,
                "temperature": 0.7,
                "topP": 0.8,
            },
            "safetySettings": [
                {
                    "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
                    "threshold": "BLOCK_MEDIUM_AND_ABOVE",
                }
            ],
        }
        # The key travels in a header rather than the query string so it does
        # not end up in URLs that get logged.
        headers = {"x-goog-api-key": APIConfig.GOOGLE_KEY}
        return await self._call_provider(
            provider, APIConfig.GOOGLE_MODEL, url, payload, headers, self._extract_gemini_text
        )

    async def query_groq(self, prompt, max_tokens=300):
        """Send *prompt* to Groq's chat-completions endpoint.

        Args:
            prompt: Text to send as a single user message.
            max_tokens: Upper bound passed as ``max_tokens``.

        Returns:
            Same result-dictionary shape as :meth:`query_google_gemini`.
            Never raises for network or API errors.
        """
        self.stats["groq_calls"] += 1
        provider = "Groq"
        if not APIConfig.GROQ_KEY:
            return self._failure(provider, "API key not configured (set GROQ_API_KEY)")

        url = "https://api.groq.com/openai/v1/chat/completions"
        payload = {
            "model": APIConfig.GROQ_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": max_tokens,
            "temperature": 0.7,
            "top_p": 0.8,
        }
        headers = {"Authorization": f"Bearer {APIConfig.GROQ_KEY}"}
        return await self._call_provider(
            provider, APIConfig.GROQ_MODEL, url, payload, headers, self._extract_groq_text
        )

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def create_intelligent_subtasks(self, main_task):
        """Derive three sub-prompts from *main_task* based on keywords in it.

        The first matching category wins, in the order explanatory,
        instructional, comparative; anything else gets the general set.

        Returns:
            A list of three prompt strings, each embedding *main_task*.
        """
        if _EXPLANATORY_RE.search(main_task):
            return [
                f"Provide a comprehensive explanation of: {main_task}",
                f"Give practical examples and applications of: {main_task}",
                f"What are the key concepts and principles behind: {main_task}",
            ]
        if _INSTRUCTIONAL_RE.search(main_task):
            return [
                f"Provide step-by-step instructions for: {main_task}",
                f"List best practices and tips for: {main_task}",
                f"Give code examples or templates for: {main_task}",
            ]
        if _COMPARATIVE_RE.search(main_task):
            return [
                f"Analyze and compare aspects of: {main_task}",
                f"List advantages and disadvantages of: {main_task}",
                f"Provide use cases for different approaches in: {main_task}",
            ]
        return [
            f"Analyze and explain: {main_task}",
            f"Provide insights and examples about: {main_task}",
            f"Discuss implications and applications of: {main_task}",
        ]

    async def orchestrate_task(self, user_task):
        """Run the full workflow for one task.

        Steps: mask PII, build sub-prompts from the masked text, send every
        sub-prompt to a provider (alternating Gemini / Groq) concurrently,
        then synthesise the successful answers.

        Returns:
            A JSON-serialisable dict with keys ``original_task``,
            ``masked_task``, ``subtasks``, ``api_results`` (one entry per
            subtask, in order), ``synthesis``, ``statistics`` and
            ``timestamp``.
        """
        print(f"\n🎯 USER TASK: {user_task}")

        # Step 1: Privacy protection
        print("\n🔒 Step 1: Privacy Protection...")
        masked_task = self.privacy.mask_sensitive_data(user_task)
        if masked_task != user_task:
            print(f"   Original: {_preview(user_task, 80)}")
            print(f"   Masked:   {_preview(masked_task, 80)}")

        # Step 2: Task decomposition (on the masked text only)
        print("\n📋 Step 2: Intelligent Task Decomposition...")
        subtasks = self.create_intelligent_subtasks(masked_task)
        print(f"   Created {len(subtasks)} optimized subtasks:")
        for i, subtask in enumerate(subtasks, 1):
            print(f"   {i}. {_preview(subtask, 70)}")

        # Step 3: Parallel processing - every subtask is dispatched,
        # alternating between the two providers.
        print("\n⚡ Step 3: Parallel AI Processing...")
        providers = (self.query_google_gemini, self.query_groq)
        api_tasks = [
            providers[index % len(providers)](subtask)
            for index, subtask in enumerate(subtasks)
        ]
        print(f"   Executing {len(api_tasks)} API calls in parallel...")
        results = list(await asyncio.gather(*api_tasks))

        # Record which prompt produced which answer.
        for subtask, result in zip(subtasks, results):
            result.setdefault("subtask", subtask)

        successful_results = [result for result in results if result["success"]]

        # Step 4: Results synthesis
        print("\n🧩 Step 4: Synthesizing Results...")
        if successful_results:
            synthesis = self._synthesize_results(user_task, successful_results)
        else:
            synthesis = "❌ All API calls failed. Please check your API keys and network connection."

        return {
            "original_task": user_task,
            "masked_task": masked_task,
            "subtasks": subtasks,
            "api_results": results,
            "synthesis": synthesis,
            "statistics": self.stats.copy(),
            "timestamp": datetime.now().isoformat(),
        }

    def _synthesize_results(self, original_task, api_results):
        """Concatenate successful provider answers into one Markdown document."""
        sections = [f"## 🤖 AI ANALYSIS: {original_task}\n\n"]

        # Individual responses
        for i, result in enumerate(api_results, 1):
            sections.append(f"### {i}. {result['provider']} ({result.get('model', 'unknown')}):\n")
            sections.append(f"{result['response']}\n\n")

        # Comparative block only makes sense with more than one answer.
        if len(api_results) > 1:
            total_tokens = sum(r.get('tokens', 0) for r in api_results)
            sections.append("### 📊 COMPARATIVE INSIGHTS:\n")
            sections.append(f"- **Total perspectives analyzed**: {len(api_results)}\n")
            sections.append(f"- **Total tokens processed**: {total_tokens}\n")
            sections.append("- **Key takeaways**: Multiple AI models provide complementary insights\n")
            sections.append("- **Recommendation**: Consider the consensus across different AI systems\n\n")

        sections.append("### 🎯 RECOMMENDED NEXT STEPS:\n")
        sections.append("1. Review the AI analyses above\n")
        sections.append("2. Identify common themes and recommendations\n")
        sections.append("3. Test any code or procedures in a safe environment\n")
        sections.append("4. Implement solutions gradually with monitoring\n")

        return "".join(sections)

    def generate_report(self, orchestration_result):
        """Render an orchestration result as a plain-text report.

        Args:
            orchestration_result: A dict as returned by
                :meth:`orchestrate_task`. Its ``statistics`` snapshot is used
                when present; otherwise the orchestrator's live counters.

        Returns:
            The report as a single string.
        """
        stats = orchestration_result.get('statistics', self.stats)
        rule = '=' * 70
        lines = [
            "",
            rule,
            "🤖 AI ORCHESTRATION REPORT",
            rule,
            "",
            f"📅 Timestamp: {orchestration_result['timestamp']}",
            f"🎯 Original Task: {orchestration_result['original_task']}",
            "",
            "📊 EXECUTION STATISTICS:",
            f"   • Google Gemini Calls: {stats['google_calls']}",
            f"   • Groq Calls: {stats['groq_calls']}",
            f"   • Successful: {stats['successful_calls']}",
            f"   • Failed: {stats['failed_calls']}",
            "",
            "📋 SUBTASKS EXECUTED:",
        ]
        lines.extend(
            f"   {i}. {_preview(subtask, 60)}"
            for i, subtask in enumerate(orchestration_result['subtasks'], 1)
        )
        lines.extend([
            "",
            rule,
            "📝 SYNTHESIZED ANALYSIS",
            rule,
            orchestration_result['synthesis'],
        ])
        return "\n".join(lines)


async def main():
    """Read a task from argv or stdin, run the orchestrator, print and save the results.

    Writes ``orchestration_result_<timestamp>.json`` and ``.txt`` to the
    current directory. Errors are printed rather than raised.
    """
    print("\n🚀 FINAL AI ORCHESTRATOR")
    print("=" * 60)

    # Get task from user
    if len(sys.argv) > 1:
        user_task = " ".join(sys.argv[1:])
    else:
        print("\n📝 Enter your task or question:")
        print("Examples:")
        print("  • 'Explain neural networks and deep learning'")
        print("  • 'How to secure a Linux web server'")
        print("  • 'Write a Python script for file encryption'")
        print("  • 'Compare different machine learning algorithms'")
        print("\nYour question (emails/IPs auto-masked):")

        user_task = sys.stdin.readline().strip()
        if not user_task:
            user_task = "Explain the concept of artificial intelligence and its applications"

    # Create orchestrator
    orchestrator = AIOrchestrator()

    try:
        print("\n" + "=" * 60)
        print("🔄 INTELLIGENT ORCHESTRATION IN PROGRESS...")
        print("=" * 60)

        # Run orchestration
        result = await orchestrator.orchestrate_task(user_task)

        # Generate and display report
        report = orchestrator.generate_report(result)
        print(report)

        # Save results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Save JSON (structured data)
        json_file = f"orchestration_result_{timestamp}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2, ensure_ascii=False)

        # Save text report (readable)
        txt_file = f"orchestration_result_{timestamp}.txt"
        with open(txt_file, 'w', encoding='utf-8') as f:
            f.write(report)

        print("\n💾 Results saved to:")
        print(f"   • {json_file} (structured data)")
        print(f"   • {txt_file} (readable report)")

        print("\n" + "=" * 60)
        print("✅ ORCHESTRATION COMPLETE!")
        print("=" * 60)

        # Display quick stats, derived from what was actually dispatched.
        api_results = result['api_results']
        succeeded = sum(1 for r in api_results if r['success'])
        print("\n📈 Quick Stats:")
        print(f"   Successful API calls: {succeeded}/{len(api_results)}")
        print("   Privacy protections: Enabled ✓")
        print("   Parallel processing: Enabled ✓")

    except Exception as e:
        # Top-level boundary of the script: report and exit normally.
        print(f"\n❌ Orchestration failed: {e}")


if __name__ == "__main__":
    # Ctrl-C surfaces from asyncio.run(), not inside the coroutine, so it is
    # handled here.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n⏹️  Orchestration cancelled by user")