# core/edge/device_fleet.py
  1  """
  2  Device Fleet Manager - Distributed Inference Orchestration
  3  
  4  Maps your physical devices to their compute capabilities and
  5  orchestrates inference across the fleet.
  6  
  7  Your Fleet:
  8  - MacBook Pro: Primary workstation, GPU, always with you
  9  - Mac Mini: Home server, always-on, background processing
 10  - iPhone: Mobile, battery-constrained, edge inference
 11  - iPad: Tablet, moderate compute, reading/annotation
 12  - iFlyTek: Dedicated transcription device, always recording
 13  
 14  Philosophy:
 15  - Each device has a model appropriate to its role
 16  - Devices sync state via Hypercore
 17  - Tasks flow to the appropriate device
 18  - Fallback to cloud when local fails
 19  """
 20  
import json
import os
import platform
import urllib.request
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
 27  
 28  
 29  class DeviceRole(Enum):
 30      """Primary role of each device."""
 31      WORKSTATION = "workstation"      # Primary development
 32      SERVER = "server"                # Always-on, background
 33      MOBILE = "mobile"                # On-the-go, battery
 34      TABLET = "tablet"                # Reading, annotation
 35      DEDICATED = "dedicated"          # Single-purpose device
 36      WEARABLE = "wearable"            # Watch, glasses
 37  
 38  
 39  class PowerState(Enum):
 40      """Power state affects routing decisions."""
 41      PLUGGED = "plugged"
 42      BATTERY_HIGH = "battery_high"    # >50%
 43      BATTERY_LOW = "battery_low"      # 20-50%
 44      BATTERY_CRITICAL = "battery_critical"  # <20%
 45  
 46  
@dataclass
class DeviceCapabilities:
    """What this device can do (static hardware profile used for routing)."""
    # Largest model this device can host, in billions of parameters
    # (e.g. 7 for a 7B model). Annotated float: the fleet config below
    # assigns sub-1B values such as 0.1 for the dedicated recorder.
    max_model_params: float
    gpu_available: bool             # any GPU, incl. Apple unified-memory GPUs
    gpu_vram_gb: float = 0          # GPU / unified memory in GB (0 = no GPU memory)
    ram_gb: float = 8
    disk_gb: float = 256
    neural_engine: bool = False     # Apple Neural Engine
    always_on: bool = False         # e.g. a home server; scoring favors these for background work
    battery_powered: bool = False   # scoring penalizes battery devices
 58  
 59  
@dataclass
class ModelDeployment:
    """A model deployed on a device."""
    name: str                       # model identifier (e.g. an Ollama tag like "qwen2.5:7b")
    params_b: float                 # Billions of params
    quantization: str = "q4"        # q4, q8, f16
    # Task types this model serves; matched by string equality during routing.
    tasks: List[str] = field(default_factory=list)
    endpoint: str = ""              # How to reach it (e.g. "ollama:qwen2.5:7b", "builtin")
 68  
 69  
 70  @dataclass
 71  class Device:
 72      """A device in your fleet."""
 73      id: str
 74      name: str
 75      role: DeviceRole
 76      capabilities: DeviceCapabilities
 77      models: List[ModelDeployment] = field(default_factory=list)
 78      tailscale_ip: Optional[str] = None
 79      power_state: PowerState = PowerState.PLUGGED
 80      online: bool = True
 81  
 82      def can_run(self, model_params_b: float) -> bool:
 83          """Check if device can run a model of this size."""
 84          return model_params_b <= self.capabilities.max_model_params
 85  
 86      def current_capacity(self) -> float:
 87          """Available capacity (0-1) based on power state."""
 88          if not self.online:
 89              return 0
 90          if self.power_state == PowerState.BATTERY_CRITICAL:
 91              return 0.1
 92          if self.power_state == PowerState.BATTERY_LOW:
 93              return 0.5
 94          if self.power_state == PowerState.BATTERY_HIGH:
 95              return 0.8
 96          return 1.0
 97  
 98  
 99  class DeviceFleet:
