/ swarm_n8n_integration.py
swarm_n8n_integration.py
  1  #!/usr/bin/env python3
  2  """
  3  N8N + SWARM INTEGRATION
  4  Visual workflow orchestration for your AI swarm
  5  """
  6  
  7  import json
  8  import subprocess
  9  import os
 10  import sys
 11  import time
 12  import threading
 13  from typing import Dict, List, Any
 14  import urllib.request
 15  import socket
 16  
 17  class N8nSwarmIntegration:
 18      """Integrates n8n workflow automation with AI swarm"""
 19      
 20      def __init__(self, n8n_port=5678, swarm_port=8888):
 21          self.n8n_port = n8n_port
 22          self.swarm_port = swarm_port
 23          self.n8n_process = None
 24          self.swarm_nodes = []
 25          
 26          print("šŸ¤– N8N + Swarm Integration")
 27          print("==========================")
 28      
 29      def install_n8n(self):
 30          """Install n8n and required nodes"""
 31          print("šŸ“¦ Installing n8n...")
 32          
 33          try:
 34              # Install n8n globally
 35              subprocess.run(['npm', 'install', '-g', 'n8n'], check=True, capture_output=True)
 36              
 37              # Install useful n8n nodes
 38              subprocess.run(['npm', 'install', '-g', 
 39                  'n8n-nodes-base',
 40                  'n8n-nodes-custom',
 41                  'n8n-nodes-ai'
 42              ], capture_output=True)
 43              
 44              print("āœ… n8n installed successfully")
 45              return True
 46          except Exception as e:
 47              print(f"āš ļø n8n installation failed: {e}")
 48              return False
 49      
 50      def start_n8n(self):
 51          """Start n8n server"""
 52          print(f"šŸš€ Starting n8n on port {self.n8n_port}...")
 53          
 54          # Set N8N environment variables
 55          env = os.environ.copy()
 56          env['N8N_PORT'] = str(self.n8n_port)
 57          env['N8N_PROTOCOL'] = 'http'
 58          env['N8N_HOST'] = 'localhost'
 59          
 60          # Start n8n in background
 61          self.n8n_process = subprocess.Popen(
 62              ['n8n', 'start'],
 63              env=env,
 64              stdout=subprocess.PIPE,
 65              stderr=subprocess.PIPE
 66          )
 67          
 68          # Wait for n8n to start
 69          time.sleep(5)
 70          print(f"āœ… n8n started at http://localhost:{self.n8n_port}")
 71          print(f"   Default credentials: admin@n8n.io / password")
 72          return True
 73      
 74      def create_swarm_workflow(self, workflow_name="AI Swarm Orchestrator"):
 75          """Create n8n workflow for swarm management"""
 76          workflow = {
 77              "name": workflow_name,
 78              "nodes": [
 79                  # Trigger node
 80                  {
 81                      "name": "Manual Trigger",
 82                      "type": "n8n-nodes-base.manualTrigger",
 83                      "position": [250, 300],
 84                      "parameters": {}
 85                  },
 86                  # Swarm Discovery Node
 87                  {
 88                      "name": "Discover Swarm Nodes",
 89                      "type": "n8n-nodes-base.httpRequest",
 90                      "position": [450, 300],
 91                      "parameters": {
 92                          "method": "GET",
 93                          "url": f"http://localhost:{self.swarm_port}/workers",
 94                          "authentication": "genericCredentialType",
 95                          "genericAuthType": "httpHeaderAuth",
 96                          "sendHeaders": True,
 97                          "headerParameters": {
 98                              "parameters": [
 99                                  {
100                                      "name": "Content-Type",
101                                      "value": "application/json"
102                                  }
103                              ]
104                          }
105                      }
106                  },
107                  # Task Decomposition Node
108                  {
109                      "name": "Decompose AI Task",
110                      "type": "n8n-nodes-base.code",
111                      "position": [650, 300],
112                      "parameters": {
113                          "jsCode": f"""
114  // Decompose complex AI task
115  const task = $input.first().json.prompt || $input.first().json;
116  
117  // Call local swarm for decomposition
118  const decomposed = await $http.post(
119    `http://localhost:{self.swarm_port}/decompose`,
120    {{
121      prompt: task
122    }}
123  );
124  
125  // Return decomposed tasks
126  return decomposed.tasks.map((t, i) => ({{
127    json: {{
128      task_id: 'task_${i}_${Date.now()}',
129      subtask: t,
130      priority: i === 0 ? 'high' : 'medium',
131      timestamp: new Date().toISOString()
132    }}
133  }}));
134  """
135                      }
136                  },
137                  # Swarm Distribution Node
138                  {
139                      "name": "Distribute to Swarm",
140                      "type": "n8n-nodes-base.httpRequest",
141                      "position": [850, 300],
142                      "parameters": {
143                          "method": "POST",
144                          "url": f"http://localhost:{self.swarm_port}/distribute",
145                          "authentication": "genericCredentialType",
146                          "genericAuthType": "httpHeaderAuth",
147                          "sendHeaders": True,
148                          "headerParameters": {
149                              "parameters": [
150                                  {
151                                      "name": "Content-Type",
152                                      "value": "application/json"
153                                  }
154                              ]
155                          },
156                          "sendBody": True,
157                          "bodyParameters": {
158                              "parameters": [
159                                  {
160                                      "name": "tasks",
161                                      "value": "={{ $json.tasks }}"
162                                  }
163                              ]
164                          }
165                      }
166                  },
167                  # AI Processing Nodes (parallel execution)
168                  {
169                      "name": "Process with Groq AI",
170                      "type": "n8n-nodes-base.httpRequest",
171                      "position": [1050, 200],
172                      "parameters": {
173                          "method": "POST",
174                          "url": "https://api.groq.com/openai/v1/chat/completions",
175                          "authentication": "genericCredentialType",
176                          "genericAuthType": "httpHeaderAuth",
177                          "sendHeaders": True,
178                          "headerParameters": {
179                              "parameters": [
180                                  {
181                                      "name": "Authorization",
182                                      "value": "Bearer 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"
183                                  },
184                                  {
185                                      "name": "Content-Type",
186                                      "value": "application/json"
187                                  }
188                              ]
189                          },
190                          "sendBody": True,
191                          "bodyParameters": {
192                              "parameters": [
193                                  {
194                                      "name": "model",
195                                      "value": "llama-3.3-70b-versatile"
196                                  },
197                                  {
198                                      "name": "messages",
199                                      "value": "=[{\"role\": \"user\", \"content\": \"{{ $json.subtask }}\"}]"
200                                  },
201                                  {
202                                      "name": "max_tokens",
203                                      "value": 1000
204                                  }
205                              ]
206                          }
207                      }
208                  },
209                  {
210                      "name": "Process with Local Model",
211                      "type": "n8n-nodes-base.httpRequest",
212                      "position": [1050, 400],
213                      "parameters": {
214                          "method": "POST",
215                          "url": "http://localhost:11434/api/generate",
216                          "sendHeaders": True,
217                          "headerParameters": {
218                              "parameters": [
219                                  {
220                                      "name": "Content-Type",
221                                      "value": "application/json"
222                                  }
223                              ]
224                          },
225                          "sendBody": True,
226                          "bodyParameters": {
227                              "parameters": [
228                                  {
229                                      "name": "model",
230                                      "value": "mistral:7b"
231                                  },
232                                  {
233                                      "name": "prompt",
234                                      "value": "={{ $json.subtask }}"
235                                  },
236                                  {
237                                      "name": "stream",
238                                      "value": False
239                                  }
240                              ]
241                          }
242                      }
243                  },
244                  # Result Aggregation Node
245                  {
246                      "name": "Aggregate Results",
247                      "type": "n8n-nodes-base.merge",
248                      "position": [1250, 300],
249                      "parameters": {
250                          "mode": "combine",
251                          "outputFormat": "singleItem"
252                      }
253                  },
254                  # Final Synthesis Node
255                  {
256                      "name": "Synthesize Final Answer",
257                      "type": "n8n-nodes-base.code",
258                      "position": [1450, 300],
259                      "parameters": {
260                          "jsCode": """
261  // Combine all AI responses
262  const responses = $input.all();
263  
264  // Create final synthesis
265  const synthesis = {
266    original_task: $input.first().json.original_task,
267    decomposed_tasks: responses.map(r => r.json.subtask),
268    ai_responses: responses.map(r => r.json.response || r.json),
269    timestamp: new Date().toISOString(),
270    total_tasks: responses.length,
271    swarm_nodes_used: responses.length
272  };
273  
274  // Call swarm for final synthesis if available
275  try {
276    const final = await $http.post(
277      'http://localhost:8888/synthesize',
278      {
279        task: $input.first().json.original_task,
280        responses: responses.map(r => r.json)
281      }
282    );
283    synthesis.final_result = final.result;
284  } catch (e) {
285    synthesis.final_result = "Synthesis failed: " + e.message;
286  }
287  
288  return [{ json: synthesis }];
289  """
290                      }
291                  }
292              ],
293              "connections": {
294                  "Manual Trigger": {
295                      "main": [
296                          {
297                              "node": "Discover Swarm Nodes",
298                              "type": "main",
299                              "index": 0
300                          }
301                      ]
302                  },
303                  "Discover Swarm Nodes": {
304                      "main": [
305                          {
306                              "node": "Decompose AI Task",
307                              "type": "main",
308                              "index": 0
309                          }
310                      ]
311                  },
312                  "Decompose AI Task": {
313                      "main": [
314                          [
315                              {
316                                  "node": "Distribute to Swarm",
317                                  "type": "main",
318                                  "index": 0
319                              }
320                          ]
321                      ]
322                  },
323                  "Distribute to Swarm": {
324                      "main": [
325                          [
326                              {
327                                  "node": "Process with Groq AI",
328                                  "type": "main",
329                                  "index": 0
330                              },
331                              {
332                                  "node": "Process with Local Model",
333                                  "type": "main",
334                                  "index": 1
335                              }
336                          ]
337                      ]
338                  },
339                  "Process with Groq AI": {
340                      "main": [
341                          {
342                              "node": "Aggregate Results",
343                              "type": "main",
344                              "index": 0
345                          }
346                      ]
347                  },
348                  "Process with Local Model": {
349                      "main": [
350                          {
351                              "node": "Aggregate Results",
352                              "type": "main",
353                              "index": 1
354                          }
355                      ]
356                  },
357                  "Aggregate Results": {
358                      "main": [
359                          {
360                              "node": "Synthesize Final Answer",
361                              "type": "main",
362                              "index": 0
363                          }
364                      ]
365                  }
366              },
367              "settings": {},
368              "staticData": None,
369              "pinData": {}
370          }
371          
372          return workflow
373      
374      def deploy_workflow(self, workflow_json, n8n_api_key=None):
375          """Deploy workflow to n8n via API"""
376          try:
377              workflow_str = json.dumps(workflow_json)
378              
379              # Save to file
380              workflow_path = os.path.expanduser("~/swarm_workflow.json")
381              with open(workflow_path, 'w') as f:
382                  f.write(workflow_str)
383              
384              print(f"šŸ“ Workflow saved to: {workflow_path}")
385              print(f"šŸ”— Import manually at: http://localhost:{self.n8n_port}/workflow/new")
386              
387              # Try to auto-import via API
388              if n8n_api_key:
389                  import_url = f"http://localhost:{self.n8n_port}/rest/workflows"
390                  headers = {
391                      'X-N8N-API-KEY': n8n_api_key,
392                      'Content-Type': 'application/json'
393                  }
394                  
395                  req = urllib.request.Request(
396                      import_url,
397                      data=workflow_str.encode(),
398                      headers=headers,
399                      method='POST'
400                  )
401                  
402                  with urllib.request.urlopen(req) as response:
403                      result = json.loads(response.read().decode())
404                      print(f"āœ… Workflow imported with ID: {result.get('id')}")
405              
406              return True
407          except Exception as e:
408              print(f"āš ļø Workflow deployment failed: {e}")
409              return False
410      
411      def create_swarm_api_for_n8n(self):
412          """Create a simple API for n8n to interact with swarm"""
413          from http.server import HTTPServer, BaseHTTPRequestHandler
414          import threading
415          
416          class SwarmAPIHandler(BaseHTTPRequestHandler):
417              def do_GET(self):
418                  if self.path == '/workers':
419                      self.send_response(200)
420                      self.send_header('Content-Type', 'application/json')
421                      self.end_headers()
422                      
423                      # Simulate discovered workers
424                      workers = [
425                          {"id": "worker_1", "type": "groq", "status": "active"},
426                          {"id": "worker_2", "type": "local", "status": "active"},
427                          {"id": "worker_3", "type": "openrouter", "status": "ready"}
428                      ]
429                      
430                      self.wfile.write(json.dumps({"workers": workers}).encode())
431                      
432                  elif self.path == '/health':
433                      self.send_response(200)
434                      self.end_headers()
435                      self.wfile.write(b"OK")
436                      
437                  else:
438                      self.send_response(404)
439                      self.end_headers()
440              
441              def do_POST(self):
442                  content_length = int(self.headers['Content-Length'])
443                  post_data = self.rfile.read(content_length)
444                  data = json.loads(post_data.decode())
445                  
446                  if self.path == '/decompose':
447                      # Task decomposition endpoint
448                      prompt = data.get('prompt', '')
449                      
450                      # Simple decomposition logic
451                      words = prompt.split()
452                      tasks = []
453                      
454                      if len(words) > 10:
455                          # Break into chunks
456                          chunk_size = len(words) // 3
457                          for i in range(0, len(words), chunk_size):
458                              task = ' '.join(words[i:i+chunk_size])
459                              tasks.append(f"Process: {task}")
460                      else:
461                          tasks = [f"Analyze: {prompt}"]
462                      
463                      self.send_response(200)
464                      self.send_header('Content-Type', 'application/json')
465                      self.end_headers()
466                      self.wfile.write(json.dumps({"tasks": tasks}).encode())
467                      
468                  elif self.path == '/distribute':
469                      # Task distribution endpoint
470                      tasks = data.get('tasks', [])
471                      
472                      self.send_response(200)
473                      self.send_header('Content-Type', 'application/json')
474                      self.end_headers()
475                      
476                      response = {
477                          "distributed": True,
478                          "task_count": len(tasks),
479                          "message": f"Distributed {len(tasks)} tasks to swarm"
480                      }
481                      
482                      self.wfile.write(json.dumps(response).encode())
483                      
484                  elif self.path == '/synthesize':
485                      # Result synthesis endpoint
486                      task = data.get('task', '')
487                      responses = data.get('responses', [])
488                      
489                      # Simple synthesis
490                      synthesis = f"Based on {len(responses)} AI responses:\n\n"
491                      for i, resp in enumerate(responses):
492                          synthesis += f"Response {i+1}: {str(resp)[:100]}...\n"
493                      
494                      self.send_response(200)
495                      self.send_header('Content-Type', 'application/json')
496                      self.end_headers()
497                      self.wfile.write(json.dumps({"result": synthesis}).encode())
498                  
499                  else:
500                      self.send_response(404)
501                      self.end_headers()
502              
503              def log_message(self, format, *args):
504                  pass
505          
506          # Start API server in background thread
507          def run_api():
508              server = HTTPServer(('localhost', self.swarm_port), SwarmAPIHandler)
509              print(f"🌐 Swarm API listening on port {self.swarm_port}")
510              server.serve_forever()
511          
512          api_thread = threading.Thread(target=run_api, daemon=True)
513          api_thread.start()
514          return True
515      
516      def create_ai_workflow_templates(self):
517          """Create common AI workflow templates"""
518          templates = {
519              "security_audit": {
520                  "name": "Security Audit Workflow",
521                  "description": "Comprehensive security assessment using swarm AI",
522                  "steps": [
523                      "1. Input target system/network",
524                      "2. Decompose into security domains",
525                      "3. Parallel vulnerability assessment",
526                      "4. Threat modeling",
527                      "5. Generate remediation report"
528                  ]
529              },
530              "code_review": {
531                  "name": "AI-Powered Code Review",
532                  "description": "Multi-AI code analysis and optimization",
533                  "steps": [
534                      "1. Input codebase",
535                      "2. Static analysis (Groq)",
536                      "3. Security audit (Local model)",
537                      "4. Performance optimization (OpenRouter)",
538                      "5. Generate review report"
539                  ]
540              },
541              "research_assistant": {
542                  "name": "Research Assistant",
543                  "description": "Distributed research and synthesis",
544                  "steps": [
545                      "1. Research topic input",
546                      "2. Parallel information gathering",
547                      "3. Cross-reference validation",
548                      "4. Synthesis and citation",
549                      "5. Generate research paper"
550                  ]
551              }
552          }
553          
554          return templates
555  
def main():
    """Command-line entry point for the n8n + swarm integration.

    Flags:
        --install          install n8n through npm
        --start            start n8n and the swarm API, then keep running
        --create-workflow  build the swarm workflow and save/deploy it
        --n8n-port         port for n8n (default 5678)
        --swarm-port       port for the swarm API (default 8888)

    With ``--start`` the process stays in the foreground until n8n exits or
    Ctrl+C is pressed: the swarm API runs in a daemon thread, which would be
    killed as soon as this function returned.
    """
    import argparse

    parser = argparse.ArgumentParser(description="n8n + Swarm Integration")
    parser.add_argument('--install', action='store_true', help='Install n8n')
    parser.add_argument('--start', action='store_true', help='Start n8n and swarm API')
    parser.add_argument('--create-workflow', action='store_true', help='Create swarm workflow')
    parser.add_argument('--n8n-port', type=int, default=5678, help='n8n port')
    parser.add_argument('--swarm-port', type=int, default=8888, help='Swarm API port')

    args = parser.parse_args()

    integrator = N8nSwarmIntegration(args.n8n_port, args.swarm_port)

    if not (args.install or args.start or args.create_workflow):
        # No action requested: show usage instead of exiting silently.
        parser.print_help()
        return

    if args.install:
        integrator.install_n8n()

    services_running = False
    if args.start:
        # Start n8n
        if integrator.start_n8n():
            # Start swarm API
            integrator.create_swarm_api_for_n8n()
            services_running = True

            print("\nāœ… System Ready!")
            print(f"šŸ“Š n8n Dashboard: http://localhost:{args.n8n_port}")
            print(f"šŸ”§ Swarm API: http://localhost:{args.swarm_port}")
            print("\nšŸ’” Workflow templates available:")
            templates = integrator.create_ai_workflow_templates()
            for template in templates.values():
                print(f"  • {template['name']}: {template['description']}")

    if args.create_workflow:
        workflow = integrator.create_swarm_workflow()
        integrator.deploy_workflow(workflow)

        print("\nšŸ“‹ Workflow Nodes Created:")
        for node in workflow['nodes']:
            print(f"  • {node['name']} ({node['type'].split('.')[-1]})")

    if services_running:
        # Keep the process alive so the daemon API thread keeps serving, and
        # make sure the n8n child process is not left orphaned on exit.
        n8n_process = integrator.n8n_process
        print("\n   Press Ctrl+C to stop.")
        try:
            n8n_process.wait()
        except KeyboardInterrupt:
            print("\n   Shutting down...")
        finally:
            if n8n_process.poll() is None:
                n8n_process.terminate()
                try:
                    n8n_process.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    # n8n ignored the terminate request; force it.
                    n8n_process.kill()


if __name__ == "__main__":
    main()