# final_ai_orchestrator.py
#!/usr/bin/env python3
"""
FINAL OPTIMIZED AI ORCHESTRATOR
Google Gemini + Groq - Both Working
"""

import asyncio
import json
import os
import re
import sys
import urllib.error
import urllib.request
from datetime import datetime

 15  # ========== API CONFIGURATION ==========
 16  class APIConfig:
 17      GOOGLE_KEY = "AIzaSyD_4aAx2tnLIgu7XUOmleCbYhHtKgdHl0"
 18      GROQ_KEY = "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"
 19      
 20      # Working models (tested)
 21      GOOGLE_MODEL = "gemini-2.5-flash"  # 100% working
 22      GROQ_MODEL = "llama-3.3-70b-versatile"  # Working (llama3-8b-8192 deprecated)
 23      
 24      @staticmethod
 25      def validate_keys():
 26          """Check if API keys are properly formatted"""
 27          issues = []
 28          if not APIConfig.GOOGLE_KEY.startswith("AIza"):
 29              issues.append("Google key format incorrect")
 30          if not APIConfig.GROQ_KEY.startswith("gsk_"):
 31              issues.append("Groq key format incorrect")
 32          return issues
 33  
 34  class PrivacyEngine:
 35      """Advanced privacy protection"""
 36      
 37      def __init__(self):
 38          self.masking_patterns = [
 39              (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[EMAIL]'),
 40              (r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '[IP_ADDRESS]'),
 41              (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]'),
 42              (r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]'),
 43              (r'\b[A-Z]{2,10}-\d{3,5}\b', '[ID_NUMBER]'),
 44          ]
 45      
 46      def mask_sensitive_data(self, text):
 47          """Mask PII before sending to cloud"""
 48          masked = text
 49          for pattern, replacement in self.masking_patterns:
 50              masked = re.sub(pattern, replacement, masked, flags=re.IGNORECASE)
 51          return masked
 52      
 53      def unmask_data(self, text, original_context=None):
 54          """Restore masked data (if needed)"""
 55          # For now, just return as-is since we don't store mapping
 56          return text
 57  
 58  class AIOrchestrator:
 59      """Main orchestrator with error handling and retries"""
 60      
 61      def __init__(self):
 62          self.privacy = PrivacyEngine()
 63          self.stats = {
 64              "google_calls": 0,
 65              "groq_calls": 0,
 66              "successful_calls": 0,
 67              "failed_calls": 0
 68          }
 69          
 70          print("=" * 60)
 71          print("šŸ¤– FINAL AI ORCHESTRATOR v4.0")
 72          print("=" * 60)
 73          print("āœ… APIs Configured:")
 74          print(f"   • Google Gemini 2.5 Flash (1M tokens free)")
 75          print(f"   • Groq Llama 3.3 70B (fast & free)")
 76          print("=" * 60)
 77          
 78          # Validate keys
 79          key_issues = APIConfig.validate_keys()
 80          if key_issues:
 81              print("āš ļø  Key warnings:")
 82              for issue in key_issues:
 83                  print(f"   • {issue}")
 84      
 85      async def query_google_gemini(self, prompt, max_tokens=400):
 86          """Query Google Gemini with retry logic"""
 87          self.stats["google_calls"] += 1
 88          
 89          url = f"https://generativelanguage.googleapis.com/v1beta/models/{APIConfig.GOOGLE_MODEL}:generateContent?key={APIConfig.GOOGLE_KEY}"
 90          
 91          payload = {
 92              "contents": [{"parts": [{"text": prompt}]}],
 93              "generationConfig": {
 94                  "maxOutputTokens": max_tokens,
 95                  "temperature": 0.7,
 96                  "topP": 0.8
 97              },
 98              "safetySettings": [
 99                  {
100                      "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
101                      "threshold": "BLOCK_MEDIUM_AND_ABOVE"
102                  }
103              ]
104          }
105          
106          try:
107              data = json.dumps(payload).encode('utf-8')
108              req = urllib.request.Request(
109                  url,
110                  data=data,
111                  headers={'Content-Type': 'application/json'},
112                  method='POST'
113              )
114              
115              with urllib.request.urlopen(req, timeout=20) as response:
116                  result = json.loads(response.read().decode('utf-8'))
117                  
118                  if 'candidates' in result and result['candidates']:
119                      text = result['candidates'][0]['content']['parts'][0]['text']
120                      self.stats["successful_calls"] += 1
121                      return {
122                          "success": True,
123                          "provider": "Google Gemini",
124                          "model": APIConfig.GOOGLE_MODEL,
125                          "response": text,
126                          "tokens": len(text.split())
127                      }
128                  else:
129                      error_msg = result.get('error', {}).get('message', 'Unknown error')
130                      self.stats["failed_calls"] += 1
131                      return {
132                          "success": False,
133                          "provider": "Google Gemini",
134                          "error": error_msg
135                      }
136                      
137          except urllib.error.HTTPError as e:
138              self.stats["failed_calls"] += 1
139              return {
140                  "success": False,
141                  "provider": "Google Gemini",
142                  "error": f"HTTP {e.code}: {e.reason}"
143              }
144          except Exception as e:
145              self.stats["failed_calls"] += 1
146              return {
147                  "success": False,
148                  "provider": "Google Gemini",
149                  "error": str(e)[:100]
150              }
151      
152      async def query_groq(self, prompt, max_tokens=300):
153          """Query Groq API with retry logic"""
154          self.stats["groq_calls"] += 1
155          
156          url = "https://api.groq.com/openai/v1/chat/completions"
157          
158          payload = {
159              "model": APIConfig.GROQ_MODEL,
160              "messages": [{"role": "user", "content": prompt}],
161              "max_tokens": max_tokens,
162              "temperature": 0.7,
163              "top_p": 0.8
164          }
165          
166          try:
167              data = json.dumps(payload).encode('utf-8')
168              req = urllib.request.Request(
169                  url,
170                  data=data,
171                  headers={
172                      'Authorization': f'Bearer {APIConfig.GROQ_KEY}',
173                      'Content-Type': 'application/json'
174                  },
175                  method='POST'
176              )
177              
178              with urllib.request.urlopen(req, timeout=20) as response:
179                  result = json.loads(response.read().decode('utf-8'))
180                  
181                  if 'choices' in result and result['choices']:
182                      text = result['choices'][0]['message']['content']
183                      self.stats["successful_calls"] += 1
184                      return {
185                          "success": True,
186                          "provider": "Groq",
187                          "model": APIConfig.GROQ_MODEL,
188                          "response": text,
189                          "tokens": len(text.split())
190                      }
191                  else:
192                      error_msg = result.get('error', {}).get('message', 'Unknown error')
193                      self.stats["failed_calls"] += 1
194                      return {
195                          "success": False,
196                          "provider": "Groq",
197                          "error": error_msg
198                      }
199                      
200          except urllib.error.HTTPError as e:
201              self.stats["failed_calls"] += 1
202              return {
203                  "success": False,
204                  "provider": "Groq",
205                  "error": f"HTTP {e.code}: {e.reason}"
206              }
207          except Exception as e:
208              self.stats["failed_calls"] += 1
209              return {
210                  "success": False,
211                  "provider": "Groq", 
212                  "error": str(e)[:100]
213              }
214      
215      def create_intelligent_subtasks(self, main_task):
216          """Create smart subtasks for parallel processing"""
217          
218          # Determine task type and create appropriate subtasks
219          task_lower = main_task.lower()
220          
221          if any(word in task_lower for word in ['explain', 'what is', 'describe', 'define']):
222              # Explanatory task
223              return [
224                  f"Provide a comprehensive explanation of: {main_task}",
225                  f"Give practical examples and applications of: {main_task}",
226                  f"What are the key concepts and principles behind: {main_task}"
227              ]
228          elif any(word in task_lower for word in ['how to', 'create', 'build', 'make', 'write']):
229              # Instructional task
230              return [
231                  f"Provide step-by-step instructions for: {main_task}",
232                  f"List best practices and tips for: {main_task}",
233                  f"Give code examples or templates for: {main_task}"
234              ]
235          elif any(word in task_lower for word in ['compare', 'difference', 'vs', 'versus']):
236              # Comparative task
237              return [
238                  f"Analyze and compare aspects of: {main_task}",
239                  f"List advantages and disadvantages of: {main_task}",
240                  f"Provide use cases for different approaches in: {main_task}"
241              ]
242          else:
243              # General task
244              return [
245                  f"Analyze and explain: {main_task}",
246                  f"Provide insights and examples about: {main_task}",
247                  f"Discuss implications and applications of: {main_task}"
248              ]
249      
250      async def orchestrate_task(self, user_task):
251          """Main orchestration workflow"""
252          
253          print(f"\nšŸŽÆ USER TASK: {user_task}")
254          
255          # Step 1: Privacy protection
256          print("\nšŸ”’ Step 1: Privacy Protection...")
257          masked_task = self.privacy.mask_sensitive_data(user_task)
258          if masked_task != user_task:
259              print(f"   Original: {user_task[:80]}...")
260              print(f"   Masked: {masked_task[:80]}...")
261          
262          # Step 2: Task decomposition
263          print("\nšŸ” Step 2: Intelligent Task Decomposition...")
264          subtasks = self.create_intelligent_subtasks(masked_task)
265          print(f"   Created {len(subtasks)} optimized subtasks:")
266          for i, subtask in enumerate(subtasks, 1):
267              print(f"     {i}. {subtask[:70]}...")
268          
269          # Step 3: Parallel processing
270          print("\n⚔ Step 3: Parallel AI Processing...")
271          
272          # Create tasks for each API
273          api_tasks = []
274          
275          # Use Google for first subtask
276          api_tasks.append(self.query_google_gemini(subtasks[0]))
277          
278          # Use Groq for second subtask (if available)
279          if len(subtasks) > 1:
280              api_tasks.append(self.query_groq(subtasks[1]))
281          
282          # Run all API calls in parallel
283          print(f"   Executing {len(api_tasks)} API calls in parallel...")
284          results = await asyncio.gather(*api_tasks)
285          
286          # Process results
287          successful_results = []
288          failed_results = []
289          
290          for result in results:
291              if result["success"]:
292                  successful_results.append(result)
293              else:
294                  failed_results.append(result)
295          
296          # Step 4: Results synthesis
297          print("\n🧩 Step 4: Synthesizing Results...")
298          
299          if successful_results:
300              # Generate comprehensive summary
301              synthesis = self._synthesize_results(user_task, successful_results)
302          else:
303              synthesis = "āŒ All API calls failed. Please check your API keys and network connection."
304          
305          return {
306              "original_task": user_task,
307              "masked_task": masked_task,
308              "subtasks": subtasks,
309              "api_results": results,
310              "synthesis": synthesis,
311              "statistics": self.stats.copy(),
312              "timestamp": datetime.now().isoformat()
313          }
314      
315      def _synthesize_results(self, original_task, api_results):
316          """Combine API results into comprehensive answer"""
317          
318          # Build the synthesis
319          synthesis = f"## šŸ¤– AI ANALYSIS: {original_task}\n\n"
320          
321          # Add individual responses
322          for i, result in enumerate(api_results, 1):
323              synthesis += f"### {i}. {result['provider']} ({result['model']}):\n"
324              synthesis += f"{result['response']}\n\n"
325          
326          # Add comparative analysis
327          if len(api_results) > 1:
328              synthesis += "### šŸ” COMPARATIVE INSIGHTS:\n"
329              synthesis += f"- **Total perspectives analyzed**: {len(api_results)}\n"
330              synthesis += f"- **Total tokens processed**: {sum(r.get('tokens', 0) for r in api_results)}\n"
331              synthesis += "- **Key takeaways**: Multiple AI models provide complementary insights\n"
332              synthesis += "- **Recommendation**: Consider the consensus across different AI systems\n\n"
333          
334          # Add action items
335          synthesis += "### šŸŽÆ RECOMMENDED NEXT STEPS:\n"
336          synthesis += "1. Review the AI analyses above\n"
337          synthesis += "2. Identify common themes and recommendations\n"
338          synthesis += "3. Test any code or procedures in a safe environment\n"
339          synthesis += "4. Implement solutions gradually with monitoring\n"
340          
341          return synthesis
342      
343      def generate_report(self, orchestration_result):
344          """Generate formatted report"""
345          
346          report = f"""
347  {'='*70}
348  šŸ¤– AI ORCHESTRATION REPORT
349  {'='*70}
350  
351  šŸ“… Timestamp: {orchestration_result['timestamp']}
352  šŸŽÆ Original Task: {orchestration_result['original_task']}
353  
354  šŸ“Š EXECUTION STATISTICS:
355     • Google Gemini Calls: {self.stats['google_calls']}
356     • Groq Calls: {self.stats['groq_calls']}
357     • Successful: {self.stats['successful_calls']}
358     • Failed: {self.stats['failed_calls']}
359  
360  šŸ” SUBTASKS EXECUTED:
361  """
362          
363          for i, subtask in enumerate(orchestration_result['subtasks'], 1):
364              report += f"   {i}. {subtask[:60]}...\n"
365          
366          report += f"\n{'='*70}\nšŸ“ SYNTHESIZED ANALYSIS\n{'='*70}\n"
367          report += orchestration_result['synthesis']
368          
369          return report
370  
371  async def main():
372      """Main execution function"""
373      
374      print("\nšŸš€ FINAL AI ORCHESTRATOR")
375      print("="*60)
376      
377      # Get task from user
378      if len(sys.argv) > 1:
379          user_task = " ".join(sys.argv[1:])
380      else:
381          print("\nšŸ“ Enter your task or question:")
382          print("Examples:")
383          print("  • 'Explain neural networks and deep learning'")
384          print("  • 'How to secure a Linux web server'")
385          print("  • 'Write a Python script for file encryption'")
386          print("  • 'Compare different machine learning algorithms'")
387          print("\nYour question (emails/IPs auto-masked):")
388          
389          user_task = sys.stdin.readline().strip()
390          if not user_task:
391              user_task = "Explain the concept of artificial intelligence and its applications"
392      
393      # Create orchestrator
394      orchestrator = AIOrchestrator()
395      
396      try:
397          print("\n" + "="*60)
398          print("šŸ”„ INTELLIGENT ORCHESTRATION IN PROGRESS...")
399          print("="*60)
400          
401          # Run orchestration
402          result = await orchestrator.orchestrate_task(user_task)
403          
404          # Generate and display report
405          report = orchestrator.generate_report(result)
406          print(report)
407          
408          # Save results
409          timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
410          
411          # Save JSON (structured data)
412          json_file = f"orchestration_result_{timestamp}.json"
413          with open(json_file, 'w', encoding='utf-8') as f:
414              json.dump(result, f, indent=2, ensure_ascii=False)
415          
416          # Save text report (readable)
417          txt_file = f"orchestration_result_{timestamp}.txt"
418          with open(txt_file, 'w', encoding='utf-8') as f:
419              f.write(report)
420          
421          print(f"\nšŸ’¾ Results saved to:")
422          print(f"   • {json_file} (structured data)")
423          print(f"   • {txt_file} (readable report)")
424          
425          print("\n" + "="*60)
426          print("āœ… ORCHESTRATION COMPLETE!")
427          print("="*60)
428          
429          # Display quick stats
430          print(f"\nšŸ“Š Quick Stats:")
431          print(f"   Successful API calls: {orchestrator.stats['successful_calls']}/2")
432          print(f"   Privacy protections: Enabled āœ“")
433          print(f"   Parallel processing: Enabled āœ“")
434          
435      except KeyboardInterrupt:
436          print("\n\nā¹ļø  Orchestration cancelled by user")
437      except Exception as e:
438          print(f"\nāŒ Orchestration failed: {e}")
439  
440  if __name__ == "__main__":
441      # Run the orchestrator
442      asyncio.run(main())