#!/usr/bin/env python3
"""
Distributed Pipeline Architecture

Supports the Dell (transport) + Mac (compute) architecture:

Dell Server (thin transport):
    - Runs collector mode
    - Watches data sources: Monologue, Comet history
    - Queues events for processing
    - Exposes /queue/push and /queue/pull endpoints
    - Minimal CPU usage

Mac Mini / MacBook (compute nodes):
    - Run processor mode
    - Pull events from the Dell queue
    - Run correlation and analysis
    - Store results locally or push them back

Data paths:
    Dell:
        - Monologue syncs via iCloud → Dell
        - Comet history is local to the Mac (needs rsync or a sync service)

    Mac:
        - Can access the Dell queue via HTTP
        - Processes heavy correlation workloads
        - Can run GPU-accelerated analysis if needed

Configuration is via environment variables (see DistributedConfig.from_env):
    SOVEREIGN_ROLE=collector|processor|standalone
    SOVEREIGN_QUEUE_URL=http://dell-server:5050
    SOVEREIGN_DATA_SYNC_PATH=/path/to/synced/data

Usage:
    # On Dell
    export SOVEREIGN_ROLE=collector
    python server/sovereign_server.py

    # On Mac Mini
    export SOVEREIGN_ROLE=processor
    export SOVEREIGN_QUEUE_URL=http://192.168.1.100:5050
    python -m core.attention.distributed_pipeline
"""

import os
import json
import sqlite3
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Any

import requests

from .ingestion_pipeline import IngestionPipeline


@dataclass
class DistributedConfig:
    """Configuration for the distributed pipeline."""
    role: str = 'standalone'  # collector, processor, or standalone
    queue_url: Optional[str] = None  # URL of the queue server (for processors)
    local_queue_path: Optional[str] = None  # Path to the local queue DB
    data_sync_path: Optional[str] = None  # Path to synced data (for collectors)

    # Timing
    poll_interval: int = 30  # seconds between queue polls
    batch_size: int = 50  # events per batch

    @classmethod
    def from_env(cls) -> 'DistributedConfig':
        """Load configuration from environment variables."""
        return cls(
            role=os.environ.get('SOVEREIGN_ROLE', 'standalone'),
            queue_url=os.environ.get('SOVEREIGN_QUEUE_URL'),
            local_queue_path=os.environ.get('SOVEREIGN_QUEUE_PATH'),
            data_sync_path=os.environ.get('SOVEREIGN_DATA_SYNC_PATH'),
            poll_interval=int(os.environ.get('SOVEREIGN_POLL_INTERVAL', 30)),
            batch_size=int(os.environ.get('SOVEREIGN_BATCH_SIZE', 50))
        )
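
# Example (illustrative): constructing the config explicitly instead of via
# environment variables. The queue URL is hypothetical; point it at the Dell
# collector, mirroring the env-var setup in the module docstring.
#
#     cfg = DistributedConfig(role='processor',
#                             queue_url='http://192.168.1.100:5050')
#     processor = DistributedProcessor(cfg)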


class QueueClient:
    """Client for remote queue operations."""

    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout

    def push_events(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Push events to the remote queue."""
        try:
            response = requests.post(
                f"{self.base_url}/queue/push",
                json={'events': events},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {'error': str(e), 'success': False}

    def pull_events(self, batch_size: int = 50) -> List[Dict[str, Any]]:
        """Pull events from the remote queue."""
        try:
            response = requests.get(
                f"{self.base_url}/queue/pull",
                params={'batch_size': batch_size},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json().get('events', [])
        except Exception as e:
            print(f"[QueueClient] Pull error: {e}")
            return []

    def ack_events(self, event_ids: List[str]) -> Dict[str, Any]:
        """Acknowledge processed events."""
        try:
            response = requests.post(
                f"{self.base_url}/queue/ack",
                json={'event_ids': event_ids},
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {'error': str(e), 'success': False}

    def get_status(self) -> Dict[str, Any]:
        """Get queue status."""
        try:
            response = requests.get(
                f"{self.base_url}/queue/status",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {'error': str(e)}
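
# Example (illustrative): a processor-side round trip against a collector's
# queue endpoints. The URL is hypothetical; point it at the Dell collector.
#
#     client = QueueClient('http://192.168.1.100:5050')
#     batch = client.pull_events(batch_size=10)
#     ...  # hand `batch` to the local pipeline for correlation
#     client.ack_events([e['id'] for e in batch if e.get('id')])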


class DistributedCollector:
    """
    Collector node - runs on the Dell server.

    Watches data sources and queues events for processing.
    Exposes queue endpoints for processors to pull from.
    """

    def __init__(self, config: DistributedConfig):
        self.config = config
        self.pipeline = IngestionPipeline(mode='collector')

        # Track what we've collected
        self._collection_stats = {
            'total_collected': 0,
            'last_collection': None
        }

    def collect_and_queue(self) -> int:
        """Run one collection cycle."""
        collected = self.pipeline.collect_new_events()
        if collected > 0:
            self._collection_stats['total_collected'] += collected
            self._collection_stats['last_collection'] = datetime.now().isoformat()
        return collected

    def get_queue_stats(self) -> Dict[str, Any]:
        """Get queue statistics."""
        state = self.pipeline.get_state()
        return {
            'role': 'collector',
            'queue_size': state['queue_size'],
            'total_collected': self._collection_stats['total_collected'],
            'last_collection': self._collection_stats['last_collection']
        }


class DistributedProcessor:
    """
    Processor node - runs on Mac Mini/MacBook.

    Pulls events from the remote queue and runs correlation.
    """

    def __init__(self, config: DistributedConfig):
        self.config = config
        self.queue_client = QueueClient(config.queue_url) if config.queue_url else None
        self.pipeline = IngestionPipeline(mode='processor')

        # Stats
        self._processing_stats = {
            'total_processed': 0,
            'correlations_found': 0,
            'last_processing': None
        }

    def process_batch(self) -> Dict[str, Any]:
        """Pull and process a batch of events."""
        if not self.queue_client:
            return {'error': 'No queue URL configured'}

        # Pull events from the remote queue
        events = self.queue_client.pull_events(batch_size=self.config.batch_size)
        if not events:
            return {'events_processed': 0, 'pulled': 0}

        # Collect IDs for the ack, skipping malformed events without one.
        # TODO: hand each pulled event to the local pipeline's queue here;
        # process_queue() below only sees events the local pipeline already holds.
        event_ids = [e['id'] for e in events if e.get('id')]

        # Process locally (the processor pipeline handles correlation)
        processed = self.pipeline.process_queue(batch_size=len(events))

        # Acknowledge the batch on the remote queue
        if event_ids:
            self.queue_client.ack_events(event_ids)

        self._processing_stats['total_processed'] += processed
        self._processing_stats['last_processing'] = datetime.now().isoformat()

        return {
            'pulled': len(events),
            'events_processed': processed,
            'correlations_found': self.pipeline.state.correlations_found
        }

    def get_stats(self) -> Dict[str, Any]:
        """Get processing statistics."""
        queue_status = self.queue_client.get_status() if self.queue_client else {}
        return {
            'role': 'processor',
            'queue_url': self.config.queue_url,
            'remote_queue_status': queue_status,
            **self._processing_stats
        }


# Flask endpoints for the queue (added to the server when role=collector)
def add_queue_endpoints(app, collector: DistributedCollector):
    """Add queue endpoints to a Flask app."""
    from flask import request, jsonify

    @app.route('/queue/status')
    def queue_status():
        return jsonify(collector.get_queue_stats())

    @app.route('/queue/push', methods=['POST'])
    def queue_push():
        """Push events onto the queue (from other collectors)."""
        data = request.get_json() or {}
        events = data.get('events', [])
        if events:
            # Store in the same local queue DB that /queue/pull reads; assumes
            # the events(id, source, timestamp, content, metadata, processed)
            # schema that queue_pull queries below.
            conn = sqlite3.connect(str(collector.pipeline.data_dir / 'event_queue.db'))
            conn.executemany("""
                INSERT OR IGNORE INTO events (id, source, timestamp, content, metadata, processed)
                VALUES (?, ?, ?, ?, ?, 0)
            """, [
                (e.get('id'), e.get('source'), e.get('timestamp'),
                 e.get('content'), json.dumps(e.get('metadata', {})))
                for e in events
            ])
            conn.commit()
            conn.close()
        return jsonify({'success': True, 'queued': len(events)})

    @app.route('/queue/pull', methods=['GET'])
    def queue_pull():
        """Pull events from the queue (for processors)."""
        batch_size = int(request.args.get('batch_size', 50))

        # Get unprocessed events from the local queue DB
        db_path = collector.pipeline.data_dir / 'event_queue.db'
        conn = sqlite3.connect(str(db_path))

        cursor = conn.execute("""
            SELECT id, source, timestamp, content, metadata
            FROM events
            WHERE processed = 0
            ORDER BY timestamp
            LIMIT ?
        """, (batch_size,))

        events = []
        for row in cursor.fetchall():
            events.append({
                'id': row[0],
                'source': row[1],
                'timestamp': row[2],
                'content': row[3],
                'metadata': json.loads(row[4]) if row[4] else {}
            })

        conn.close()
        return jsonify({'events': events, 'count': len(events)})

    @app.route('/queue/ack', methods=['POST'])
    def queue_ack():
        """Acknowledge processed events."""
        data = request.get_json() or {}
        event_ids = data.get('event_ids', [])

        if not event_ids:
            return jsonify({'success': True, 'acknowledged': 0})

        db_path = collector.pipeline.data_dir / 'event_queue.db'
        conn = sqlite3.connect(str(db_path))

        placeholders = ','.join('?' * len(event_ids))
        conn.execute(f"""
            UPDATE events SET processed = 1
            WHERE id IN ({placeholders})
        """, event_ids)

        conn.commit()
        conn.close()

        return jsonify({'success': True, 'acknowledged': len(event_ids)})
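
# Example (illustrative): wiring the queue endpoints into a Flask app on the
# collector. In this repo the real entry point is server/sovereign_server.py;
# the bare app below is an assumption for the sketch, using the port from the
# module docstring.
#
#     from flask import Flask
#     app = Flask(__name__)
#     collector = DistributedCollector(DistributedConfig.from_env())
#     add_queue_endpoints(app, collector)
#     app.run(host='0.0.0.0', port=5050)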


def create_distributed_node(config: Optional[DistributedConfig] = None):
    """Create the appropriate node based on configuration."""
    config = config or DistributedConfig.from_env()

    if config.role == 'collector':
        return DistributedCollector(config)
    elif config.role == 'processor':
        return DistributedProcessor(config)
    else:
        # Standalone - return the regular pipeline
        return IngestionPipeline(mode='standalone')


if __name__ == '__main__':
    config = DistributedConfig.from_env()
    print(f"Running as {config.role}...")

    if config.role == 'collector':
        collector = DistributedCollector(config)
        while True:
            collected = collector.collect_and_queue()
            print(f"Collected {collected} events. Queue: {collector.get_queue_stats()}")
            time.sleep(config.poll_interval)

    elif config.role == 'processor':
        processor = DistributedProcessor(config)
        while True:
            result = processor.process_batch()
            print(f"Processing result: {result}")
            time.sleep(config.poll_interval)

    else:
        # Standalone
        pipeline = IngestionPipeline(mode='standalone')
        pipeline.run()