"""
Device Fleet Manager - Distributed Inference Orchestration

Maps your physical devices to their compute capabilities and
orchestrates inference across the fleet.

Your Fleet:
- MacBook Pro: Primary workstation, GPU, always with you
- Mac Mini: Home server, always-on, background processing
- iPhone: Mobile, battery-constrained, edge inference
- iPad: Tablet, moderate compute, reading/annotation
- iFlyTek: Dedicated transcription device, always recording

Philosophy:
- Each device has a model appropriate to its role
- Devices sync state via Hypercore
- Tasks flow to the appropriate device
- Fallback to cloud when local fails
"""

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, List, Dict, Any
import json
import urllib.request
import os


class DeviceRole(Enum):
    """Primary role of each device."""
    WORKSTATION = "workstation"  # Primary development
    SERVER = "server"            # Always-on, background
    MOBILE = "mobile"            # On-the-go, battery
    TABLET = "tablet"            # Reading, annotation
    DEDICATED = "dedicated"      # Single-purpose device
    WEARABLE = "wearable"        # Watch, glasses


class PowerState(Enum):
    """Power state affects routing decisions."""
    PLUGGED = "plugged"
    BATTERY_HIGH = "battery_high"          # >50%
    BATTERY_LOW = "battery_low"            # 20-50%
    BATTERY_CRITICAL = "battery_critical"  # <20%


@dataclass
class DeviceCapabilities:
    """What this device can do."""
    max_model_params: int        # In billions (e.g., 7 for 7B)
    gpu_available: bool
    gpu_vram_gb: float = 0
    ram_gb: float = 8
    disk_gb: float = 256
    neural_engine: bool = False  # Apple Neural Engine
    always_on: bool = False
    battery_powered: bool = False


@dataclass
class ModelDeployment:
    """A model deployed on a device."""
    name: str
    params_b: float              # Billions of params
    quantization: str = "q4"     # q4, q8, f16
    tasks: List[str] = field(default_factory=list)
    endpoint: str = ""           # How to reach it


@dataclass
class Device:
    """A device in your fleet."""
    id: str
    name: str
    role: DeviceRole
    capabilities: DeviceCapabilities
    models: List[ModelDeployment] = field(default_factory=list)
    tailscale_ip: Optional[str] = None
    power_state: PowerState = PowerState.PLUGGED
    online: bool = True

    def can_run(self, model_params_b: float) -> bool:
        """Check if device can run a model of this size (in billions)."""
        return model_params_b <= self.capabilities.max_model_params

    def current_capacity(self) -> float:
        """Available capacity (0-1) based on online and power state."""
        if not self.online:
            return 0
        if self.power_state == PowerState.BATTERY_CRITICAL:
            return 0.1
        if self.power_state == PowerState.BATTERY_LOW:
            return 0.5
        if self.power_state == PowerState.BATTERY_HIGH:
            return 0.8
        return 1.0


@dataclass
class _Complexity:
    """Routing estimate for a query.

    Hoisted to module level: the original defined this dataclass inside
    _estimate_complexity, rebuilding the class object on every call.
    """
    min_params: float       # Minimum model size (billions) the query needs
    prefer_local: bool      # True when the query is small enough for local
    estimated_tokens: int   # Rough token count (query + context)


