/ abzu-inference / src / mesh.rs
mesh.rs
   1  //! Mesh Integration: DHT-based discovery and task routing for Agent Circles.
   2  //!
   3  //! This module connects Agent Circles to the Abzu mesh network, enabling:
   4  //!
   5  //! - **Circle discovery**: Announce and find circles via DHT
   6  //! - **Task routing**: Distribute inference requests to capable agents
   7  //! - **Load balancing**: Spread work across available capacity
   8  //!
   9  //! ## Architecture
  10  //!
  11  //! ```text
  12  //! ┌────────────────────────────────────────────────────────────┐
  13  //! │                         DHT Layer                         │
  14  //! │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │
  15  //! │  │ Circle Alpha │  │ Circle Beta  │  │ Circle Gamma │    │
  16  //! │  │ inference:*  │  │ embedding:*  │  │ image:*      │    │
  17  //! │  └──────────────┘  └──────────────┘  └──────────────┘    │
  18  //! └────────────────────────────────────────────────────────────┘
  19  //!           │                   │                   │
  20  //!      ┌────▼────┐         ┌────▼────┐         ┌────▼────┐
  21  //!      │  Agent  │         │  Agent  │         │  Agent  │
  22  //!      │  Router │         │  Router │         │  Router │
  23  //!      └─────────┘         └─────────┘         └─────────┘
  24  //! ```
  25  
  26  use crate::agent_circles::{AgentCircle, AgentProfile, AgentStatus, Capability};
  27  use crate::discovery::{CircleCache, DiscoveryQuery, DhtOp, PublishedCircle, plan_publish, plan_discovery};
  28  use crate::reputation::{ReputationTracker, TaskRecord};
  29  use serde::{Deserialize, Serialize};
  30  use std::collections::HashMap;
  31  use std::sync::Arc;
  32  use tokio::sync::RwLock;
  33  
  34  /// Circle announcement for DHT publication.
  35  #[derive(Debug, Clone, Serialize, Deserialize)]
  36  pub struct CircleAnnouncement {
  37      /// Circle ID
  38      pub circle_id: [u8; 32],
  39      
  40      /// Circle name
  41      pub name: String,
  42      
  43      /// Description
  44      pub description: Option<String>,
  45      
  46      /// Aggregated capabilities (for discovery)
  47      pub capabilities: Vec<String>,
  48      
  49      /// Member count
  50      pub member_count: usize,
  51      
  52      /// Available capacity (members ready to work)
  53      pub available_count: usize,
  54      
  55      /// Whether open for new members
  56      pub open_membership: bool,
  57      
  58      /// Announcement timestamp (Unix ms)
  59      pub timestamp: u64,
  60      
  61      /// Announcer's signature
  62      pub signature: Vec<u8>,
  63  }
  64  
  65  impl CircleAnnouncement {
  66      /// Create an announcement from an AgentCircle
  67      pub fn from_circle(circle: &AgentCircle) -> Self {
  68          let available_count = circle.members
  69              .iter()
  70              .filter(|m| m.status == AgentStatus::Available)
  71              .count();
  72          
  73          Self {
  74              circle_id: circle.id,
  75              name: circle.name.clone(),
  76              description: circle.description.clone(),
  77              capabilities: circle.capabilities.iter().cloned().collect(),
  78              member_count: circle.members.len(),
  79              available_count,
  80              open_membership: circle.open_membership,
  81              timestamp: std::time::SystemTime::now()
  82                  .duration_since(std::time::UNIX_EPOCH)
  83                  .unwrap()
  84                  .as_millis() as u64,
  85              signature: vec![], // Must be signed by caller
  86          }
  87      }
  88      
  89      /// Derive DHT key for this announcement
  90      pub fn dht_key(&self) -> [u8; 32] {
  91          let mut hasher = blake3::Hasher::new();
  92          hasher.update(b"abzu:agent-circle:announcement:");
  93          hasher.update(&self.circle_id);
  94          *hasher.finalize().as_bytes()
  95      }
  96      
  97      /// Derive capability topic keys for discovery
  98      pub fn capability_keys(&self) -> Vec<[u8; 32]> {
  99          self.capabilities
 100              .iter()
 101              .map(|cap| {
 102                  let mut hasher = blake3::Hasher::new();
 103                  hasher.update(b"abzu:agent-circle:capability:");
 104                  hasher.update(cap.as_bytes());
 105                  *hasher.finalize().as_bytes()
 106              })
 107              .collect()
 108      }
 109  }
 110  
 111  /// Task to be routed to an agent circle.
 112  #[derive(Debug, Clone, Serialize, Deserialize)]
 113  pub struct Task {
 114      /// Unique task ID
 115      pub id: [u8; 32],
 116      
 117      /// Required capability
 118      pub capability: String,
 119      
 120      /// Task payload (capability-specific)
 121      pub payload: Vec<u8>,
 122      
 123      /// Priority (higher = more urgent)
 124      pub priority: u8,
 125      
 126      /// Maximum latency allowed (ms, 0 = no limit)
 127      pub max_latency_ms: u64,
 128      
 129      /// Requester's public key
 130      pub requester: [u8; 32],
 131      
 132      /// Created timestamp
 133      pub created_at: u64,
 134  }
 135  
 136  impl Task {
 137      /// Create a new task
 138      pub fn new(capability: impl Into<String>, payload: Vec<u8>, requester: [u8; 32]) -> Self {
 139          let mut id = [0u8; 32];
 140          let nonce: [u8; 16] = rand::random();
 141          let mut hasher = blake3::Hasher::new();
 142          hasher.update(&requester);
 143          hasher.update(&nonce);
 144          id.copy_from_slice(hasher.finalize().as_bytes());
 145          
 146          Self {
 147              id,
 148              capability: capability.into(),
 149              payload,
 150              priority: 5, // Medium default
 151              max_latency_ms: 0,
 152              requester,
 153              created_at: std::time::SystemTime::now()
 154                  .duration_since(std::time::UNIX_EPOCH)
 155                  .unwrap()
 156                  .as_millis() as u64,
 157          }
 158      }
 159      
 160      /// Set priority (0-10)
 161      pub fn with_priority(mut self, priority: u8) -> Self {
 162          self.priority = priority.min(10);
 163          self
 164      }
 165      
 166      /// Set max latency requirement
 167      pub fn with_max_latency(mut self, ms: u64) -> Self {
 168          self.max_latency_ms = ms;
 169          self
 170      }
 171  }
 172  
 173  /// Result of a completed task.
 174  #[derive(Debug, Clone, Serialize, Deserialize)]
 175  pub struct TaskResult {
 176      /// Task ID this result is for
 177      pub task_id: [u8; 32],
 178      
 179      /// Whether the task succeeded
 180      pub success: bool,
 181      
 182      /// Result payload (if success)
 183      pub payload: Option<Vec<u8>>,
 184      
 185      /// Error message (if failure)
 186      pub error: Option<String>,
 187      
 188      /// Agent that completed the task
 189      pub completed_by: [u8; 32],
 190      
 191      /// Completion timestamp
 192      pub completed_at: u64,
 193      
 194      /// Execution time (ms)
 195      pub execution_time_ms: u64,
 196  }
 197  
 198  /// Routing strategy for task distribution.
 199  #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
 200  pub enum RoutingStrategy {
 201      /// Lowest latency first
 202      #[default]
 203      LowestLatency,
 204      
 205      /// Random selection among capable agents
 206      Random,
 207      
 208      /// Round-robin across capable agents
 209      RoundRobin,
 210      
 211      /// Highest reputation first
 212      HighestReputation,
 213      
 214      /// Least loaded agent
 215      LeastLoaded,
 216  }
 217  
 218  /// Task router for distributing work across agent circles.
 219  pub struct TaskRouter {
 220      /// Known circles (by ID)
 221      circles: Arc<RwLock<HashMap<[u8; 32], AgentCircle>>>,
 222      
 223      /// Capability index (capability -> circle IDs)
 224      capability_index: Arc<RwLock<HashMap<String, Vec<[u8; 32]>>>>,
 225      
 226      /// Round-robin counters (for RoundRobin strategy)
 227      rr_counters: Arc<RwLock<HashMap<String, usize>>>,
 228      
 229      /// Routing strategy
 230      strategy: RoutingStrategy,
 231      
 232      /// Our node's public key
 233      our_pubkey: [u8; 32],
 234  }
 235  
 236  impl TaskRouter {
 237      /// Create a new task router
 238      pub fn new(our_pubkey: [u8; 32]) -> Self {
 239          Self {
 240              circles: Arc::new(RwLock::new(HashMap::new())),
 241              capability_index: Arc::new(RwLock::new(HashMap::new())),
 242              rr_counters: Arc::new(RwLock::new(HashMap::new())),
 243              strategy: RoutingStrategy::default(),
 244              our_pubkey,
 245          }
 246      }
 247      
 248      /// Set routing strategy
 249      pub fn with_strategy(mut self, strategy: RoutingStrategy) -> Self {
 250          self.strategy = strategy;
 251          self
 252      }
 253      
 254      /// Register a circle with the router
 255      pub async fn register_circle(&self, circle: AgentCircle) {
 256          let circle_id = circle.id;
 257          
 258          // Update capability index
 259          {
 260              let mut index = self.capability_index.write().await;
 261              for cap in &circle.capabilities {
 262                  index.entry(cap.clone())
 263                      .or_insert_with(Vec::new)
 264                      .push(circle_id);
 265              }
 266          }
 267          
 268          // Store circle
 269          {
 270              let mut circles = self.circles.write().await;
 271              circles.insert(circle_id, circle);
 272          }
 273      }
 274      
 275      /// Unregister a circle
 276      pub async fn unregister_circle(&self, circle_id: &[u8; 32]) {
 277          let circle = {
 278              let mut circles = self.circles.write().await;
 279              circles.remove(circle_id)
 280          };
 281          
 282          // Clean up capability index
 283          if let Some(circle) = circle {
 284              let mut index = self.capability_index.write().await;
 285              for cap in &circle.capabilities {
 286                  if let Some(ids) = index.get_mut(cap) {
 287                      ids.retain(|id| id != circle_id);
 288                  }
 289              }
 290          }
 291      }
 292      
 293      /// Find circles with a specific capability
 294      pub async fn find_capable(&self, capability: &str) -> Vec<AgentCircle> {
 295          let circle_ids = {
 296              let index = self.capability_index.read().await;
 297              index.get(capability).cloned().unwrap_or_default()
 298          };
 299          
 300          let circles = self.circles.read().await;
 301          circle_ids.iter()
 302              .filter_map(|id| circles.get(id).cloned())
 303              .collect()
 304      }
 305      
 306      /// Route a task to the best available agent
 307      pub async fn route(&self, task: &Task) -> Result<AgentProfile, String> {
 308          let circles = self.find_capable(&task.capability).await;
 309          
 310          if circles.is_empty() {
 311              return Err(format!("No circles with capability: {}", task.capability));
 312          }
 313          
 314          // Collect all available agents with the capability
 315          let mut candidates: Vec<(&AgentCircle, &AgentProfile)> = vec![];
 316          
 317          for circle in &circles {
 318              for member in &circle.members {
 319                  if member.status == AgentStatus::Available {
 320                      // Check if member has the specific capability
 321                      let has_cap = member.capabilities.iter().any(|c| {
 322                          capability_matches(c, &task.capability)
 323                      });
 324                      
 325                      if has_cap {
 326                          candidates.push((circle, member));
 327                      }
 328                  }
 329              }
 330          }
 331          
 332          if candidates.is_empty() {
 333              return Err("No available agents for capability".into());
 334          }
 335          
 336          // Apply routing strategy
 337          let selected = match self.strategy {
 338              RoutingStrategy::LowestLatency => {
 339                  // For now, just pick first (latency tracking TODO)
 340                  candidates.first()
 341              }
 342              RoutingStrategy::Random => {
 343                  use rand::seq::SliceRandom;
 344                  candidates.choose(&mut rand::thread_rng())
 345              }
 346              RoutingStrategy::RoundRobin => {
 347                  let mut counters = self.rr_counters.write().await;
 348                  let counter = counters.entry(task.capability.clone()).or_insert(0);
 349                  let idx = *counter % candidates.len();
 350                  *counter = counter.wrapping_add(1);
 351                  candidates.get(idx)
 352              }
 353              RoutingStrategy::HighestReputation => {
 354                  candidates.iter()
 355                      .max_by(|a, b| {
 356                          a.1.reputation.partial_cmp(&b.1.reputation).unwrap()
 357                      })
 358              }
 359              RoutingStrategy::LeastLoaded => {
 360                  // Use member count as proxy for load (TODO: actual load tracking)
 361                  candidates.iter()
 362                      .min_by_key(|(circle, _)| circle.members.len())
 363              }
 364          };
 365          
 366          selected
 367              .map(|(_, agent)| (*agent).clone())
 368              .ok_or_else(|| "No agent selected".into())
 369      }
 370      
 371      /// Get statistics about registered circles
 372      pub async fn stats(&self) -> RouterStats {
 373          let circles = self.circles.read().await;
 374          let index = self.capability_index.read().await;
 375          
 376          let total_members: usize = circles.values().map(|c| c.members.len()).sum();
 377          let available_members: usize = circles.values()
 378              .flat_map(|c| c.members.iter())
 379              .filter(|m| m.status == AgentStatus::Available)
 380              .count();
 381          
 382          RouterStats {
 383              circle_count: circles.len(),
 384              capability_count: index.len(),
 385              total_members,
 386              available_members,
 387          }
 388      }
 389  }
 390  
 391  /// Check if a capability matches a query string
 392  fn capability_matches(cap: &Capability, query: &str) -> bool {
 393      let cap_name = match cap {
 394          Capability::Inference { model, .. } => format!("inference:{}", model),
 395          Capability::Embedding { model, .. } => format!("embedding:{}", model),
 396          Capability::ImageGeneration { model } => format!("image:{}", model),
 397          Capability::Transcription { model } => format!("transcription:{}", model),
 398          Capability::Speech { model } => format!("speech:{}", model),
 399          Capability::CodeExecution { runtime } => format!("code:{}", runtime),
 400          Capability::WebBrowsing => "web".into(),
 401          Capability::Storage { .. } => "storage".into(),
 402          Capability::Custom { name, .. } => format!("custom:{}", name),
 403      };
 404      
 405      // Exact match or wildcard
 406      cap_name == query || query.ends_with(":*") && cap_name.starts_with(&query[..query.len()-1])
 407  }
 408  
 409  /// Router statistics.
 410  #[derive(Debug, Clone, Default)]
 411  pub struct RouterStats {
 412      pub circle_count: usize,
 413      pub capability_count: usize,
 414      pub total_members: usize,
 415      pub available_members: usize,
 416  }
 417  
 418  // ============================================================================
 419  // DHT-Integrated Mesh Discovery
 420  // ============================================================================
 421  
 422  /// Mesh discovery router with DHT integration.
 423  /// 
 424  /// Combines local circles (direct membership) with discovered circles (via DHT)
 425  /// for cross-mesh task routing. Pure logic - I/O layer executes DHT operations.
 426  /// 
 427  /// ## Example
 428  /// 
 429  /// ```ignore
 430  /// let mesh = MeshDiscoveryRouter::new([0x01; 32]);
 431  /// 
 432  /// // Register our local circle
 433  /// mesh.register_local_circle(my_circle).await;
 434  /// 
 435  /// // Publish to DHT (returns op for I/O layer)
 436  /// let ops = mesh.plan_announce_all().await;
 437  /// for op in ops {
 438  ///     dht_io.execute(op).await;
 439  /// }
 440  /// 
 441  /// // Discover remote circles (returns query op)
 442  /// let query_op = mesh.plan_discover_capability("inference:llama".into());
 443  /// let responses = dht_io.execute(query_op).await;
 444  /// mesh.ingest_discovery(responses).await;
 445  /// 
 446  /// // Route now considers both local AND discovered circles
 447  /// let target = mesh.route_task(&task).await?;
 448  /// ```
 449  pub struct MeshDiscoveryRouter {
 450      /// Our node's public key
 451      pub our_pubkey: [u8; 32],
 452      
 453      /// Local circles (we're a member of these)
 454      local_circles: Arc<RwLock<HashMap<[u8; 32], AgentCircle>>>,
 455      
 456      /// Discovered circles from DHT (cached)
 457      discovered_cache: Arc<RwLock<CircleCache>>,
 458      
 459      /// Capability index for local circles
 460      local_capability_index: Arc<RwLock<HashMap<String, Vec<[u8; 32]>>>>,
 461      
 462      /// Our announcements (for re-publishing)
 463      our_announcements: Arc<RwLock<HashMap<[u8; 32], PublishedCircle>>>,
 464      
 465      /// Circle-level reputation tracking (for discovered circles)
 466      circle_reputation: Arc<RwLock<ReputationTracker>>,
 467      
 468      /// Routing strategy
 469      strategy: RoutingStrategy,
 470  }
 471  
 472  impl MeshDiscoveryRouter {
 473      /// Create a new mesh discovery router.
 474      pub fn new(our_pubkey: [u8; 32]) -> Self {
 475          Self {
 476              our_pubkey,
 477              local_circles: Arc::new(RwLock::new(HashMap::new())),
 478              discovered_cache: Arc::new(RwLock::new(CircleCache::new())),
 479              local_capability_index: Arc::new(RwLock::new(HashMap::new())),
 480              our_announcements: Arc::new(RwLock::new(HashMap::new())),
 481              circle_reputation: Arc::new(RwLock::new(ReputationTracker::new())),
 482              strategy: RoutingStrategy::default(),
 483          }
 484      }
 485      
 486      /// Set routing strategy.
 487      pub fn with_strategy(mut self, strategy: RoutingStrategy) -> Self {
 488          self.strategy = strategy;
 489          self
 490      }
 491      
 492      // ------------------------------------------------------------------------
 493      // Local Circle Management
 494      // ------------------------------------------------------------------------
 495      
 496      /// Register a local circle (we're a member).
 497      pub async fn register_local_circle(&self, circle: AgentCircle) {
 498          let circle_id = circle.id;
 499          
 500          // Update capability index
 501          {
 502              let mut index = self.local_capability_index.write().await;
 503              for cap in &circle.capabilities {
 504                  index.entry(cap.clone())
 505                      .or_default()
 506                      .push(circle_id);
 507              }
 508          }
 509          
 510          // Create announcement for DHT publishing
 511          let announcement = CircleAnnouncement::from_circle(&circle);
 512          let now = std::time::SystemTime::now()
 513              .duration_since(std::time::UNIX_EPOCH)
 514              .unwrap()
 515              .as_secs();
 516          let published = PublishedCircle::new(circle_id, announcement, now);
 517          
 518          {
 519              let mut announcements = self.our_announcements.write().await;
 520              announcements.insert(circle_id, published);
 521          }
 522          
 523          // Store circle
 524          {
 525              let mut circles = self.local_circles.write().await;
 526              circles.insert(circle_id, circle);
 527          }
 528      }
 529      
 530      /// Unregister a local circle.
 531      pub async fn unregister_local_circle(&self, circle_id: &[u8; 32]) {
 532          // Remove from local storage
 533          let circle = {
 534              let mut circles = self.local_circles.write().await;
 535              circles.remove(circle_id)
 536          };
 537          
 538          // Clean up capability index
 539          if let Some(circle) = circle {
 540              let mut index = self.local_capability_index.write().await;
 541              for cap in &circle.capabilities {
 542                  if let Some(ids) = index.get_mut(cap) {
 543                      ids.retain(|id| id != circle_id);
 544                  }
 545              }
 546          }
 547          
 548          // Remove announcement
 549          {
 550              let mut announcements = self.our_announcements.write().await;
 551              announcements.remove(circle_id);
 552          }
 553      }
 554      
 555      // ------------------------------------------------------------------------
 556      // DHT Operations (Pure Logic - I/O layer executes)
 557      // ------------------------------------------------------------------------
 558      
 559      /// Plan DHT announce for all our circles.
 560      /// Returns DhtOps that the I/O layer should execute.
 561      pub async fn plan_announce_all(&self) -> Vec<DhtOp> {
 562          let announcements = self.our_announcements.read().await;
 563          announcements.values()
 564              .filter_map(|p| plan_publish(p).ok())
 565              .collect()
 566      }
 567      
 568      /// Plan DHT announce for a specific circle.
 569      pub async fn plan_announce(&self, circle_id: &[u8; 32]) -> Option<DhtOp> {
 570          let announcements = self.our_announcements.read().await;
 571          announcements.get(circle_id)
 572              .and_then(|p| plan_publish(p).ok())
 573      }
 574      
 575      /// Plan discovery query by capability.
 576      pub fn plan_discover_capability(&self, capability: String) -> Option<DhtOp> {
 577          let query = DiscoveryQuery::ByCapability(capability);
 578          plan_discovery(&query)
 579      }
 580      
 581      /// Plan discovery query by circle ID.
 582      pub fn plan_discover_circle(&self, circle_id: [u8; 32]) -> Option<DhtOp> {
 583          let query = DiscoveryQuery::ByCircleId(circle_id);
 584          plan_discovery(&query)
 585      }
 586      
 587      /// Ingest discovered circles from DHT responses.
 588      /// Call this with the payload from successful DHT queries.
 589      pub async fn ingest_discovery(&self, payloads: Vec<Vec<u8>>) -> usize {
 590          let mut cache = self.discovered_cache.write().await;
 591          let mut count = 0;
 592          
 593          for payload in payloads {
 594              if let Ok(circle) = PublishedCircle::from_bytes(&payload) {
 595                  cache.insert(circle);
 596                  count += 1;
 597              }
 598          }
 599          
 600          count
 601      }
 602      
 603      /// Garbage collect expired discovered circles.
 604      pub async fn gc_discovered(&self, now: u64) {
 605          let mut cache = self.discovered_cache.write().await;
 606          cache.gc(now);
 607      }
 608      
 609      // ------------------------------------------------------------------------
 610      // Circle Reputation Tracking
 611      // ------------------------------------------------------------------------
 612      
 613      /// Record a task completion for a discovered circle.
 614      /// This updates the circle's reputation based on task outcomes.
 615      pub async fn record_circle_task(
 616          &self,
 617          circle_id: [u8; 32],
 618          record: TaskRecord,
 619      ) {
 620          let mut reputation = self.circle_reputation.write().await;
 621          reputation.record(circle_id, record);
 622      }
 623      
 624      /// Get reputation score for a discovered circle.
 625      pub async fn get_circle_reputation(&self, circle_id: &[u8; 32]) -> Option<f32> {
 626          let reputation = self.circle_reputation.read().await;
 627          reputation.get(circle_id).map(|s| s.score)
 628      }
 629      
 630      /// Get top circles by reputation.
 631      pub async fn top_circles(&self, n: usize) -> Vec<([u8; 32], f32)> {
 632          let reputation = self.circle_reputation.read().await;
 633          reputation.top_agents(n)
 634              .into_iter()
 635              .map(|(id, score)| (id, score.score))
 636              .collect()
 637      }
 638      
 639      /// Get circles that should be avoided (low reputation).
 640      pub async fn blacklisted_circles(&self) -> Vec<[u8; 32]> {
 641          let reputation = self.circle_reputation.read().await;
 642          reputation.blacklisted_agents()
 643      }
 644      
 645      // ------------------------------------------------------------------------
 646      // Announcement Triggers
 647      // ------------------------------------------------------------------------
 648      
 649      /// Trigger an announcement when a circle's state changes.
 650      /// Returns DhtOps to publish if an announcement is warranted.
 651      pub async fn on_circle_state_change(
 652          &self,
 653          circle_id: [u8; 32],
 654          now: u64,
 655          _ttl: u64,
 656      ) -> Vec<DhtOp> {
 657          let circles = self.local_circles.read().await;
 658          
 659          if let Some(circle) = circles.get(&circle_id) {
 660              // Create announcement for updated circle state
 661              let announcement = CircleAnnouncement::from_circle(circle);
 662              let published = PublishedCircle::new(
 663                  circle_id,
 664                  announcement.clone(),
 665                  now,
 666              );
 667              
 668              // Update our cached announcement
 669              drop(circles); // Release read lock
 670              let mut our_announcements = self.our_announcements.write().await;
 671              our_announcements.insert(circle_id, published.clone());
 672              
 673              // Plan the DHT store - returns single op wrapped in vec
 674              if let Ok(op) = plan_publish(&published) {
 675                  return vec![op];
 676              }
 677          }
 678          
 679          vec![]
 680      }
 681      
 682      /// Trigger announcements for all circles where members have changed.
 683      /// Call this after agent joins/leaves or status changes.
 684      pub async fn on_member_change(
 685          &self,
 686          circle_id: [u8; 32],
 687          now: u64,
 688          ttl: u64,
 689      ) -> Vec<DhtOp> {
 690          self.on_circle_state_change(circle_id, now, ttl).await
 691      }
 692      
 693      /// Trigger announcements for all circles where capabilities have changed.
 694      pub async fn on_capability_change(
 695          &self,
 696          circle_id: [u8; 32],
 697          now: u64,
 698          ttl: u64,
 699      ) -> Vec<DhtOp> {
 700          self.on_circle_state_change(circle_id, now, ttl).await
 701      }
 702      
 703      // ------------------------------------------------------------------------
 704      // Routing (Local + Discovered)
 705      // ------------------------------------------------------------------------
 706      
 707      /// Find all circles (local + discovered) with a capability.
 708      pub async fn find_capable_circles(&self, capability: &str, now: u64) -> MeshDiscoveryResult {
 709          let mut result = MeshDiscoveryResult::default();
 710          
 711          // Local circles
 712          {
 713              let index = self.local_capability_index.read().await;
 714              if let Some(ids) = index.get(capability) {
 715                  let circles = self.local_circles.read().await;
 716                  for id in ids {
 717                      if let Some(circle) = circles.get(id) {
 718                          result.local_circles.push(circle.clone());
 719                      }
 720                  }
 721              }
 722          }
 723          
 724          // Discovered circles
 725          {
 726              let cache = self.discovered_cache.read().await;
 727              let query = DiscoveryQuery::ByCapability(capability.to_string());
 728              result.discovered_circles = cache.query(&query, now);
 729          }
 730          
 731          result
 732      }
 733      
 734      /// Route a task to the best available target.
 735      /// Returns either a local agent or a remote circle endpoint.
 736      pub async fn route_task(&self, task: &Task, now: u64) -> Result<RoutingTarget, String> {
 737          let discovery = self.find_capable_circles(&task.capability, now).await;
 738          
 739          // First, try local circles (lower latency)
 740          if !discovery.local_circles.is_empty() {
 741              let mut candidates: Vec<(&AgentCircle, &AgentProfile)> = vec![];
 742              
 743              for circle in &discovery.local_circles {
 744                  for member in &circle.members {
 745                      if member.status == AgentStatus::Available {
 746                          let has_cap = member.capabilities.iter()
 747                              .any(|c| capability_matches(c, &task.capability));
 748                          if has_cap {
 749                              candidates.push((circle, member));
 750                          }
 751                      }
 752                  }
 753              }
 754              
 755              if !candidates.is_empty() {
 756                  let selected = self.select_candidate(&candidates, task).await;
 757                  if let Some((circle, agent)) = selected {
 758                      return Ok(RoutingTarget::LocalAgent {
 759                          circle_id: circle.id,
 760                          agent_pubkey: agent.pubkey,
 761                          agent_id: agent.id.clone(),
 762                      });
 763                  }
 764              }
 765          }
 766          
 767          // Fall back to discovered circles (cross-mesh)
 768          if !discovery.discovered_circles.is_empty() {
 769              // Pick best discovered circle based on strategy
 770              let reputation = self.circle_reputation.read().await;
 771              let selected = match self.strategy {
 772                  RoutingStrategy::LowestLatency | RoutingStrategy::LeastLoaded => {
 773                      // Prefer circles with more available capacity
 774                      discovery.discovered_circles.iter()
 775                          .max_by_key(|c| c.announcement.available_count)
 776                  }
 777                  RoutingStrategy::HighestReputation => {
 778                      // Select circle with highest reputation score
 779                      discovery.discovered_circles.iter()
 780                          .max_by(|a, b| {
 781                              let rep_a = reputation.get(&a.circle_id).map(|s| s.score).unwrap_or(0.5);
 782                              let rep_b = reputation.get(&b.circle_id).map(|s| s.score).unwrap_or(0.5);
 783                              rep_a.partial_cmp(&rep_b).unwrap_or(std::cmp::Ordering::Equal)
 784                          })
 785                  }
 786                  _ => discovery.discovered_circles.first(),
 787              };
 788              
 789              if let Some(circle) = selected {
 790                  return Ok(RoutingTarget::RemoteCircle {
 791                      circle_id: circle.circle_id,
 792                      announcement: circle.announcement.clone(),
 793                  });
 794              }
 795          }
 796          
 797          Err(format!("No circles found for capability: {}", task.capability))
 798      }
 799      
 800      /// Select best candidate from local options.
 801      async fn select_candidate<'a>(
 802          &self,
 803          candidates: &[(&'a AgentCircle, &'a AgentProfile)],
 804          _task: &Task,  
 805      ) -> Option<(&'a AgentCircle, &'a AgentProfile)> {
 806          match self.strategy {
 807              RoutingStrategy::LowestLatency => candidates.first().cloned(),
 808              RoutingStrategy::Random => {
 809                  use rand::seq::SliceRandom;
 810                  candidates.choose(&mut rand::thread_rng()).cloned()
 811              }
 812              RoutingStrategy::HighestReputation => {
 813                  candidates.iter()
 814                      .max_by(|a, b| {
 815                          a.1.reputation.partial_cmp(&b.1.reputation)
 816                              .unwrap_or(std::cmp::Ordering::Equal)
 817                      })
 818                      .cloned()
 819              }
 820              RoutingStrategy::LeastLoaded => {
 821                  candidates.iter()
 822                      .min_by_key(|(circle, _)| circle.members.len())
 823                      .cloned()
 824              }
 825              RoutingStrategy::RoundRobin => {
 826                  // Simplified: just pick first for now
 827                  candidates.first().cloned()
 828              }
 829          }
 830      }
 831      
 832      /// Get mesh statistics.
 833      pub async fn stats(&self, now: u64) -> MeshStats {
 834          let local_circles = self.local_circles.read().await;
 835          let discovered_cache = self.discovered_cache.read().await;
 836          let local_index = self.local_capability_index.read().await;
 837          
 838          let local_members: usize = local_circles.values().map(|c| c.members.len()).sum();
 839          let local_available: usize = local_circles.values()
 840              .flat_map(|c| c.members.iter())
 841              .filter(|m| m.status == AgentStatus::Available)
 842              .count();
 843          
 844          let discovered_circles = discovered_cache.query(&DiscoveryQuery::AllKnown, now);
 845          let discovered_capacity: usize = discovered_circles.iter()
 846              .map(|c| c.announcement.available_count)
 847              .sum();
 848          
 849          MeshStats {
 850              local_circle_count: local_circles.len(),
 851              local_capability_count: local_index.len(),
 852              local_member_count: local_members,
 853              local_available_count: local_available,
 854              discovered_circle_count: discovered_circles.len(),
 855              discovered_capacity,
 856          }
 857      }
 858  }
 859  
