# swarm_orchestrator.py
  1  #!/usr/bin/env python3
  2  """
  3  PRODUCTION SWARM ORCHESTRATOR v3.0
  4  Uses real Google Gemini and Hugging Face APIs
  5  """
  6  
  7  import os
  8  import sys
  9  import json
 10  import re
 11  import asyncio
 12  import aiohttp
 13  from datetime import datetime
 14  
 15  # ========== API CONFIGURATION ==========
 16  API_KEYS = {
 17      "google": os.getenv("GOOGLE_API_KEY", "AIzaSyC9g4B4sY9xeaUntjNmN2MeWFyp5gL3_EM"),
 18      "huggingface": os.getenv("HUGGINGFACE_TOKEN", "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"),
 19      "openrouter": os.getenv("OPENROUTER_API_KEY", "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"),
 20      "groq": os.getenv("GROQ_API_KEY", "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")
 21  }
 22  
 23  class PrivacyLayer:
 24      """Mask sensitive information before sending to cloud"""
 25      
 26      def mask_pii(self, text):
 27          """Replace sensitive data with tokens"""
 28          # Email masking
 29          text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 
 30                       '[EMAIL_MASKED]', text)
 31          # IP masking
 32          text = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', 
 33                       '[IP_MASKED]', text)
 34          # Phone masking
 35          text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 
 36                       '[PHONE_MASKED]', text)
 37          # Custom tokens (like API keys)
 38          text = re.sub(r'AIzaSy[A-Za-z0-9_\-]{35}', '[API_KEY_MASKED]', text)
 39          text = re.sub(r'sk-or-v1-[A-Za-z0-9]{64}', '[API_KEY_MASKED]', text)
 40          text = re.sub(r'gsk_[A-Za-z0-9]{64}', '[API_KEY_MASKED]', text)
 41          text = re.sub(r'hf_[A-Za-z0-9]{34}', '[API_KEY_MASKED]', text)
 42          
 43          return text
 44  
 45  class CloudAPIs:
 46      """Actual API implementations"""
 47      
 48      async def query_gemini(self, prompt):
 49          """Google Gemini API - 60 RPM, 1M tokens/month FREE"""
 50          api_key = API_KEYS["google"]
 51          url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key={api_key}"
 52          
 53          payload = {
 54              "contents": [{
 55                  "parts": [{"text": prompt}]
 56              }],
 57              "generationConfig": {
 58                  "maxOutputTokens": 1000,
 59                  "temperature": 0.7
 60              }
 61          }
 62          
 63          try:
 64              async with aiohttp.ClientSession() as session:
 65                  async with session.post(url, json=payload, timeout=30) as response:
 66                      if response.status == 200:
 67                          data = await response.json()
 68                          return data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "No response")
 69                      else:
 70                          return f"[Gemini Error: {response.status}]"
 71          except Exception as e:
 72              return f"[Gemini Error: {str(e)}]"
 73      
 74      async def query_huggingface(self, prompt):
 75          """Hugging Face Inference API - 30K tokens/month FREE"""
 76          api_key = API_KEYS["huggingface"]
 77          url = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.2"
 78          headers = {"Authorization": f"Bearer {api_key}"}
 79          
 80          payload = {
 81              "inputs": prompt,
 82              "parameters": {
 83                  "max_new_tokens": 500,
 84                  "temperature": 0.7,
 85                  "return_full_text": False
 86              }
 87          }
 88          
 89          try:
 90              async with aiohttp.ClientSession() as session:
 91                  async with session.post(url, headers=headers, json=payload, timeout=30) as response:
 92                      if response.status == 200:
 93                          data = await response.json()
 94                          if isinstance(data, list) and len(data) > 0:
 95                              return data[0].get("generated_text", "No response")
 96                          return str(data)
 97                      else:
 98                          return f"[HF Error: {response.status}]"
 99          except Exception as e:
