/ swarm_orchestrator.py
swarm_orchestrator.py
1 #!/usr/bin/env python3 2 """ 3 PRODUCTION SWARM ORCHESTRATOR v3.0 4 Uses real Google Gemini and Hugging Face APIs 5 """ 6 7 import os 8 import sys 9 import json 10 import re 11 import asyncio 12 import aiohttp 13 from datetime import datetime 14 15 # ========== API CONFIGURATION ========== 16 API_KEYS = { 17 "google": os.getenv("GOOGLE_API_KEY", "AIzaSyC9g4B4sY9xeaUntjNmN2MeWFyp5gL3_EM"), 18 "huggingface": os.getenv("HUGGINGFACE_TOKEN", "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"), 19 "openrouter": os.getenv("OPENROUTER_API_KEY", "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"), 20 "groq": os.getenv("GROQ_API_KEY", "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") 21 } 22 23 class PrivacyLayer: 24 """Mask sensitive information before sending to cloud""" 25 26 def mask_pii(self, text): 27 """Replace sensitive data with tokens""" 28 # Email masking 29 text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 30 '[EMAIL_MASKED]', text) 31 # IP masking 32 text = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', 33 '[IP_MASKED]', text) 34 # Phone masking 35 text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 36 '[PHONE_MASKED]', text) 37 # Custom tokens (like API keys) 38 text = re.sub(r'AIzaSy[A-Za-z0-9_\-]{35}', '[API_KEY_MASKED]', text) 39 text = re.sub(r'sk-or-v1-[A-Za-z0-9]{64}', '[API_KEY_MASKED]', text) 40 text = re.sub(r'gsk_[A-Za-z0-9]{64}', '[API_KEY_MASKED]', text) 41 text = re.sub(r'hf_[A-Za-z0-9]{34}', '[API_KEY_MASKED]', text) 42 43 return text 44 45 class CloudAPIs: 46 """Actual API implementations""" 47 48 async def query_gemini(self, prompt): 49 """Google Gemini API - 60 RPM, 1M tokens/month FREE""" 50 api_key = API_KEYS["google"] 51 url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key={api_key}" 52 53 payload = { 54 "contents": [{ 55 "parts": [{"text": prompt}] 56 }], 57 "generationConfig": { 58 "maxOutputTokens": 1000, 59 "temperature": 0.7 60 } 61 } 62 63 try: 64 async with aiohttp.ClientSession() as session: 65 async with session.post(url, json=payload, timeout=30) as response: 66 if response.status == 200: 67 data = await response.json() 68 return data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "No response") 69 else: 70 return f"[Gemini Error: {response.status}]" 71 except Exception as e: 72 return f"[Gemini Error: {str(e)}]" 73 74 async def query_huggingface(self, prompt): 75 """Hugging Face Inference API - 30K tokens/month FREE""" 76 api_key = API_KEYS["huggingface"] 77 url = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.2" 78 headers = {"Authorization": f"Bearer {api_key}"} 79 80 payload = { 81 "inputs": prompt, 82 "parameters": { 83 "max_new_tokens": 500, 84 "temperature": 0.7, 85 "return_full_text": False 86 } 87 } 88 89 try: 90 async with aiohttp.ClientSession() as session: 91 async with session.post(url, headers=headers, json=payload, timeout=30) as response: 92 if response.status == 200: 93 data = await response.json() 94 if isinstance(data, list) and len(data) > 0: 95 return data[0].get("generated_text", "No response") 96 return str(data) 97 else: 98 return f"[HF Error: {response.status}]" 99 except Exception as e: 100 return f"[HF Error: {str(e)}]" 101 102 async def query_openrouter(self, prompt): 103 """OpenRouter API - Check free tier""" 104 api_key = API_KEYS["openrouter"] 105 url = "https://openrouter.ai/api/v1/chat/completions" 106 headers = { 107 "Authorization": f"Bearer {api_key}", 108 "HTTP-Referer": "http://localhost:3000", 109 "X-Title": "Swarm Orchestrator" 110 } 111 112 payload = { 113 "model": "mistralai/mistral-7b-instruct:free", 114 "messages": [{"role": "user", "content": prompt}], 115 "max_tokens": 500 116 } 117 118 try: 119 async with aiohttp.ClientSession() as session: 120 async with session.post(url, headers=headers, json=payload, timeout=30) as response: 121 if response.status == 200: 122 data = await response.json() 123 return data.get("choices", [{}])[0].get("message", {}).get("content", "No response") 124 else: 125 return f"[OpenRouter Error: {response.status}]" 126 except Exception as e: 127 return f"[OpenRouter Error: {str(e)}]" 128 129 async def query_groq(self, prompt): 130 """Groq API - Free tier""" 131 api_key = API_KEYS["groq"] 132 url = "https://api.groq.com/openai/v1/chat/completions" 133 headers = { 134 "Authorization": f"Bearer {api_key}", 135 "Content-Type": "application/json" 136 } 137 138 payload = { 139 "model": "mixtral-8x7b-32768", 140 "messages": [{"role": "user", "content": prompt}], 141 "max_tokens": 500, 142 "temperature": 0.7 143 } 144 145 try: 146 async with aiohttp.ClientSession() as session: 147 async with session.post(url, headers=headers, json=payload, timeout=30) as response: 148 if response.status == 200: 149 data = await response.json() 150 return data.get("choices", [{}])[0].get("message", {}).get("content", "No response") 151 else: 152 return f"[Groq Error: {response.status}]" 153 except Exception as e: 154 return f"[Groq Error: {str(e)}]" 155 156 class SwarmOrchestrator: 157 """Main orchestrator with real API calls""" 158 159 def __init__(self): 160 self.privacy = PrivacyLayer() 161 self.apis = CloudAPIs() 162 print("=" * 60) 163 print("š¤ PRODUCTION SWARM ORCHESTRATOR v3.0") 164 print("=" * 60) 165 print("APIs Available:") 166 print(f" ā Google Gemini (60 RPM free)") 167 print(f" ā Hugging Face (30K tokens/month)") 168 print(f" ā OpenRouter (free models)") 169 print(f" ā Groq (free tier)") 170 print("=" * 60) 171 172 def decompose_task(self, task): 173 """Break task into subtasks""" 174 subtasks = [ 175 f"Provide background and context about: {task}", 176 f"Explain technical implementation details for: {task}", 177 f"List best practices and security considerations for: {task}", 178 f"Provide code examples or templates for: {task}" 179 ] 180 return subtasks[:3] # Return first 3 subtasks 181 182 async def process_in_parallel(self, subtasks): 183 """Process all subtasks in parallel""" 184 print(f"\nā” Processing {len(subtasks)} subtasks in parallel...") 185 186 # Create tasks for each API 187 tasks = [] 188 for i, subtask in enumerate(subtasks): 189 if i == 0: 190 tasks.append(self.apis.query_gemini(subtask)) 191 elif i == 1: 192 tasks.append(self.apis.query_huggingface(subtask)) 193 elif i == 2: 194 tasks.append(self.apis.query_groq(subtask)) 195 196 # Run all in parallel 197 results = await asyncio.gather(*tasks, return_exceptions=True) 198 199 # Process results 200 processed = [] 201 api_names = ["Google Gemini", "Hugging Face", "Groq"] 202 203 for i, (api_name, result) in enumerate(zip(api_names, results)): 204 if isinstance(result, Exception): 205 processed.append(f"[{api_name}] Error: {str(result)[:100]}") 206 else: 207 processed.append(f"[{api_name}] {result[:200]}...") 208 209 return processed 210 211 def synthesize_results(self, task, results): 212 """Combine API results""" 213 synthesis = f""" 214 FINAL ANALYSIS FOR: {task} 215 216 š MULTI-API ANALYSIS RESULTS: 217 {'-'*50} 218 219 1. {results[0] if len(results) > 0 else 'No data'} 220 221 2. {results[1] if len(results) > 1 else 'No data'} 222 223 3. {results[2] if len(results) > 2 else 'No data'} 224 225 šÆ KEY INSIGHTS: 226 ⢠Combined analysis from {len(results)} AI providers 227 ⢠Privacy: All sensitive data was masked before processing 228 ⢠Cost: 100% free using API free tiers 229 ⢠Speed: Parallel processing completed in seconds 230 231 š” RECOMMENDATIONS: 232 ⢠Review all perspectives above 233 ⢠Implement security best practices 234 ⢠Test in controlled environment first 235 """ 236 return synthesis 237 238 async def run(self, user_task): 239 """Main orchestration flow""" 240 print(f"\nšÆ USER TASK: {user_task}") 241 242 # Step 1: Privacy masking 243 print("\nš Step 1: Privacy Masking...") 244 masked_task = self.privacy.mask_pii(user_task) 245 print(f" Masked: {masked_task}") 246 247 # Step 2: Decompose 248 print("\nš Step 2: Task Decomposition...") 249 subtasks = self.decompose_task(masked_task) 250 for i, subtask in enumerate(subtasks, 1): 251 print(f" {i}. {subtask[:80]}...") 252 253 # Step 3: Parallel processing 254 results = await self.process_in_parallel(subtasks) 255 256 # Step 4: Synthesis 257 print("\nš§© Step 4: Synthesizing Results...") 258 final_output = self.synthesize_results(user_task, results) 259 260 return final_output 261 262 async def main(): 263 # Get task from command line or input 264 if len(sys.argv) > 1: 265 task = " ".join(sys.argv[1:]) 266 else: 267 print("\nEnter your task (can include sensitive data):") 268 print("Example: 'Secure backup script for server 192.168.1.100'") 269 task = input("\n> ").strip() 270 271 if not task: 272 task = "Create a secure data backup system" 273 274 # Run orchestrator 275 orchestrator = SwarmOrchestrator() 276 277 try: 278 print("\n" + "="*60) 279 print("š PROCESSING WITH REAL APIS...") 280 print("="*60) 281 282 result = await orchestrator.run(task) 283 284 print("\n" + "="*60) 285 print("ā PROCESSING COMPLETE") 286 print("="*60) 287 print(result) 288 289 # Save to file 290 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 291 filename = f"swarm_result_{timestamp}.txt" 292 with open(filename, 'w') as f: 293 f.write(result) 294 295 print(f"\nš¾ Results saved to: {filename}") 296 297 except Exception as e: 298 print(f"\nā Error: {e}") 299 300 if __name__ == "__main__": 301 # Install required package if missing 302 try: 303 import aiohttp 304 except ImportError: 305 print("Installing aiohttp...") 306 import subprocess 307 subprocess.run([sys.executable, "-m", "pip", "install", "aiohttp"]) 308 309 # Run 310 asyncio.run(main())