/// Result of mesh discovery.
///
/// Groups the circles relevant to a lookup into two owned lists: circles this
/// node belongs to and circles learned about through the DHT. The derived
/// `Default` produces a result in which both lists are empty.
#[derive(Debug, Clone, Default)]
pub struct MeshDiscoveryResult {
    /// Circles we're a member of (full circle state, including members).
    pub local_circles: Vec<AgentCircle>,
    /// Circles discovered via DHT, in their published form.
    pub discovered_circles: Vec<PublishedCircle>,
}
 868  
/// Routing target for a task.
///
/// Describes where a routed task should be sent: either a specific agent in
/// one of our own circles, or a remote circle identified by its announcement.
#[derive(Debug, Clone)]
pub enum RoutingTarget {
    /// Route to a local agent (same mesh)
    LocalAgent {
        /// ID of the circle the selected agent belongs to.
        circle_id: [u8; 32],
        /// Public key of the selected agent.
        agent_pubkey: [u8; 32],
        /// Identifier of the selected agent.
        agent_id: String,
    },
    /// Route to a remote circle (cross-mesh via DHT)
    RemoteCircle {
        /// ID of the discovered circle.
        circle_id: [u8; 32],
        /// Copy of the announcement the circle published.
        announcement: CircleAnnouncement,
    },
}
 884  
 885  /// Mesh statistics.
 886  #[derive(Debug, Clone, Default)]
 887  pub struct MeshStats {
 888      pub local_circle_count: usize,
 889      pub local_capability_count: usize,
 890      pub local_member_count: usize,
 891      pub local_available_count: usize,
 892      pub discovered_circle_count: usize,
 893      pub discovered_capacity: usize,
 894  }
 895  
 896  
 897  #[cfg(test)]
 898  mod tests {
 899      use super::*;
 900      
 901      #[test]
 902      fn test_circle_announcement() {
 903          let founder = AgentProfile::new("founder").with_pubkey([1u8; 32]);
 904          let circle = AgentCircle::new("Test Circle", founder);
 905          
 906          let announcement = CircleAnnouncement::from_circle(&circle);
 907          
 908          assert_eq!(announcement.circle_id, circle.id);
 909          assert_eq!(announcement.name, "Test Circle");
 910          assert_eq!(announcement.member_count, 1);
 911          assert_eq!(announcement.available_count, 1);
 912      }
 913      
 914      #[test]
 915      fn test_capability_keys() {
 916          let mut founder = AgentProfile::new("founder").with_pubkey([1u8; 32]);
 917          founder.capabilities.push(Capability::inference("llama-7b"));
 918          
 919          let circle = AgentCircle::new("Test", founder);
 920          let announcement = CircleAnnouncement::from_circle(&circle);
 921          
 922          let keys = announcement.capability_keys();
 923          assert_eq!(keys.len(), 1);
 924      }
 925      
 926      #[test]
 927      fn test_task_creation() {
 928          let task = Task::new("inference:llama-7b", vec![1, 2, 3], [1u8; 32])
 929              .with_priority(8)
 930              .with_max_latency(5000);
 931          
 932          assert_eq!(task.capability, "inference:llama-7b");
 933          assert_eq!(task.priority, 8);
 934          assert_eq!(task.max_latency_ms, 5000);
 935      }
 936      
 937      #[tokio::test]
 938      async fn test_router_registration() {
 939          let router = TaskRouter::new([1u8; 32]);
 940          
 941          let founder = AgentProfile::new("founder")
 942              .with_pubkey([2u8; 32])
 943              .with_capability(Capability::inference("llama-7b"));
 944          
 945          let circle = AgentCircle::new("Test Circle", founder);
 946          router.register_circle(circle).await;
 947          
 948          let stats = router.stats().await;
 949          assert_eq!(stats.circle_count, 1);
 950          assert_eq!(stats.total_members, 1);
 951      }
 952      
 953      #[tokio::test]
 954      async fn test_router_find_capable() {
 955          let router = TaskRouter::new([1u8; 32]);
 956          
 957          let founder = AgentProfile::new("founder")
 958              .with_pubkey([2u8; 32])
 959              .with_capability(Capability::inference("llama-7b"));
 960          
 961          let circle = AgentCircle::new("LLM Circle", founder);
 962          router.register_circle(circle).await;
 963          
 964          let llm_circles = router.find_capable("inference:llama-7b").await;
 965          assert_eq!(llm_circles.len(), 1);
 966          
 967          let empty = router.find_capable("embedding:bge").await;
 968          assert!(empty.is_empty());
 969      }
 970      
 971      #[tokio::test]
 972      async fn test_router_route_task() {
 973          let router = TaskRouter::new([1u8; 32]);
 974          
 975          let founder = AgentProfile::new("founder")
 976              .with_pubkey([2u8; 32])
 977              .with_capability(Capability::inference("llama-7b"));
 978          
 979          let circle = AgentCircle::new("Test", founder);
 980          router.register_circle(circle).await;
 981          
 982          let task = Task::new("inference:llama-7b", vec![], [3u8; 32]);
 983          let agent = router.route(&task).await.unwrap();
 984          
 985          assert_eq!(agent.id, "founder");
 986      }
 987      
 988      #[test]
 989      fn test_capability_matches() {
 990          assert!(capability_matches(
 991              &Capability::inference("llama-7b"),
 992              "inference:llama-7b"
 993          ));
 994          
 995          assert!(capability_matches(
 996              &Capability::inference("llama-7b"),
 997              "inference:*"
 998          ));
 999          
1000          assert!(!capability_matches(
1001              &Capability::inference("llama-7b"),
1002              "embedding:bge"
1003          ));
1004      }
1005      
1006      // ========================================================================
1007      // MeshDiscoveryRouter Tests
1008      // ========================================================================
1009      
1010      #[tokio::test]
1011      async fn test_mesh_router_local_registration() {
1012          let mesh = MeshDiscoveryRouter::new([0xAA; 32]);
1013          
1014          let founder = AgentProfile::new("founder")
1015              .with_pubkey([0xBB; 32])
1016              .with_capability(Capability::inference("llama-7b"));
1017          
1018          let circle = AgentCircle::new("LLM Circle", founder);
1019          mesh.register_local_circle(circle).await;
1020          
1021          let now = 1700000000;
1022          let stats = mesh.stats(now).await;
1023          
1024          assert_eq!(stats.local_circle_count, 1);
1025          assert_eq!(stats.local_member_count, 1);
1026          assert_eq!(stats.local_available_count, 1);
1027      }
1028      
1029      #[tokio::test]
1030      async fn test_mesh_router_local_routing() {
1031          let mesh = MeshDiscoveryRouter::new([0xAA; 32]);
1032          
1033          let founder = AgentProfile::new("founder")
1034              .with_pubkey([0xBB; 32])
1035              .with_capability(Capability::inference("llama-7b"));
1036          
1037          let circle = AgentCircle::new("LLM Circle", founder);
1038          mesh.register_local_circle(circle).await;
1039          
1040          let task = Task::new("inference:llama-7b", vec![], [0xCC; 32]);
1041          let now = 1700000000;
1042          
1043          let target = mesh.route_task(&task, now).await.unwrap();
1044          
1045          match target {
1046              RoutingTarget::LocalAgent { agent_id, .. } => {
1047                  assert_eq!(agent_id, "founder");
1048              }
1049              _ => panic!("Expected local agent routing"),
1050          }
1051      }
1052      
1053      #[tokio::test]
1054      async fn test_mesh_router_discovery_ingestion() {
1055          let mesh = MeshDiscoveryRouter::new([0xAA; 32]);
1056          
1057          // Create a "discovered" circle announcement
1058          let announcement = CircleAnnouncement {
1059              circle_id: [0x01; 32],
1060              name: "Remote LLM Circle".to_string(),
1061              description: None,
1062              capabilities: vec!["inference:claude".to_string()],
1063              member_count: 5,
1064              available_count: 3,
1065              open_membership: true,
1066              timestamp: 1700000000,
1067              signature: vec![],
1068          };
1069          
1070          let published = PublishedCircle::new([0x01; 32], announcement, 1700000000);
1071          let payload = published.to_bytes().unwrap();
1072          
1073          // Ingest discovery result
1074          let count = mesh.ingest_discovery(vec![payload]).await;
1075          assert_eq!(count, 1);
1076          
1077          let now = 1700000000;
1078          let stats = mesh.stats(now).await;
1079          
1080          assert_eq!(stats.discovered_circle_count, 1);
1081          assert_eq!(stats.discovered_capacity, 3);
1082      }
1083      
1084      #[tokio::test]
1085      async fn test_mesh_router_cross_mesh_fallback() {
1086          let mesh = MeshDiscoveryRouter::new([0xAA; 32]);
1087          
1088          // No local circles - but we have a discovered one
1089          let announcement = CircleAnnouncement {
1090              circle_id: [0x02; 32],
1091              name: "Remote Embedding Circle".to_string(),
1092              description: None,
1093              capabilities: vec!["embedding:bge".to_string()],
1094              member_count: 3,
1095              available_count: 2,
1096              open_membership: false,
1097              timestamp: 1700000000,
1098              signature: vec![],
1099          };
1100          
1101          let published = PublishedCircle::new([0x02; 32], announcement.clone(), 1700000000);
1102          mesh.ingest_discovery(vec![published.to_bytes().unwrap()]).await;
1103          
1104          let task = Task::new("embedding:bge", vec![], [0xCC; 32]);
1105          let now = 1700000000;
1106          
1107          let target = mesh.route_task(&task, now).await.unwrap();
1108          
1109          match target {
1110              RoutingTarget::RemoteCircle { circle_id, announcement: ann } => {
1111                  assert_eq!(circle_id, [0x02; 32]);
1112                  assert_eq!(ann.name, "Remote Embedding Circle");
1113              }
1114              _ => panic!("Expected remote circle routing"),
1115          }
1116      }
1117      
1118      #[tokio::test]
1119      async fn test_mesh_router_plan_announce() {
1120          let mesh = MeshDiscoveryRouter::new([0xAA; 32]);
1121          
1122          let founder = AgentProfile::new("founder")
1123              .with_pubkey([0xBB; 32])
1124              .with_capability(Capability::inference("mistral-7b"));
1125          
1126          let circle = AgentCircle::new("Mistral Circle", founder);
1127          mesh.register_local_circle(circle).await;
1128          
1129          let ops = mesh.plan_announce_all().await;
1130          
1131          assert_eq!(ops.len(), 1);
1132          match &ops[0] {
1133              DhtOp::Store { capability_keys, .. } => {
1134                  assert_eq!(capability_keys.len(), 1);
1135              }
1136              _ => panic!("Expected Store op"),
1137          }
1138      }
1139      
1140      #[tokio::test]
1141      async fn test_mesh_router_no_capable_circles() {
1142          let mesh = MeshDiscoveryRouter::new([0xAA; 32]);
1143          
1144          let task = Task::new("transcription:whisper", vec![], [0xCC; 32]);
1145          let now = 1700000000;
1146          
1147          let result = mesh.route_task(&task, now).await;
1148          
1149          assert!(result.is_err());
1150          assert!(result.unwrap_err().contains("No circles found"));
1151      }
1152  }