distributed_pipeline.py
1 #!/usr/bin/env python3 2 """ 3 Distributed Pipeline Architecture 4 5 Supports the Dell (transport) + Mac (compute) architecture: 6 7 Dell Server (thin transport): 8 - Runs collector mode 9 - Watches data sources: Monologue, Comet history 10 - Queues events for processing 11 - Exposes /queue/push and /queue/pull endpoints 12 - Minimal CPU usage 13 14 Mac Mini / MacBook (compute nodes): 15 - Run processor mode 16 - Pull events from Dell queue 17 - Run correlation and analysis 18 - Store results locally or push back 19 20 Data paths: 21 Dell: 22 - Monologue syncs via iCloud → Dell 23 - Comet history is local to Mac (needs rsync or sync service) 24 25 Mac: 26 - Can access Dell queue via HTTP 27 - Processes heavy correlation workloads 28 - Can run GPU-accelerated analysis if needed 29 30 Configuration is via environment variables or config file: 31 SOVEREIGN_ROLE=collector|processor|standalone 32 SOVEREIGN_QUEUE_URL=http://dell-server:5050 33 SOVEREIGN_DATA_SYNC_PATH=/path/to/synced/data 34 35 Usage: 36 # On Dell 37 export SOVEREIGN_ROLE=collector 38 python server/sovereign_server.py 39 40 # On Mac Mini 41 export SOVEREIGN_ROLE=processor 42 export SOVEREIGN_QUEUE_URL=http://192.168.1.100:5050 43 python -m core.attention.distributed_pipeline run 44 """ 45 46 import os 47 import json 48 import requests 49 from dataclasses import dataclass, asdict 50 from datetime import datetime, timedelta 51 from pathlib import Path 52 from typing import Dict, List, Optional, Any 53 import hashlib 54 import time 55 56 from .ingestion_pipeline import IngestionPipeline, PipelineMode, QueuedEvent 57 58 59 @dataclass 60 class DistributedConfig: 61 """Configuration for distributed pipeline.""" 62 role: str = 'standalone' # collector, processor, standalone 63 queue_url: Optional[str] = None # URL of queue server (for processor) 64 local_queue_path: Optional[str] = None # Path to local queue DB 65 data_sync_path: Optional[str] = None # Path to synced data (for collector) 66 67 # Timing 68 poll_interval: int = 30 # seconds between queue polls 69 batch_size: int = 50 # events per batch 70 71 @classmethod 72 def from_env(cls) -> 'DistributedConfig': 73 """Load configuration from environment variables.""" 74 return cls( 75 role=os.environ.get('SOVEREIGN_ROLE', 'standalone'), 76 queue_url=os.environ.get('SOVEREIGN_QUEUE_URL'), 77 local_queue_path=os.environ.get('SOVEREIGN_QUEUE_PATH'), 78 data_sync_path=os.environ.get('SOVEREIGN_DATA_SYNC_PATH'), 79 poll_interval=int(os.environ.get('SOVEREIGN_POLL_INTERVAL', 30)), 80 batch_size=int(os.environ.get('SOVEREIGN_BATCH_SIZE', 50)) 81 ) 82 83 84 class QueueClient: 85 """Client for remote queue operations.""" 86 87 def __init__(self, base_url: str, timeout: int = 30): 88 self.base_url = base_url.rstrip('/') 89 self.timeout = timeout 90 91 def push_events(self, events: List[Dict[str, Any]]) -> Dict[str, Any]: 92 """Push events to the remote queue.""" 93 try: 94 response = requests.post( 95 f"{self.base_url}/queue/push", 96 json={'events': events}, 97 timeout=self.timeout 98 ) 99 response.raise_for_status() 100 return response.json() 101 except Exception as e: 102 return {'error': str(e), 'success': False} 103 104 def pull_events(self, batch_size: int = 50) -> List[Dict[str, Any]]: 105 """Pull events from the remote queue.""" 106 try: 107 response = requests.get( 108 f"{self.base_url}/queue/pull", 109 params={'batch_size': batch_size}, 110 timeout=self.timeout 111 ) 112 response.raise_for_status() 113 return response.json().get('events', []) 114 except Exception as e: 115 print(f"[QueueClient] Pull error: {e}") 116 return [] 117 118 def ack_events(self, event_ids: List[str]) -> Dict[str, Any]: 119 """Acknowledge processed events.""" 120 try: 121 response = requests.post( 122 f"{self.base_url}/queue/ack", 123 json={'event_ids': event_ids}, 124 timeout=self.timeout 125 ) 126 response.raise_for_status() 127 return response.json() 128 except Exception as e: 129 return {'error': str(e), 'success': False} 130 131 def get_status(self) -> Dict[str, Any]: 132 """Get queue status.""" 133 try: 134 response = requests.get( 135 f"{self.base_url}/queue/status", 136 timeout=self.timeout 137 ) 138 response.raise_for_status() 139 return response.json() 140 except Exception as e: 141 return {'error': str(e)} 142 143 144 class DistributedCollector: 145 """ 146 Collector node - runs on Dell server. 147 148 Watches data sources and queues events for processing. 149 Exposes queue endpoints for processors to pull from. 150 """ 151 152 def __init__(self, config: DistributedConfig): 153 self.config = config 154 self.pipeline = IngestionPipeline(mode='collector') 155 156 # Track what we've collected 157 self._collection_stats = { 158 'total_collected': 0, 159 'last_collection': None 160 } 161 162 def collect_and_queue(self) -> int: 163 """Run one collection cycle.""" 164 collected = self.pipeline.collect_new_events() 165 if collected > 0: 166 self._collection_stats['total_collected'] += collected 167 self._collection_stats['last_collection'] = datetime.now().isoformat() 168 return collected 169 170 def get_queue_stats(self) -> Dict[str, Any]: 171 """Get queue statistics.""" 172 state = self.pipeline.get_state() 173 return { 174 'role': 'collector', 175 'queue_size': state['queue_size'], 176 'total_collected': self._collection_stats['total_collected'], 177 'last_collection': self._collection_stats['last_collection'] 178 } 179 180 181 class DistributedProcessor: 182 """ 183 Processor node - runs on Mac Mini/MacBook. 184 185 Pulls events from remote queue and runs correlation. 186 """ 187 188 def __init__(self, config: DistributedConfig): 189 self.config = config 190 self.queue_client = QueueClient(config.queue_url) if config.queue_url else None 191 self.pipeline = IngestionPipeline(mode='processor') 192 193 # Stats 194 self._processing_stats = { 195 'total_processed': 0, 196 'correlations_found': 0, 197 'last_processing': None 198 } 199 200 def process_batch(self) -> Dict[str, Any]: 201 """Pull and process a batch of events.""" 202 if not self.queue_client: 203 return {'error': 'No queue URL configured'} 204 205 # Pull events from remote queue 206 events = self.queue_client.pull_events(batch_size=self.config.batch_size) 207 if not events: 208 return {'events_processed': 0, 'pulled': 0} 209 210 # Convert to local format and queue 211 event_ids = [] 212 for event_data in events: 213 event_ids.append(event_data.get('id')) 214 # Add to local queue for processing 215 # (The processor pipeline will handle correlation) 216 217 # Process locally 218 processed = self.pipeline.process_queue(batch_size=len(events)) 219 220 # Acknowledge processed events 221 if event_ids: 222 self.queue_client.ack_events(event_ids) 223 224 self._processing_stats['total_processed'] += processed 225 self._processing_stats['last_processing'] = datetime.now().isoformat() 226 227 return { 228 'pulled': len(events), 229 'events_processed': processed, 230 'correlations_found': self.pipeline.state.correlations_found 231 } 232 233 def get_stats(self) -> Dict[str, Any]: 234 """Get processing statistics.""" 235 queue_status = self.queue_client.get_status() if self.queue_client else {} 236 return { 237 'role': 'processor', 238 'queue_url': self.config.queue_url, 239 'remote_queue_status': queue_status, 240 **self._processing_stats 241 } 242 243 244 # Flask endpoints for queue (add to server when role=collector) 245 def add_queue_endpoints(app, collector: DistributedCollector): 246 """Add queue endpoints to Flask app.""" 247 from flask import request, jsonify 248 249 @app.route('/queue/status') 250 def queue_status(): 251 return jsonify(collector.get_queue_stats()) 252 253 @app.route('/queue/push', methods=['POST']) 254 def queue_push(): 255 """Push events to the queue (from other collectors).""" 256 data = request.get_json() or {} 257 events = data.get('events', []) 258 # Store in local queue 259 # ... implementation 260 return jsonify({'success': True, 'queued': len(events)}) 261 262 @app.route('/queue/pull', methods=['GET']) 263 def queue_pull(): 264 """Pull events from the queue (for processors).""" 265 batch_size = int(request.args.get('batch_size', 50)) 266 267 # Get unprocessed events from local queue DB 268 import sqlite3 269 db_path = collector.pipeline.data_dir / 'event_queue.db' 270 conn = sqlite3.connect(str(db_path)) 271 272 cursor = conn.execute(""" 273 SELECT id, source, timestamp, content, metadata 274 FROM events 275 WHERE processed = 0 276 ORDER BY timestamp 277 LIMIT ? 278 """, (batch_size,)) 279 280 events = [] 281 for row in cursor.fetchall(): 282 events.append({ 283 'id': row[0], 284 'source': row[1], 285 'timestamp': row[2], 286 'content': row[3], 287 'metadata': json.loads(row[4]) if row[4] else {} 288 }) 289 290 conn.close() 291 return jsonify({'events': events, 'count': len(events)}) 292 293 @app.route('/queue/ack', methods=['POST']) 294 def queue_ack(): 295 """Acknowledge processed events.""" 296 data = request.get_json() or {} 297 event_ids = data.get('event_ids', []) 298 299 if not event_ids: 300 return jsonify({'success': True, 'acknowledged': 0}) 301 302 import sqlite3 303 db_path = collector.pipeline.data_dir / 'event_queue.db' 304 conn = sqlite3.connect(str(db_path)) 305 306 placeholders = ','.join('?' * len(event_ids)) 307 conn.execute(f""" 308 UPDATE events SET processed = 1 309 WHERE id IN ({placeholders}) 310 """, event_ids) 311 312 conn.commit() 313 conn.close() 314 315 return jsonify({'success': True, 'acknowledged': len(event_ids)}) 316 317 318 def create_distributed_node(config: Optional[DistributedConfig] = None): 319 """Create the appropriate node based on configuration.""" 320 config = config or DistributedConfig.from_env() 321 322 if config.role == 'collector': 323 return DistributedCollector(config) 324 elif config.role == 'processor': 325 return DistributedProcessor(config) 326 else: 327 # Standalone - return regular pipeline 328 return IngestionPipeline(mode='standalone') 329 330 331 if __name__ == '__main__': 332 import sys 333 334 config = DistributedConfig.from_env() 335 print(f"Running as {config.role}...") 336 337 if config.role == 'collector': 338 collector = DistributedCollector(config) 339 while True: 340 collected = collector.collect_and_queue() 341 print(f"Collected {collected} events. Queue: {collector.get_queue_stats()}") 342 time.sleep(config.poll_interval) 343 344 elif config.role == 'processor': 345 processor = DistributedProcessor(config) 346 while True: 347 result = processor.process_batch() 348 print(f"Processing result: {result}") 349 time.sleep(config.poll_interval) 350 351 else: 352 # Standalone 353 pipeline = IngestionPipeline(mode='standalone') 354 pipeline.run()