100              return f"[HF Error: {str(e)}]"
101      
102      async def query_openrouter(self, prompt):
103          """OpenRouter API - Check free tier"""
104          api_key = API_KEYS["openrouter"]
105          url = "https://openrouter.ai/api/v1/chat/completions"
106          headers = {
107              "Authorization": f"Bearer {api_key}",
108              "HTTP-Referer": "http://localhost:3000",
109              "X-Title": "Swarm Orchestrator"
110          }
111          
112          payload = {
113              "model": "mistralai/mistral-7b-instruct:free",
114              "messages": [{"role": "user", "content": prompt}],
115              "max_tokens": 500
116          }
117          
118          try:
119              async with aiohttp.ClientSession() as session:
120                  async with session.post(url, headers=headers, json=payload, timeout=30) as response:
121                      if response.status == 200:
122                          data = await response.json()
123                          return data.get("choices", [{}])[0].get("message", {}).get("content", "No response")
124                      else:
125                          return f"[OpenRouter Error: {response.status}]"
126          except Exception as e:
127              return f"[OpenRouter Error: {str(e)}]"
128      
129      async def query_groq(self, prompt):
130          """Groq API - Free tier"""
131          api_key = API_KEYS["groq"]
132          url = "https://api.groq.com/openai/v1/chat/completions"
133          headers = {
134              "Authorization": f"Bearer {api_key}",
135              "Content-Type": "application/json"
136          }
137          
138          payload = {
139              "model": "mixtral-8x7b-32768",
140              "messages": [{"role": "user", "content": prompt}],
141              "max_tokens": 500,
142              "temperature": 0.7
143          }
144          
145          try:
146              async with aiohttp.ClientSession() as session:
147                  async with session.post(url, headers=headers, json=payload, timeout=30) as response:
148                      if response.status == 200:
149                          data = await response.json()
150                          return data.get("choices", [{}])[0].get("message", {}).get("content", "No response")
151                      else:
152                          return f"[Groq Error: {response.status}]"
153          except Exception as e:
154              return f"[Groq Error: {str(e)}]"
155  
class SwarmOrchestrator:
    """Main orchestrator: masks PII, decomposes the task, fans subtasks
    out to the cloud APIs in parallel, and synthesizes one report."""

    def __init__(self):
        self.privacy = PrivacyLayer()
        self.apis = CloudAPIs()
        print("=" * 60)
        print("šŸ¤– PRODUCTION SWARM ORCHESTRATOR v3.0")
        print("=" * 60)
        print("APIs Available:")
        # Plain strings: the originals were f-strings with no placeholders.
        print("  āœ“ Google Gemini (60 RPM free)")
        print("  āœ“ Hugging Face (30K tokens/month)")
        print("  āœ“ OpenRouter (free models)")
        print("  āœ“ Groq (free tier)")
        print("=" * 60)

    def decompose_task(self, task):
        """Break *task* into exactly three complementary subtask prompts."""
        subtasks = [
            f"Provide background and context about: {task}",
            f"Explain technical implementation details for: {task}",
            f"List best practices and security considerations for: {task}",
            f"Provide code examples or templates for: {task}",
        ]
        return subtasks[:3]  # Only three APIs are fanned out to below.

    async def process_in_parallel(self, subtasks):
        """Send each subtask to a different API; return formatted replies."""
        print(f"\n⚔ Processing {len(subtasks)} subtasks in parallel...")

        # Fixed subtask -> API routing; zip truncates to min(3, len(subtasks)).
        routes = [
            ("Google Gemini", self.apis.query_gemini),
            ("Hugging Face", self.apis.query_huggingface),
            ("Groq", self.apis.query_groq),
        ]
        tasks = [query(subtask)
                 for (_, query), subtask in zip(routes, subtasks)]

        # return_exceptions=True: one failing API must not sink the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        processed = []
        for (api_name, _), result in zip(routes, results):
            if isinstance(result, Exception):
                processed.append(f"[{api_name}] Error: {str(result)[:100]}")
            else:
                # Truncate to keep the synthesized report readable.
                processed.append(f"[{api_name}] {result[:200]}...")

        return processed

    def synthesize_results(self, task, results):
        """Combine the per-API results into one human-readable report."""
        synthesis = f"""
FINAL ANALYSIS FOR: {task}

šŸ“Š MULTI-API ANALYSIS RESULTS:
{'-'*50}

1. {results[0] if len(results) > 0 else 'No data'}

2. {results[1] if len(results) > 1 else 'No data'}

3. {results[2] if len(results) > 2 else 'No data'}

šŸŽÆ KEY INSIGHTS:
• Combined analysis from {len(results)} AI providers
• Privacy: All sensitive data was masked before processing
• Cost: 100% free using API free tiers
• Speed: Parallel processing completed in seconds

šŸ’” RECOMMENDATIONS:
• Review all perspectives above
• Implement security best practices
• Test in controlled environment first
"""
        return synthesis

    async def run(self, user_task):
        """Main orchestration flow: mask -> decompose -> query -> synthesize."""
        print(f"\nšŸŽÆ USER TASK: {user_task}")

        # Step 1: strip PII before anything leaves the machine.
        print("\nšŸ”’ Step 1: Privacy Masking...")
        masked_task = self.privacy.mask_pii(user_task)
        print(f"   Masked: {masked_task}")

        # Step 2: split into subtasks (queries are built from masked text).
        print("\nšŸ” Step 2: Task Decomposition...")
        subtasks = self.decompose_task(masked_task)
        for i, subtask in enumerate(subtasks, 1):
            print(f"   {i}. {subtask[:80]}...")

        # Step 3: fan out to the APIs concurrently.
        results = await self.process_in_parallel(subtasks)

        # Step 4: merge everything into the final report. Note: the report
        # header uses the ORIGINAL (unmasked) task, matching prior behavior.
        print("\n🧩 Step 4: Synthesizing Results...")
        final_output = self.synthesize_results(user_task, results)

        return final_output
