# swarm_n8n_integration.py
#!/usr/bin/env python3
"""
N8N + SWARM INTEGRATION
Visual workflow orchestration for your AI swarm
"""

import json
import subprocess
import os
import sys
import time
import threading
from typing import Dict, List, Any
import urllib.request
import socket

# JavaScript bodies for the n8n "code" nodes.
#
# These are deliberately *plain* strings (NOT f-strings) with a
# __SWARM_PORT__ placeholder substituted at workflow-build time.
# The original embedded them inside Python f-strings, where the JS
# template-literal pieces `${i}` and `${Date.now()}` were parsed by
# Python as replacement fields {i} / {Date.now()} and raised NameError
# as soon as create_swarm_workflow() ran.  The task_id also needed a
# backtick template literal (it was single-quoted, so ${} would never
# interpolate in JS either).
_DECOMPOSE_JS = """\
// Decompose complex AI task
const task = $input.first().json.prompt || $input.first().json;

// Call local swarm for decomposition
const decomposed = await $http.post(
    `http://localhost:__SWARM_PORT__/decompose`,
    { prompt: task }
);

// Return decomposed tasks
return decomposed.tasks.map((t, i) => ({
    json: {
        task_id: `task_${i}_${Date.now()}`,
        subtask: t,
        priority: i === 0 ? 'high' : 'medium',
        timestamp: new Date().toISOString()
    }
}));
"""

_SYNTHESIZE_JS = """\
// Combine all AI responses
const responses = $input.all();

// Create final synthesis
const synthesis = {
    original_task: $input.first().json.original_task,
    decomposed_tasks: responses.map(r => r.json.subtask),
    ai_responses: responses.map(r => r.json.response || r.json),
    timestamp: new Date().toISOString(),
    total_tasks: responses.length,
    swarm_nodes_used: responses.length
};

// Call swarm for final synthesis if available
try {
    const final = await $http.post(
        'http://localhost:__SWARM_PORT__/synthesize',
        {
            task: $input.first().json.original_task,
            responses: responses.map(r => r.json)
        }
    );
    synthesis.final_result = final.result;
} catch (e) {
    synthesis.final_result = "Synthesis failed: " + e.message;
}

return [{ json: synthesis }];
"""