100      """
101      Manages your fleet of devices for distributed inference.
102  
103      Key responsibilities:
104      1. Track device capabilities and status
105      2. Route tasks to appropriate devices
106      3. Handle device failures gracefully
107      4. Sync state across devices
108      """
109  
110      def __init__(self):
111          self.devices: Dict[str, Device] = {}
112          self.hypercore_url = "http://localhost:7777"
113          self._configure_fleet()
114  
115      def _configure_fleet(self):
116          """Configure your specific device fleet."""
117  
118          # MacBook Pro - Primary workstation
119          self.devices["macbook-pro"] = Device(
120              id="macbook-pro",
121              name="Rick's MacBook Pro",
122              role=DeviceRole.WORKSTATION,
123              capabilities=DeviceCapabilities(
124                  max_model_params=30,    # Can run 30B with unified memory
125                  gpu_available=True,
126                  gpu_vram_gb=36,         # Unified memory
127                  ram_gb=36,
128                  disk_gb=1000,
129                  neural_engine=True,
130                  battery_powered=True,
131              ),
132              models=[
133                  ModelDeployment(
134                      name="qwen2.5:32b",
135                      params_b=32,
136                      quantization="q4",
137                      tasks=["reasoning", "code", "analysis"],
138                      endpoint="ollama:qwen2.5:32b"
139                  ),
140                  ModelDeployment(
141                      name="qwen2.5:7b",
142                      params_b=7,
143                      quantization="q4",
144                      tasks=["general", "summarization"],
145                      endpoint="ollama:qwen2.5:7b"
146                  ),
147                  ModelDeployment(
148                      name="nomic-embed-text",
149                      params_b=0.1,
150                      tasks=["embedding"],
151                      endpoint="ollama:nomic-embed-text"
152                  ),
153              ],
154              tailscale_ip=None,  # Will be discovered
155          )
156  
157          # Mac Mini - Home server
158          self.devices["mac-mini"] = Device(
159              id="mac-mini",
160              name="Rick's Mac Mini",
161              role=DeviceRole.SERVER,
162              capabilities=DeviceCapabilities(
163                  max_model_params=14,    # M2 Pro, 16GB
164                  gpu_available=True,
165                  gpu_vram_gb=16,
166                  ram_gb=16,
167                  disk_gb=500,
168                  neural_engine=True,
169                  always_on=True,
170              ),
171              models=[
172                  ModelDeployment(
173                      name="qwen2.5:14b",
174                      params_b=14,
175                      quantization="q4",
176                      tasks=["background", "indexing", "summarization"],
177                      endpoint="ollama:qwen2.5:14b"
178                  ),
179                  ModelDeployment(
180                      name="whisper-large",
181                      params_b=1.5,
182                      tasks=["transcription"],
183                      endpoint="whisper:large"
184                  ),
185                  ModelDeployment(
186                      name="all-minilm",
187                      params_b=0.02,
188                      tasks=["embedding"],
189                      endpoint="ollama:all-minilm"
190                  ),
191              ],
192              tailscale_ip=None,
193          )
194  
195          # iPhone - Mobile
196          self.devices["iphone"] = Device(
197              id="iphone",
198              name="Rick's iPhone",
199              role=DeviceRole.MOBILE,
200              capabilities=DeviceCapabilities(
201                  max_model_params=3,     # Limited by RAM
202                  gpu_available=True,
203                  gpu_vram_gb=6,          # Shared with system
204                  ram_gb=6,
205                  disk_gb=256,
206                  neural_engine=True,
207                  battery_powered=True,
208              ),
209              models=[
210                  ModelDeployment(
211                      name="phi-3-mini",
212                      params_b=3.8,
213                      quantization="q4",
214                      tasks=["quick-qa", "classification"],
215                      endpoint="mlc:phi-3-mini"
216                  ),
217                  ModelDeployment(
218                      name="whisper-tiny",
219                      params_b=0.039,
220                      tasks=["transcription"],
221                      endpoint="whisper:tiny"
222                  ),
223              ],
224              power_state=PowerState.BATTERY_HIGH,
225          )
226  
227          # iPad - Tablet
228          self.devices["ipad"] = Device(
229              id="ipad",
230              name="Rick's iPad",
231              role=DeviceRole.TABLET,
232              capabilities=DeviceCapabilities(
233                  max_model_params=7,
234                  gpu_available=True,
235                  gpu_vram_gb=8,
236                  ram_gb=8,
237                  disk_gb=256,
238                  neural_engine=True,
239                  battery_powered=True,
240              ),
241              models=[
242                  ModelDeployment(
243                      name="qwen2.5:3b",
244                      params_b=3,
245                      quantization="q4",
246                      tasks=["summarization", "annotation"],
247                      endpoint="mlc:qwen2.5:3b"
248                  ),
249              ],
250              power_state=PowerState.BATTERY_HIGH,
251          )
252  
253          # iFlyTek - Dedicated transcription
254          self.devices["iflytek"] = Device(
255              id="iflytek",
256              name="iFlyTek Recorder",
257              role=DeviceRole.DEDICATED,
258              capabilities=DeviceCapabilities(
259                  max_model_params=0.1,   # Tiny models only
260                  gpu_available=False,
261                  ram_gb=1,
262                  disk_gb=32,
263              ),
264              models=[
265                  ModelDeployment(
266                      name="iflytek-asr",
267                      params_b=0.05,
268                      tasks=["live-transcription", "recording"],
269                      endpoint="builtin"
270                  ),
271              ],
272          )
273  
274      def get_device(self, device_id: str) -> Optional[Device]:
275          """Get a device by ID."""
276          return self.devices.get(device_id)
277  
278      def get_available_devices(self) -> List[Device]:
279          """Get all online devices with capacity."""
280          return [
281              d for d in self.devices.values()
282              if d.online and d.current_capacity() > 0
283          ]
284  
285      def find_device_for_task(
286          self,
287          task: str,
288          min_params: float = 0,
289          prefer_local: bool = True
290      ) -> Optional[Device]:
291          """
292          Find the best device to handle a task.
293  
294          Args:
295              task: Task type (e.g., "transcription", "reasoning")
296              min_params: Minimum model size needed
297              prefer_local: Prefer local device over network
298  
299          Returns:
300              Best device, or None if no device available
301          """
302          candidates = []
303  
304          for device in self.get_available_devices():
305              for model in device.models:
306                  if task in model.tasks and model.params_b >= min_params:
307                      score = self._score_device(device, model, prefer_local)
308                      candidates.append((device, model, score))
309  
310          if not candidates:
311              return None
312  
313          # Sort by score descending
314          candidates.sort(key=lambda x: x[2], reverse=True)
315          return candidates[0][0]
316  
317      def _score_device(
318          self,
319          device: Device,
320          model: ModelDeployment,
321          prefer_local: bool
322      ) -> float:
323          """Score a device for task routing."""
324          score = 0.0
325  
326          # Base score from capacity
327          score += device.current_capacity() * 10
328  
329          # Prefer always-on for background tasks
330          if device.capabilities.always_on:
331              score += 5
332  
333          # Prefer local device (current machine)
334          if prefer_local and device.id == self._get_current_device_id():
335              score += 20
336  
337          # Prefer larger models (more capable)
338          score += min(model.params_b, 10)
339  
340          # Penalize battery devices
341          if device.capabilities.battery_powered:
342              score -= 5
343  
344          return score
345  
346      def _get_current_device_id(self) -> str:
347          """Identify the current device."""
348          hostname = os.uname().nodename.lower()
349          if "macbook" in hostname:
350              return "macbook-pro"
351          elif "mini" in hostname:
352              return "mac-mini"
353          return "unknown"
354  
355      def route_inference(
356          self,
357          query: str,
358          task_type: str,
359          context: Dict[str, Any] = None
360      ) -> Dict[str, Any]:
361          """
362          Route an inference request to the best device.
363  
364          Returns routing decision with device, model, and endpoint.
365          """
366          # Estimate complexity
367          complexity = self._estimate_complexity(query, context)
368  
369          # Find appropriate device
370          device = self.find_device_for_task(
371              task=task_type,
372              min_params=complexity.min_params,
373              prefer_local=complexity.prefer_local
374          )
375  
376          if not device:
377              # Fallback to cloud
378              return {
379                  "device": None,
380                  "model": "claude-sonnet-4",
381                  "endpoint": "anthropic",
382                  "reason": "No local device available",
383                  "complexity": complexity.__dict__
384              }
385  
386          # Find the best model on this device
387          best_model = None
388          for model in device.models:
389              if task_type in model.tasks:
390                  if best_model is None or model.params_b > best_model.params_b:
391                      best_model = model
392  
393          return {
394              "device": device.id,
395              "device_name": device.name,
396              "model": best_model.name if best_model else None,
397              "endpoint": best_model.endpoint if best_model else None,
398              "reason": f"Routed to {device.name} ({device.role.value})",
399              "complexity": complexity.__dict__
400          }
401  
402      def _estimate_complexity(
403          self,
404          query: str,
405          context: Dict = None
406      ):
407          """Estimate query complexity for routing."""
408          @dataclass
409          class Complexity:
410              min_params: float
411              prefer_local: bool
412              estimated_tokens: int
413  
414          tokens = len(query.split()) * 1.3
415          context_tokens = 0
416          if context:
417              context_tokens = sum(
418                  len(str(v).split()) for v in context.values()
419              ) * 1.3
420  
421          # Simple heuristics
422          needs_reasoning = any(w in query.lower() for w in
423              ["why", "how", "explain", "analyze", "compare"])
424          needs_code = "```" in query or any(w in query.lower() for w in
425              ["implement", "code", "function", "class"])
426  
427          if needs_code:
428              min_params = 7  # Need decent code model
429          elif needs_reasoning:
430              min_params = 3  # Can use smaller for basic reasoning
431          else:
432              min_params = 0.5  # Tiny model sufficient
433  
434          # Prefer local unless very complex
435          prefer_local = (tokens + context_tokens) < 2000
436  
437          return Complexity(
438              min_params=min_params,
439              prefer_local=prefer_local,
440              estimated_tokens=int(tokens + context_tokens)
441          )
442  
443      def sync_state_to_hypercore(self):
444          """Sync fleet state to Hypercore for cross-device awareness."""
445          state = {
446              "devices": {
447                  d.id: {
448                      "name": d.name,
449                      "role": d.role.value,
450                      "online": d.online,
451                      "power_state": d.power_state.value,
452                      "capacity": d.current_capacity(),
453                      "models": [m.name for m in d.models]
454                  }
455                  for d in self.devices.values()
456              },
457              "current_device": self._get_current_device_id()
458          }
459  
460          try:
461              data = json.dumps({
462                  "type": "fleet_state",
463                  "fleet": state
464              }).encode('utf-8')
465  
466              req = urllib.request.Request(
467                  f"{self.hypercore_url}/event",
468                  data=data,
469                  headers={"Content-Type": "application/json"},
470                  method="POST"
471              )
472  
473              with urllib.request.urlopen(req, timeout=5):
474                  pass
475          except Exception as e:
476              print(f"Failed to sync fleet state: {e}")
477  
478      def print_fleet_status(self):
479          """Print current fleet status."""
480          print("\n" + "=" * 60)
481          print("SOVEREIGN DEVICE FLEET")
482          print("=" * 60)
483  
484          for device in self.devices.values():
485              status = "🟢" if device.online else "🔴"
486              power = ""
487              if device.capabilities.battery_powered:
488                  power = f" [{device.power_state.value}]"
489  
490              print(f"\n{status} {device.name}{power}")
491              print(f"   Role: {device.role.value}")
492              print(f"   Max Model: {device.capabilities.max_model_params}B")
493              print(f"   Capacity: {device.current_capacity() * 100:.0f}%")
494              print(f"   Models:")
495              for model in device.models:
496                  print(f"      - {model.name} ({model.params_b}B): {', '.join(model.tasks)}")
497  
498          print("\n" + "=" * 60)
499  
500  
# Singleton instance
_fleet: Optional[DeviceFleet] = None


def get_fleet() -> DeviceFleet:
    """Return the process-wide fleet, constructing it lazily on first use."""
    global _fleet
    if _fleet is not None:
        return _fleet
    _fleet = DeviceFleet()
    return _fleet
511  
512  
if __name__ == "__main__":
    fleet = get_fleet()
    fleet.print_fleet_status()

    # Exercise the router against a few representative tasks.
    print("\n" + "=" * 60)
    print("ROUTING TESTS")
    print("=" * 60)

    routing_cases = [
        ("transcription", "Transcribe this audio file"),
        ("summarization", "Summarize this document"),
        ("reasoning", "Why does gravity work?"),
        ("code", "Implement a binary search tree"),
    ]

    for task_type, prompt in routing_cases:
        decision = fleet.route_inference(prompt, task_type)
        print(f"\nTask: {task_type}")
        print(f"Query: {prompt[:40]}...")
        print(f"→ Device: {decision.get('device_name', 'Cloud')}")
        print(f"→ Model: {decision.get('model')}")
        print(f"→ Reason: {decision.get('reason')}")