261  
async def main():
    """CLI entry point: read the task, run the orchestrator, save output."""
    # Task comes from argv when present, else interactively (with default).
    if len(sys.argv) > 1:
        task = " ".join(sys.argv[1:])
    else:
        print("\nEnter your task (can include sensitive data):")
        print("Example: 'Secure backup script for server 192.168.1.100'")
        task = input("\n> ").strip()

        if not task:
            task = "Create a secure data backup system"

    orchestrator = SwarmOrchestrator()

    try:
        print("\n" + "=" * 60)
        print("šŸ”„ PROCESSING WITH REAL APIS...")
        print("=" * 60)

        result = await orchestrator.run(task)

        print("\n" + "=" * 60)
        print("āœ… PROCESSING COMPLETE")
        print("=" * 60)
        print(result)

        # Persist the report; explicit UTF-8 so the emoji in the report
        # survive on platforms with a non-UTF-8 default encoding.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"swarm_result_{timestamp}.txt"
        with open(filename, "w", encoding="utf-8") as f:
            f.write(result)

        # BUG FIX: previously printed a literal "(unknown)" instead of
        # the file that was just written.
        print(f"\nšŸ’¾ Results saved to: {filename}")

    except Exception as e:
        # Top-level boundary: report and exit cleanly rather than traceback.
        print(f"\nāŒ Error: {e}")
299  
if __name__ == "__main__":
    # NOTE(review): this guard cannot actually rescue a missing package —
    # aiohttp is imported unconditionally at the top of the file, so the
    # script fails there first. Kept for parity; check=True now surfaces
    # a failed install instead of silently continuing.
    try:
        import aiohttp  # noqa: F811 -- availability re-check
    except ImportError:
        print("Installing aiohttp...")
        import subprocess
        subprocess.run([sys.executable, "-m", "pip", "install", "aiohttp"],
                       check=True)

    asyncio.run(main())