class N8nSwarmIntegration:
    """Integrates n8n workflow automation with an AI swarm.

    Responsibilities:
      * install/start a local n8n server (via npm / subprocess),
      * build an n8n workflow definition that fans a task out to
        multiple AI backends (Groq, local Ollama) and merges results,
      * expose a tiny localhost HTTP API the workflow calls back into
        (/workers, /decompose, /distribute, /synthesize, /health).
    """

    def __init__(self, n8n_port=5678, swarm_port=8888):
        """Remember ports; no network or subprocess work happens here.

        :param n8n_port:   port the n8n server will listen on
        :param swarm_port: port for the local swarm callback API
        """
        self.n8n_port = n8n_port
        self.swarm_port = swarm_port
        self.n8n_process = None   # subprocess.Popen handle once start_n8n() runs
        self.swarm_nodes = []     # reserved for discovered worker records

        print("[*] N8N + Swarm Integration")
        print("==========================")

    def install_n8n(self):
        """Install n8n (plus helper node packages) globally via npm.

        Returns True on success, False if npm is missing or the core
        install fails.  The helper packages are best-effort only.
        """
        print("[*] Installing n8n...")
        try:
            subprocess.run(['npm', 'install', '-g', 'n8n'],
                           check=True, capture_output=True)

            # Extra node packages are best-effort: no check=True, a
            # failure here must not abort the core install.
            subprocess.run(['npm', 'install', '-g',
                            'n8n-nodes-base',
                            'n8n-nodes-custom',
                            'n8n-nodes-ai'],
                           capture_output=True)

            print("[OK] n8n installed successfully")
            return True
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing `npm` binary (FileNotFoundError).
            print(f"[!] n8n installation failed: {e}")
            return False

    def _wait_for_port(self, port, timeout=30.0):
        """Poll localhost:port until it accepts TCP connections.

        Returns True once a connection succeeds, False after *timeout*
        seconds.  Replaces the original blind time.sleep(5).
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                with socket.create_connection(('localhost', port), timeout=1):
                    return True
            except OSError:
                time.sleep(0.5)
        return False

    def start_n8n(self):
        """Start the n8n server as a background subprocess.

        Returns True if the process was spawned (even if the port is
        slow to come up), False if the `n8n` binary is not installed.
        """
        print(f"[*] Starting n8n on port {self.n8n_port}...")

        # n8n reads its listen configuration from the environment.
        env = os.environ.copy()
        env['N8N_PORT'] = str(self.n8n_port)
        env['N8N_PROTOCOL'] = 'http'
        env['N8N_HOST'] = 'localhost'

        try:
            self.n8n_process = subprocess.Popen(
                ['n8n', 'start'],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
        except FileNotFoundError:
            print("[!] 'n8n' binary not found - run with --install first")
            return False

        # Wait for the HTTP port to actually answer instead of sleeping.
        if not self._wait_for_port(self.n8n_port):
            print("[!] n8n did not answer in time; it may still be starting")

        print(f"[OK] n8n started at http://localhost:{self.n8n_port}")
        return True

    @staticmethod
    def _json_header_params():
        """headerParameters block sending Content-Type: application/json."""
        return {
            "parameters": [
                {"name": "Content-Type", "value": "application/json"}
            ]
        }

    def create_swarm_workflow(self, workflow_name="AI Swarm Orchestrator"):
        """Build and return the n8n workflow definition (a plain dict).

        The workflow: manual trigger -> discover workers -> decompose the
        task (JS code node) -> distribute -> parallel Groq + local-model
        HTTP calls -> merge -> synthesize (JS code node).

        :param workflow_name: display name for the workflow in n8n
        :returns: dict ready to be json.dumps'd / imported into n8n
        """
        port = self.swarm_port
        decompose_js = _DECOMPOSE_JS.replace("__SWARM_PORT__", str(port))
        # Original hard-coded 8888 here even though swarm_port is configurable.
        synthesize_js = _SYNTHESIZE_JS.replace("__SWARM_PORT__", str(port))

        workflow = {
            "name": workflow_name,
            "nodes": [
                # Trigger node
                {
                    "name": "Manual Trigger",
                    "type": "n8n-nodes-base.manualTrigger",
                    "position": [250, 300],
                    "parameters": {}
                },
                # Swarm discovery node
                {
                    "name": "Discover Swarm Nodes",
                    "type": "n8n-nodes-base.httpRequest",
                    "position": [450, 300],
                    "parameters": {
                        "method": "GET",
                        "url": f"http://localhost:{port}/workers",
                        "authentication": "genericCredentialType",
                        "genericAuthType": "httpHeaderAuth",
                        "sendHeaders": True,
                        "headerParameters": self._json_header_params()
                    }
                },
                # Task decomposition node
                {
                    "name": "Decompose AI Task",
                    "type": "n8n-nodes-base.code",
                    "position": [650, 300],
                    "parameters": {"jsCode": decompose_js}
                },
                # Swarm distribution node
                {
                    "name": "Distribute to Swarm",
                    "type": "n8n-nodes-base.httpRequest",
                    "position": [850, 300],
                    "parameters": {
                        "method": "POST",
                        "url": f"http://localhost:{port}/distribute",
                        "authentication": "genericCredentialType",
                        "genericAuthType": "httpHeaderAuth",
                        "sendHeaders": True,
                        "headerParameters": self._json_header_params(),
                        "sendBody": True,
                        "bodyParameters": {
                            "parameters": [
                                {"name": "tasks", "value": "={{ $json.tasks }}"}
                            ]
                        }
                    }
                },
                # AI processing nodes (parallel execution)
                {
                    "name": "Process with Groq AI",
                    "type": "n8n-nodes-base.httpRequest",
                    "position": [1050, 200],
                    "parameters": {
                        "method": "POST",
                        "url": "https://api.groq.com/openai/v1/chat/completions",
                        "authentication": "genericCredentialType",
                        "genericAuthType": "httpHeaderAuth",
                        "sendHeaders": True,
                        "headerParameters": {
                            "parameters": [
                                {
                                    # SECURITY: a live API key was hard-coded
                                    # here; it now comes from the environment.
                                    # The leaked key should be revoked.
                                    "name": "Authorization",
                                    "value": f"Bearer {os.environ.get('GROQ_API_KEY', '')}"
                                },
                                {
                                    "name": "Content-Type",
                                    "value": "application/json"
                                }
                            ]
                        },
                        "sendBody": True,
                        "bodyParameters": {
                            "parameters": [
                                {"name": "model",
                                 "value": "llama-3.3-70b-versatile"},
                                {"name": "messages",
                                 "value": "=[{\"role\": \"user\", \"content\": \"{{ $json.subtask }}\"}]"},
                                {"name": "max_tokens", "value": 1000}
                            ]
                        }
                    }
                },
                {
                    "name": "Process with Local Model",
                    "type": "n8n-nodes-base.httpRequest",
                    "position": [1050, 400],
                    "parameters": {
                        "method": "POST",
                        "url": "http://localhost:11434/api/generate",
                        "sendHeaders": True,
                        "headerParameters": self._json_header_params(),
                        "sendBody": True,
                        "bodyParameters": {
                            "parameters": [
                                {"name": "model", "value": "mistral:7b"},
                                {"name": "prompt", "value": "={{ $json.subtask }}"},
                                {"name": "stream", "value": False}
                            ]
                        }
                    }
                },
                # Result aggregation node
                {
                    "name": "Aggregate Results",
                    "type": "n8n-nodes-base.merge",
                    "position": [1250, 300],
                    "parameters": {
                        "mode": "combine",
                        "outputFormat": "singleItem"
                    }
                },
                # Final synthesis node
                {
                    "name": "Synthesize Final Answer",
                    "type": "n8n-nodes-base.code",
                    "position": [1450, 300],
                    "parameters": {"jsCode": synthesize_js}
                }
            ],
            # n8n expects each "main" to be a list of output slots, each
            # slot a list of connection dicts; the original mixed flat and
            # nested forms, which n8n's importer rejects.  Normalized here.
            "connections": {
                "Manual Trigger": {
                    "main": [[{"node": "Discover Swarm Nodes",
                               "type": "main", "index": 0}]]
                },
                "Discover Swarm Nodes": {
                    "main": [[{"node": "Decompose AI Task",
                               "type": "main", "index": 0}]]
                },
                "Decompose AI Task": {
                    "main": [[{"node": "Distribute to Swarm",
                               "type": "main", "index": 0}]]
                },
                "Distribute to Swarm": {
                    "main": [[
                        {"node": "Process with Groq AI",
                         "type": "main", "index": 0},
                        {"node": "Process with Local Model",
                         "type": "main", "index": 1}
                    ]]
                },
                "Process with Groq AI": {
                    "main": [[{"node": "Aggregate Results",
                               "type": "main", "index": 0}]]
                },
                "Process with Local Model": {
                    "main": [[{"node": "Aggregate Results",
                               "type": "main", "index": 1}]]
                },
                "Aggregate Results": {
                    "main": [[{"node": "Synthesize Final Answer",
                               "type": "main", "index": 0}]]
                }
            },
            "settings": {},
            "staticData": None,
            "pinData": {}
        }

        return workflow

    def deploy_workflow(self, workflow_json, n8n_api_key=None):
        """Save the workflow to ~/swarm_workflow.json; optionally POST it
        to the n8n REST API when an API key is supplied.

        :param workflow_json: dict from create_swarm_workflow()
        :param n8n_api_key:   X-N8N-API-KEY value; None skips auto-import
        :returns: True on success, False on any I/O or HTTP failure
        """
        try:
            workflow_str = json.dumps(workflow_json, indent=2)

            # Always save to disk so the user can import manually.
            workflow_path = os.path.expanduser("~/swarm_workflow.json")
            with open(workflow_path, 'w') as f:
                f.write(workflow_str)

            print(f"[*] Workflow saved to: {workflow_path}")
            print(f"[*] Import manually at: "
                  f"http://localhost:{self.n8n_port}/workflow/new")

            # Try to auto-import via the REST API.
            if n8n_api_key:
                import_url = f"http://localhost:{self.n8n_port}/rest/workflows"
                headers = {
                    'X-N8N-API-KEY': n8n_api_key,
                    'Content-Type': 'application/json'
                }
                req = urllib.request.Request(
                    import_url,
                    data=workflow_str.encode(),
                    headers=headers,
                    method='POST'
                )
                with urllib.request.urlopen(req) as response:
                    result = json.loads(response.read().decode())
                    print(f"[OK] Workflow imported with ID: {result.get('id')}")

            return True
        except (OSError, ValueError) as e:
            # OSError covers file and urllib failures; ValueError covers
            # JSON decode errors from a malformed API response.
            print(f"[!] Workflow deployment failed: {e}")
            return False

    def create_swarm_api_for_n8n(self):
        """Start the localhost callback API on a daemon thread.

        Endpoints (all JSON unless noted):
          GET  /workers    -> {"workers": [...]} (simulated worker list)
          GET  /health     -> plain "OK"
          POST /decompose  -> {"tasks": [...]} naive word-chunk split
          POST /distribute -> acknowledgement with task count
          POST /synthesize -> concatenated summary of responses

        Always returns True (the server runs until process exit).
        """
        from http.server import HTTPServer, BaseHTTPRequestHandler

        class SwarmAPIHandler(BaseHTTPRequestHandler):
            def _send_json(self, payload, status=200):
                # Shared helper: serialize once, set headers, write body.
                body = json.dumps(payload).encode()
                self.send_response(status)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(body)

            def do_GET(self):
                if self.path == '/workers':
                    # Simulated discovered workers.
                    workers = [
                        {"id": "worker_1", "type": "groq", "status": "active"},
                        {"id": "worker_2", "type": "local", "status": "active"},
                        {"id": "worker_3", "type": "openrouter", "status": "ready"}
                    ]
                    self._send_json({"workers": workers})
                elif self.path == '/health':
                    self.send_response(200)
                    self.end_headers()
                    self.wfile.write(b"OK")
                else:
                    self.send_response(404)
                    self.end_headers()

            def do_POST(self):
                # Missing Content-Length defaults to 0 instead of raising.
                content_length = int(self.headers.get('Content-Length', 0))
                post_data = self.rfile.read(content_length)
                try:
                    data = json.loads(post_data.decode() or "{}")
                except (ValueError, UnicodeDecodeError):
                    self._send_json({"error": "invalid JSON body"}, status=400)
                    return

                if self.path == '/decompose':
                    # Naive decomposition: split long prompts into ~3 chunks.
                    prompt = data.get('prompt', '')
                    words = prompt.split()
                    if len(words) > 10:
                        chunk_size = max(1, len(words) // 3)
                        tasks = [
                            'Process: ' + ' '.join(words[i:i + chunk_size])
                            for i in range(0, len(words), chunk_size)
                        ]
                    else:
                        tasks = [f"Analyze: {prompt}"]
                    self._send_json({"tasks": tasks})

                elif self.path == '/distribute':
                    tasks = data.get('tasks', [])
                    self._send_json({
                        "distributed": True,
                        "task_count": len(tasks),
                        "message": f"Distributed {len(tasks)} tasks to swarm"
                    })

                elif self.path == '/synthesize':
                    # Simple synthesis: concatenate truncated responses.
                    responses = data.get('responses', [])
                    synthesis = f"Based on {len(responses)} AI responses:\n\n"
                    for i, resp in enumerate(responses):
                        synthesis += f"Response {i+1}: {str(resp)[:100]}...\n"
                    self._send_json({"result": synthesis})

                else:
                    self.send_response(404)
                    self.end_headers()

            def log_message(self, fmt, *args):
                # Silence the default per-request stderr logging.
                pass

        def run_api():
            server = HTTPServer(('localhost', self.swarm_port), SwarmAPIHandler)
            print(f"[*] Swarm API listening on port {self.swarm_port}")
            server.serve_forever()

        api_thread = threading.Thread(target=run_api, daemon=True)
        api_thread.start()
        return True

    def create_ai_workflow_templates(self):
        """Return a dict of descriptive workflow templates (metadata only)."""
        templates = {
            "security_audit": {
                "name": "Security Audit Workflow",
                "description": "Comprehensive security assessment using swarm AI",
                "steps": [
                    "1. Input target system/network",
                    "2. Decompose into security domains",
                    "3. Parallel vulnerability assessment",
                    "4. Threat modeling",
                    "5. Generate remediation report"
                ]
            },
            "code_review": {
                "name": "AI-Powered Code Review",
                "description": "Multi-AI code analysis and optimization",
                "steps": [
                    "1. Input codebase",
                    "2. Static analysis (Groq)",
                    "3. Security audit (Local model)",
                    "4. Performance optimization (OpenRouter)",
                    "5. Generate review report"
                ]
            },
            "research_assistant": {
                "name": "Research Assistant",
                "description": "Distributed research and synthesis",
                "steps": [
                    "1. Research topic input",
                    "2. Parallel information gathering",
                    "3. Cross-reference validation",
                    "4. Synthesis and citation",
                    "5. Generate research paper"
                ]
            }
        }

        return templates


def main():
    """CLI entry point: install/start n8n and/or emit the swarm workflow."""
    import argparse

    parser = argparse.ArgumentParser(description="n8n + Swarm Integration")
    parser.add_argument('--install', action='store_true', help='Install n8n')
    parser.add_argument('--start', action='store_true',
                        help='Start n8n and swarm API')
    parser.add_argument('--create-workflow', action='store_true',
                        help='Create swarm workflow')
    parser.add_argument('--n8n-port', type=int, default=5678, help='n8n port')
    parser.add_argument('--swarm-port', type=int, default=8888,
                        help='Swarm API port')

    args = parser.parse_args()

    integrator = N8nSwarmIntegration(args.n8n_port, args.swarm_port)

    if args.install:
        integrator.install_n8n()

    if args.start:
        # Start n8n, then the callback API the workflows rely on.
        if integrator.start_n8n():
            integrator.create_swarm_api_for_n8n()

            print("\n[OK] System Ready!")
            print(f"[*] n8n Dashboard: http://localhost:{args.n8n_port}")
            print(f"[*] Swarm API: http://localhost:{args.swarm_port}")
            print("\n[*] Workflow templates available:")
            templates = integrator.create_ai_workflow_templates()
            for key, template in templates.items():
                print(f"  - {template['name']}: {template['description']}")

    if args.create_workflow:
        workflow = integrator.create_swarm_workflow()
        integrator.deploy_workflow(workflow)

        print("\n[*] Workflow Nodes Created:")
        for node in workflow['nodes']:
            print(f"  - {node['name']} ({node['type'].split('.')[-1]})")


if __name__ == "__main__":
    main()