class DeviceFleet:
    """
    Manages your fleet of devices for distributed inference.

    Key responsibilities:
    1. Track device capabilities and status
    2. Route tasks to appropriate devices
    3. Handle device failures gracefully
    4. Sync state across devices
    """

    def __init__(self):
        self.devices: Dict[str, Device] = {}
        self.hypercore_url = "http://localhost:7777"
        self._configure_fleet()

    def _configure_fleet(self):
        """Configure your specific device fleet."""

        # MacBook Pro - Primary workstation
        self.devices["macbook-pro"] = Device(
            id="macbook-pro",
            name="Rick's MacBook Pro",
            role=DeviceRole.WORKSTATION,
            capabilities=DeviceCapabilities(
                max_model_params=30,  # Can run 30B with unified memory
                gpu_available=True,
                gpu_vram_gb=36,  # Unified memory
                ram_gb=36,
                disk_gb=1000,
                neural_engine=True,
                battery_powered=True,
            ),
            models=[
                ModelDeployment(
                    name="qwen2.5:32b",
                    params_b=32,
                    quantization="q4",
                    tasks=["reasoning", "code", "analysis"],
                    endpoint="ollama:qwen2.5:32b"
                ),
                ModelDeployment(
                    name="qwen2.5:7b",
                    params_b=7,
                    quantization="q4",
                    tasks=["general", "summarization"],
                    endpoint="ollama:qwen2.5:7b"
                ),
                ModelDeployment(
                    name="nomic-embed-text",
                    params_b=0.1,
                    tasks=["embedding"],
                    endpoint="ollama:nomic-embed-text"
                ),
            ],
            tailscale_ip=None,  # Will be discovered
        )

        # Mac Mini - Home server
        self.devices["mac-mini"] = Device(
            id="mac-mini",
            name="Rick's Mac Mini",
            role=DeviceRole.SERVER,
            capabilities=DeviceCapabilities(
                max_model_params=14,  # M2 Pro, 16GB
                gpu_available=True,
                gpu_vram_gb=16,
                ram_gb=16,
                disk_gb=500,
                neural_engine=True,
                always_on=True,
            ),
            models=[
                ModelDeployment(
                    name="qwen2.5:14b",
                    params_b=14,
                    quantization="q4",
                    tasks=["background", "indexing", "summarization"],
                    endpoint="ollama:qwen2.5:14b"
                ),
                ModelDeployment(
                    name="whisper-large",
                    params_b=1.5,
                    tasks=["transcription"],
                    endpoint="whisper:large"
                ),
                ModelDeployment(
                    name="all-minilm",
                    params_b=0.02,
                    tasks=["embedding"],
                    endpoint="ollama:all-minilm"
                ),
            ],
            tailscale_ip=None,
        )

        # iPhone - Mobile
        self.devices["iphone"] = Device(
            id="iphone",
            name="Rick's iPhone",
            role=DeviceRole.MOBILE,
            capabilities=DeviceCapabilities(
                # FIX: was 3, which contradicted the deployed phi-3-mini
                # (3.8B) below and made can_run() reject the phone's own
                # model. 4 covers 3.8B quantized within 6GB RAM.
                max_model_params=4,
                gpu_available=True,
                gpu_vram_gb=6,  # Shared with system
                ram_gb=6,
                disk_gb=256,
                neural_engine=True,
                battery_powered=True,
            ),
            models=[
                ModelDeployment(
                    name="phi-3-mini",
                    params_b=3.8,
                    quantization="q4",
                    tasks=["quick-qa", "classification"],
                    endpoint="mlc:phi-3-mini"
                ),
                ModelDeployment(
                    name="whisper-tiny",
                    params_b=0.039,
                    tasks=["transcription"],
                    endpoint="whisper:tiny"
                ),
            ],
            power_state=PowerState.BATTERY_HIGH,
        )

        # iPad - Tablet
        self.devices["ipad"] = Device(
            id="ipad",
            name="Rick's iPad",
            role=DeviceRole.TABLET,
            capabilities=DeviceCapabilities(
                max_model_params=7,
                gpu_available=True,
                gpu_vram_gb=8,
                ram_gb=8,
                disk_gb=256,
                neural_engine=True,
                battery_powered=True,
            ),
            models=[
                ModelDeployment(
                    name="qwen2.5:3b",
                    params_b=3,
                    quantization="q4",
                    tasks=["summarization", "annotation"],
                    endpoint="mlc:qwen2.5:3b"
                ),
            ],
            power_state=PowerState.BATTERY_HIGH,
        )

        # iFlyTek - Dedicated transcription
        self.devices["iflytek"] = Device(
            id="iflytek",
            name="iFlyTek Recorder",
            role=DeviceRole.DEDICATED,
            capabilities=DeviceCapabilities(
                max_model_params=0.1,  # Tiny models only
                gpu_available=False,
                ram_gb=1,
                disk_gb=32,
            ),
            models=[
                ModelDeployment(
                    name="iflytek-asr",
                    params_b=0.05,
                    tasks=["live-transcription", "recording"],
                    endpoint="builtin"
                ),
            ],
        )

    def get_device(self, device_id: str) -> Optional[Device]:
        """Get a device by ID, or None if unknown."""
        return self.devices.get(device_id)

    def get_available_devices(self) -> List[Device]:
        """Get all online devices with capacity."""
        return [
            d for d in self.devices.values()
            if d.online and d.current_capacity() > 0
        ]

    def find_device_for_task(
        self,
        task: str,
        min_params: float = 0,
        prefer_local: bool = True
    ) -> Optional[Device]:
        """
        Find the best device to handle a task.

        Args:
            task: Task type (e.g., "transcription", "reasoning")
            min_params: Minimum model size needed (billions of params)
            prefer_local: Prefer local device over network

        Returns:
            Best device, or None if no device available
        """
        candidates = []

        for device in self.get_available_devices():
            for model in device.models:
                if task in model.tasks and model.params_b >= min_params:
                    score = self._score_device(device, model, prefer_local)
                    candidates.append((device, model, score))

        if not candidates:
            return None

        # Sort by score descending; best candidate first
        candidates.sort(key=lambda x: x[2], reverse=True)
        return candidates[0][0]

    def _score_device(
        self,
        device: Device,
        model: ModelDeployment,
        prefer_local: bool
    ) -> float:
        """Score a device for task routing (higher is better)."""
        score = 0.0

        # Base score from capacity
        score += device.current_capacity() * 10

        # Prefer always-on for background tasks
        if device.capabilities.always_on:
            score += 5

        # Prefer local device (current machine) — avoids a network hop
        if prefer_local and device.id == self._get_current_device_id():
            score += 20

        # Prefer larger models (more capable), capped so size doesn't dominate
        score += min(model.params_b, 10)

        # Penalize battery devices
        if device.capabilities.battery_powered:
            score -= 5

        return score

    def _get_current_device_id(self) -> str:
        """Identify the current device from the hostname.

        NOTE(review): os.uname() is POSIX-only; fine for this all-Apple
        fleet, but would raise AttributeError on Windows.
        """
        hostname = os.uname().nodename.lower()
        if "macbook" in hostname:
            return "macbook-pro"
        elif "mini" in hostname:
            return "mac-mini"
        return "unknown"

    def route_inference(
        self,
        query: str,
        task_type: str,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Route an inference request to the best device.

        Args:
            query: The user query text
            task_type: Task type used to match deployed models' tasks
            context: Optional extra context; its values count toward the
                token estimate used for local-vs-cloud routing

        Returns:
            Routing decision dict with device, model, endpoint, reason,
            and the complexity estimate. "device" is None when falling
            back to cloud.
        """
        # Estimate complexity
        complexity = self._estimate_complexity(query, context)

        # Find appropriate device
        device = self.find_device_for_task(
            task=task_type,
            min_params=complexity.min_params,
            prefer_local=complexity.prefer_local
        )

        if not device:
            # Fallback to cloud
            return {
                "device": None,
                "model": "claude-sonnet-4",
                "endpoint": "anthropic",
                "reason": "No local device available",
                "complexity": complexity.__dict__
            }

        # Find the best (largest) model on this device for this task.
        # The largest task-matching model is >= the one that qualified the
        # device, so it always satisfies complexity.min_params.
        best_model = None
        for model in device.models:
            if task_type in model.tasks:
                if best_model is None or model.params_b > best_model.params_b:
                    best_model = model

        return {
            "device": device.id,
            "device_name": device.name,
            "model": best_model.name if best_model else None,
            "endpoint": best_model.endpoint if best_model else None,
            "reason": f"Routed to {device.name} ({device.role.value})",
            "complexity": complexity.__dict__
        }

    def _estimate_complexity(
        self,
        query: str,
        context: Optional[Dict] = None
    ) -> _Complexity:
        """Estimate query complexity for routing (simple keyword heuristics)."""
        # ~1.3 tokens per whitespace-separated word
        tokens = len(query.split()) * 1.3
        context_tokens = 0
        if context:
            context_tokens = sum(
                len(str(v).split()) for v in context.values()
            ) * 1.3

        # Simple heuristics
        needs_reasoning = any(w in query.lower() for w in
                              ["why", "how", "explain", "analyze", "compare"])
        needs_code = "```" in query or any(w in query.lower() for w in
                                           ["implement", "code", "function", "class"])

        if needs_code:
            min_params = 7  # Need decent code model
        elif needs_reasoning:
            min_params = 3  # Can use smaller for basic reasoning
        else:
            min_params = 0.5  # Tiny model sufficient

        # Prefer local unless very complex
        prefer_local = (tokens + context_tokens) < 2000

        return _Complexity(
            min_params=min_params,
            prefer_local=prefer_local,
            estimated_tokens=int(tokens + context_tokens)
        )

    def sync_state_to_hypercore(self):
        """Sync fleet state to Hypercore for cross-device awareness.

        Best-effort: failures are logged to stdout, never raised, so a
        dead Hypercore endpoint cannot break routing.
        """
        state = {
            "devices": {
                d.id: {
                    "name": d.name,
                    "role": d.role.value,
                    "online": d.online,
                    "power_state": d.power_state.value,
                    "capacity": d.current_capacity(),
                    "models": [m.name for m in d.models]
                }
                for d in self.devices.values()
            },
            "current_device": self._get_current_device_id()
        }

        try:
            data = json.dumps({
                "type": "fleet_state",
                "fleet": state
            }).encode('utf-8')

            req = urllib.request.Request(
                f"{self.hypercore_url}/event",
                data=data,
                headers={"Content-Type": "application/json"},
                method="POST"
            )

            with urllib.request.urlopen(req, timeout=5):
                pass
        except Exception as e:
            print(f"Failed to sync fleet state: {e}")

    def print_fleet_status(self):
        """Print current fleet status to stdout."""
        print("\n" + "=" * 60)
        print("SOVEREIGN DEVICE FLEET")
        print("=" * 60)

        for device in self.devices.values():
            status = "🟢" if device.online else "🔴"
            power = ""
            if device.capabilities.battery_powered:
                power = f" [{device.power_state.value}]"

            print(f"\n{status} {device.name}{power}")
            print(f"   Role: {device.role.value}")
            print(f"   Max Model: {device.capabilities.max_model_params}B")
            print(f"   Capacity: {device.current_capacity() * 100:.0f}%")
            print(f"   Models:")
            for model in device.models:
                print(f"     - {model.name} ({model.params_b}B): {', '.join(model.tasks)}")

        print("\n" + "=" * 60)


# Singleton instance
_fleet: Optional[DeviceFleet] = None


def get_fleet() -> DeviceFleet:
    """Return the process-wide fleet, creating it on first use."""
    global _fleet
    if _fleet is None:
        _fleet = DeviceFleet()
    return _fleet


def _demo() -> None:
    """Show fleet status, then exercise routing with sample queries."""
    fleet = get_fleet()
    fleet.print_fleet_status()

    # Routing smoke test across the main task types
    print("\n" + "=" * 60)
    print("ROUTING TESTS")
    print("=" * 60)

    samples = [
        ("transcription", "Transcribe this audio file"),
        ("summarization", "Summarize this document"),
        ("reasoning", "Why does gravity work?"),
        ("code", "Implement a binary search tree"),
    ]

    for task, query in samples:
        decision = fleet.route_inference(query, task)
        print(f"\nTask: {task}")
        print(f"Query: {query[:40]}...")
        print(f"→ Device: {decision.get('device_name', 'Cloud')}")
        print(f"→ Model: {decision.get('model')}")
        print(f"→ Reason: {decision.get('reason')}")


if __name__ == "__main__":
    _demo()