mesh.rs
1 //! Mesh Integration: DHT-based discovery and task routing for Agent Circles. 2 //! 3 //! This module connects Agent Circles to the Abzu mesh network, enabling: 4 //! 5 //! - **Circle discovery**: Announce and find circles via DHT 6 //! - **Task routing**: Distribute inference requests to capable agents 7 //! - **Load balancing**: Spread work across available capacity 8 //! 9 //! ## Architecture 10 //! 11 //! ```text 12 //! ┌────────────────────────────────────────────────────────────┐ 13 //! │ DHT Layer │ 14 //! │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ 15 //! │ │ Circle Alpha │ │ Circle Beta │ │ Circle Gamma │ │ 16 //! │ │ inference:* │ │ embedding:* │ │ image:* │ │ 17 //! │ └──────────────┘ └──────────────┘ └──────────────┘ │ 18 //! └────────────────────────────────────────────────────────────┘ 19 //! │ │ │ 20 //! ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ 21 //! │ Agent │ │ Agent │ │ Agent │ 22 //! │ Router │ │ Router │ │ Router │ 23 //! └─────────┘ └─────────┘ └─────────┘ 24 //! ``` 25 26 use crate::agent_circles::{AgentCircle, AgentProfile, AgentStatus, Capability}; 27 use crate::discovery::{CircleCache, DiscoveryQuery, DhtOp, PublishedCircle, plan_publish, plan_discovery}; 28 use crate::reputation::{ReputationTracker, TaskRecord}; 29 use serde::{Deserialize, Serialize}; 30 use std::collections::HashMap; 31 use std::sync::Arc; 32 use tokio::sync::RwLock; 33 34 /// Circle announcement for DHT publication. 
35 #[derive(Debug, Clone, Serialize, Deserialize)] 36 pub struct CircleAnnouncement { 37 /// Circle ID 38 pub circle_id: [u8; 32], 39 40 /// Circle name 41 pub name: String, 42 43 /// Description 44 pub description: Option<String>, 45 46 /// Aggregated capabilities (for discovery) 47 pub capabilities: Vec<String>, 48 49 /// Member count 50 pub member_count: usize, 51 52 /// Available capacity (members ready to work) 53 pub available_count: usize, 54 55 /// Whether open for new members 56 pub open_membership: bool, 57 58 /// Announcement timestamp (Unix ms) 59 pub timestamp: u64, 60 61 /// Announcer's signature 62 pub signature: Vec<u8>, 63 } 64 65 impl CircleAnnouncement { 66 /// Create an announcement from an AgentCircle 67 pub fn from_circle(circle: &AgentCircle) -> Self { 68 let available_count = circle.members 69 .iter() 70 .filter(|m| m.status == AgentStatus::Available) 71 .count(); 72 73 Self { 74 circle_id: circle.id, 75 name: circle.name.clone(), 76 description: circle.description.clone(), 77 capabilities: circle.capabilities.iter().cloned().collect(), 78 member_count: circle.members.len(), 79 available_count, 80 open_membership: circle.open_membership, 81 timestamp: std::time::SystemTime::now() 82 .duration_since(std::time::UNIX_EPOCH) 83 .unwrap() 84 .as_millis() as u64, 85 signature: vec![], // Must be signed by caller 86 } 87 } 88 89 /// Derive DHT key for this announcement 90 pub fn dht_key(&self) -> [u8; 32] { 91 let mut hasher = blake3::Hasher::new(); 92 hasher.update(b"abzu:agent-circle:announcement:"); 93 hasher.update(&self.circle_id); 94 *hasher.finalize().as_bytes() 95 } 96 97 /// Derive capability topic keys for discovery 98 pub fn capability_keys(&self) -> Vec<[u8; 32]> { 99 self.capabilities 100 .iter() 101 .map(|cap| { 102 let mut hasher = blake3::Hasher::new(); 103 hasher.update(b"abzu:agent-circle:capability:"); 104 hasher.update(cap.as_bytes()); 105 *hasher.finalize().as_bytes() 106 }) 107 .collect() 108 } 109 } 110 111 /// Task to be 
routed to an agent circle. 112 #[derive(Debug, Clone, Serialize, Deserialize)] 113 pub struct Task { 114 /// Unique task ID 115 pub id: [u8; 32], 116 117 /// Required capability 118 pub capability: String, 119 120 /// Task payload (capability-specific) 121 pub payload: Vec<u8>, 122 123 /// Priority (higher = more urgent) 124 pub priority: u8, 125 126 /// Maximum latency allowed (ms, 0 = no limit) 127 pub max_latency_ms: u64, 128 129 /// Requester's public key 130 pub requester: [u8; 32], 131 132 /// Created timestamp 133 pub created_at: u64, 134 } 135 136 impl Task { 137 /// Create a new task 138 pub fn new(capability: impl Into<String>, payload: Vec<u8>, requester: [u8; 32]) -> Self { 139 let mut id = [0u8; 32]; 140 let nonce: [u8; 16] = rand::random(); 141 let mut hasher = blake3::Hasher::new(); 142 hasher.update(&requester); 143 hasher.update(&nonce); 144 id.copy_from_slice(hasher.finalize().as_bytes()); 145 146 Self { 147 id, 148 capability: capability.into(), 149 payload, 150 priority: 5, // Medium default 151 max_latency_ms: 0, 152 requester, 153 created_at: std::time::SystemTime::now() 154 .duration_since(std::time::UNIX_EPOCH) 155 .unwrap() 156 .as_millis() as u64, 157 } 158 } 159 160 /// Set priority (0-10) 161 pub fn with_priority(mut self, priority: u8) -> Self { 162 self.priority = priority.min(10); 163 self 164 } 165 166 /// Set max latency requirement 167 pub fn with_max_latency(mut self, ms: u64) -> Self { 168 self.max_latency_ms = ms; 169 self 170 } 171 } 172 173 /// Result of a completed task. 
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// ID of the task this result belongs to (see `Task::id`).
    pub task_id: [u8; 32],

    /// Whether the task succeeded
    pub success: bool,

    /// Result payload (if success)
    pub payload: Option<Vec<u8>>,

    /// Error message (if failure)
    pub error: Option<String>,

    /// Agent that completed the task
    // NOTE(review): presumably the agent's public key; confirm with the producer.
    pub completed_by: [u8; 32],

    /// Completion timestamp
    // NOTE(review): unit is not established in this file; `Task::created_at`
    // is Unix milliseconds, so this is presumably the same. Confirm.
    pub completed_at: u64,

    /// Execution time (ms)
    pub execution_time_ms: u64,
}

/// Routing strategy for task distribution.
///
/// `LowestLatency` is the default variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum RoutingStrategy {
    /// Lowest latency first
    #[default]
    LowestLatency,

    /// Random selection among capable agents
    Random,

    /// Round-robin across capable agents
    RoundRobin,

    /// Highest reputation first
    HighestReputation,

    /// Least loaded agent
    LeastLoaded,
}

/// Task router for distributing work across agent circles.
219 pub struct TaskRouter { 220 /// Known circles (by ID) 221 circles: Arc<RwLock<HashMap<[u8; 32], AgentCircle>>>, 222 223 /// Capability index (capability -> circle IDs) 224 capability_index: Arc<RwLock<HashMap<String, Vec<[u8; 32]>>>>, 225 226 /// Round-robin counters (for RoundRobin strategy) 227 rr_counters: Arc<RwLock<HashMap<String, usize>>>, 228 229 /// Routing strategy 230 strategy: RoutingStrategy, 231 232 /// Our node's public key 233 our_pubkey: [u8; 32], 234 } 235 236 impl TaskRouter { 237 /// Create a new task router 238 pub fn new(our_pubkey: [u8; 32]) -> Self { 239 Self { 240 circles: Arc::new(RwLock::new(HashMap::new())), 241 capability_index: Arc::new(RwLock::new(HashMap::new())), 242 rr_counters: Arc::new(RwLock::new(HashMap::new())), 243 strategy: RoutingStrategy::default(), 244 our_pubkey, 245 } 246 } 247 248 /// Set routing strategy 249 pub fn with_strategy(mut self, strategy: RoutingStrategy) -> Self { 250 self.strategy = strategy; 251 self 252 } 253 254 /// Register a circle with the router 255 pub async fn register_circle(&self, circle: AgentCircle) { 256 let circle_id = circle.id; 257 258 // Update capability index 259 { 260 let mut index = self.capability_index.write().await; 261 for cap in &circle.capabilities { 262 index.entry(cap.clone()) 263 .or_insert_with(Vec::new) 264 .push(circle_id); 265 } 266 } 267 268 // Store circle 269 { 270 let mut circles = self.circles.write().await; 271 circles.insert(circle_id, circle); 272 } 273 } 274 275 /// Unregister a circle 276 pub async fn unregister_circle(&self, circle_id: &[u8; 32]) { 277 let circle = { 278 let mut circles = self.circles.write().await; 279 circles.remove(circle_id) 280 }; 281 282 // Clean up capability index 283 if let Some(circle) = circle { 284 let mut index = self.capability_index.write().await; 285 for cap in &circle.capabilities { 286 if let Some(ids) = index.get_mut(cap) { 287 ids.retain(|id| id != circle_id); 288 } 289 } 290 } 291 } 292 293 /// Find circles with a 
specific capability 294 pub async fn find_capable(&self, capability: &str) -> Vec<AgentCircle> { 295 let circle_ids = { 296 let index = self.capability_index.read().await; 297 index.get(capability).cloned().unwrap_or_default() 298 }; 299 300 let circles = self.circles.read().await; 301 circle_ids.iter() 302 .filter_map(|id| circles.get(id).cloned()) 303 .collect() 304 } 305 306 /// Route a task to the best available agent 307 pub async fn route(&self, task: &Task) -> Result<AgentProfile, String> { 308 let circles = self.find_capable(&task.capability).await; 309 310 if circles.is_empty() { 311 return Err(format!("No circles with capability: {}", task.capability)); 312 } 313 314 // Collect all available agents with the capability 315 let mut candidates: Vec<(&AgentCircle, &AgentProfile)> = vec![]; 316 317 for circle in &circles { 318 for member in &circle.members { 319 if member.status == AgentStatus::Available { 320 // Check if member has the specific capability 321 let has_cap = member.capabilities.iter().any(|c| { 322 capability_matches(c, &task.capability) 323 }); 324 325 if has_cap { 326 candidates.push((circle, member)); 327 } 328 } 329 } 330 } 331 332 if candidates.is_empty() { 333 return Err("No available agents for capability".into()); 334 } 335 336 // Apply routing strategy 337 let selected = match self.strategy { 338 RoutingStrategy::LowestLatency => { 339 // For now, just pick first (latency tracking TODO) 340 candidates.first() 341 } 342 RoutingStrategy::Random => { 343 use rand::seq::SliceRandom; 344 candidates.choose(&mut rand::thread_rng()) 345 } 346 RoutingStrategy::RoundRobin => { 347 let mut counters = self.rr_counters.write().await; 348 let counter = counters.entry(task.capability.clone()).or_insert(0); 349 let idx = *counter % candidates.len(); 350 *counter = counter.wrapping_add(1); 351 candidates.get(idx) 352 } 353 RoutingStrategy::HighestReputation => { 354 candidates.iter() 355 .max_by(|a, b| { 356 
a.1.reputation.partial_cmp(&b.1.reputation).unwrap() 357 }) 358 } 359 RoutingStrategy::LeastLoaded => { 360 // Use member count as proxy for load (TODO: actual load tracking) 361 candidates.iter() 362 .min_by_key(|(circle, _)| circle.members.len()) 363 } 364 }; 365 366 selected 367 .map(|(_, agent)| (*agent).clone()) 368 .ok_or_else(|| "No agent selected".into()) 369 } 370 371 /// Get statistics about registered circles 372 pub async fn stats(&self) -> RouterStats { 373 let circles = self.circles.read().await; 374 let index = self.capability_index.read().await; 375 376 let total_members: usize = circles.values().map(|c| c.members.len()).sum(); 377 let available_members: usize = circles.values() 378 .flat_map(|c| c.members.iter()) 379 .filter(|m| m.status == AgentStatus::Available) 380 .count(); 381 382 RouterStats { 383 circle_count: circles.len(), 384 capability_count: index.len(), 385 total_members, 386 available_members, 387 } 388 } 389 } 390 391 /// Check if a capability matches a query string 392 fn capability_matches(cap: &Capability, query: &str) -> bool { 393 let cap_name = match cap { 394 Capability::Inference { model, .. } => format!("inference:{}", model), 395 Capability::Embedding { model, .. } => format!("embedding:{}", model), 396 Capability::ImageGeneration { model } => format!("image:{}", model), 397 Capability::Transcription { model } => format!("transcription:{}", model), 398 Capability::Speech { model } => format!("speech:{}", model), 399 Capability::CodeExecution { runtime } => format!("code:{}", runtime), 400 Capability::WebBrowsing => "web".into(), 401 Capability::Storage { .. } => "storage".into(), 402 Capability::Custom { name, .. } => format!("custom:{}", name), 403 }; 404 405 // Exact match or wildcard 406 cap_name == query || query.ends_with(":*") && cap_name.starts_with(&query[..query.len()-1]) 407 } 408 409 /// Router statistics. 
410 #[derive(Debug, Clone, Default)] 411 pub struct RouterStats { 412 pub circle_count: usize, 413 pub capability_count: usize, 414 pub total_members: usize, 415 pub available_members: usize, 416 } 417 418 // ============================================================================ 419 // DHT-Integrated Mesh Discovery 420 // ============================================================================ 421 422 /// Mesh discovery router with DHT integration. 423 /// 424 /// Combines local circles (direct membership) with discovered circles (via DHT) 425 /// for cross-mesh task routing. Pure logic - I/O layer executes DHT operations. 426 /// 427 /// ## Example 428 /// 429 /// ```ignore 430 /// let mesh = MeshDiscoveryRouter::new([0x01; 32]); 431 /// 432 /// // Register our local circle 433 /// mesh.register_local_circle(my_circle).await; 434 /// 435 /// // Publish to DHT (returns op for I/O layer) 436 /// let ops = mesh.plan_announce_all().await; 437 /// for op in ops { 438 /// dht_io.execute(op).await; 439 /// } 440 /// 441 /// // Discover remote circles (returns query op) 442 /// let query_op = mesh.plan_discover_capability("inference:llama".into()); 443 /// let responses = dht_io.execute(query_op).await; 444 /// mesh.ingest_discovery(responses).await; 445 /// 446 /// // Route now considers both local AND discovered circles 447 /// let target = mesh.route_task(&task).await?; 448 /// ``` 449 pub struct MeshDiscoveryRouter { 450 /// Our node's public key 451 pub our_pubkey: [u8; 32], 452 453 /// Local circles (we're a member of these) 454 local_circles: Arc<RwLock<HashMap<[u8; 32], AgentCircle>>>, 455 456 /// Discovered circles from DHT (cached) 457 discovered_cache: Arc<RwLock<CircleCache>>, 458 459 /// Capability index for local circles 460 local_capability_index: Arc<RwLock<HashMap<String, Vec<[u8; 32]>>>>, 461 462 /// Our announcements (for re-publishing) 463 our_announcements: Arc<RwLock<HashMap<[u8; 32], PublishedCircle>>>, 464 465 /// Circle-level reputation 
tracking (for discovered circles) 466 circle_reputation: Arc<RwLock<ReputationTracker>>, 467 468 /// Routing strategy 469 strategy: RoutingStrategy, 470 } 471 472 impl MeshDiscoveryRouter { 473 /// Create a new mesh discovery router. 474 pub fn new(our_pubkey: [u8; 32]) -> Self { 475 Self { 476 our_pubkey, 477 local_circles: Arc::new(RwLock::new(HashMap::new())), 478 discovered_cache: Arc::new(RwLock::new(CircleCache::new())), 479 local_capability_index: Arc::new(RwLock::new(HashMap::new())), 480 our_announcements: Arc::new(RwLock::new(HashMap::new())), 481 circle_reputation: Arc::new(RwLock::new(ReputationTracker::new())), 482 strategy: RoutingStrategy::default(), 483 } 484 } 485 486 /// Set routing strategy. 487 pub fn with_strategy(mut self, strategy: RoutingStrategy) -> Self { 488 self.strategy = strategy; 489 self 490 } 491 492 // ------------------------------------------------------------------------ 493 // Local Circle Management 494 // ------------------------------------------------------------------------ 495 496 /// Register a local circle (we're a member). 
497 pub async fn register_local_circle(&self, circle: AgentCircle) { 498 let circle_id = circle.id; 499 500 // Update capability index 501 { 502 let mut index = self.local_capability_index.write().await; 503 for cap in &circle.capabilities { 504 index.entry(cap.clone()) 505 .or_default() 506 .push(circle_id); 507 } 508 } 509 510 // Create announcement for DHT publishing 511 let announcement = CircleAnnouncement::from_circle(&circle); 512 let now = std::time::SystemTime::now() 513 .duration_since(std::time::UNIX_EPOCH) 514 .unwrap() 515 .as_secs(); 516 let published = PublishedCircle::new(circle_id, announcement, now); 517 518 { 519 let mut announcements = self.our_announcements.write().await; 520 announcements.insert(circle_id, published); 521 } 522 523 // Store circle 524 { 525 let mut circles = self.local_circles.write().await; 526 circles.insert(circle_id, circle); 527 } 528 } 529 530 /// Unregister a local circle. 531 pub async fn unregister_local_circle(&self, circle_id: &[u8; 32]) { 532 // Remove from local storage 533 let circle = { 534 let mut circles = self.local_circles.write().await; 535 circles.remove(circle_id) 536 }; 537 538 // Clean up capability index 539 if let Some(circle) = circle { 540 let mut index = self.local_capability_index.write().await; 541 for cap in &circle.capabilities { 542 if let Some(ids) = index.get_mut(cap) { 543 ids.retain(|id| id != circle_id); 544 } 545 } 546 } 547 548 // Remove announcement 549 { 550 let mut announcements = self.our_announcements.write().await; 551 announcements.remove(circle_id); 552 } 553 } 554 555 // ------------------------------------------------------------------------ 556 // DHT Operations (Pure Logic - I/O layer executes) 557 // ------------------------------------------------------------------------ 558 559 /// Plan DHT announce for all our circles. 560 /// Returns DhtOps that the I/O layer should execute. 
561 pub async fn plan_announce_all(&self) -> Vec<DhtOp> { 562 let announcements = self.our_announcements.read().await; 563 announcements.values() 564 .filter_map(|p| plan_publish(p).ok()) 565 .collect() 566 } 567 568 /// Plan DHT announce for a specific circle. 569 pub async fn plan_announce(&self, circle_id: &[u8; 32]) -> Option<DhtOp> { 570 let announcements = self.our_announcements.read().await; 571 announcements.get(circle_id) 572 .and_then(|p| plan_publish(p).ok()) 573 } 574 575 /// Plan discovery query by capability. 576 pub fn plan_discover_capability(&self, capability: String) -> Option<DhtOp> { 577 let query = DiscoveryQuery::ByCapability(capability); 578 plan_discovery(&query) 579 } 580 581 /// Plan discovery query by circle ID. 582 pub fn plan_discover_circle(&self, circle_id: [u8; 32]) -> Option<DhtOp> { 583 let query = DiscoveryQuery::ByCircleId(circle_id); 584 plan_discovery(&query) 585 } 586 587 /// Ingest discovered circles from DHT responses. 588 /// Call this with the payload from successful DHT queries. 589 pub async fn ingest_discovery(&self, payloads: Vec<Vec<u8>>) -> usize { 590 let mut cache = self.discovered_cache.write().await; 591 let mut count = 0; 592 593 for payload in payloads { 594 if let Ok(circle) = PublishedCircle::from_bytes(&payload) { 595 cache.insert(circle); 596 count += 1; 597 } 598 } 599 600 count 601 } 602 603 /// Garbage collect expired discovered circles. 604 pub async fn gc_discovered(&self, now: u64) { 605 let mut cache = self.discovered_cache.write().await; 606 cache.gc(now); 607 } 608 609 // ------------------------------------------------------------------------ 610 // Circle Reputation Tracking 611 // ------------------------------------------------------------------------ 612 613 /// Record a task completion for a discovered circle. 614 /// This updates the circle's reputation based on task outcomes. 
615 pub async fn record_circle_task( 616 &self, 617 circle_id: [u8; 32], 618 record: TaskRecord, 619 ) { 620 let mut reputation = self.circle_reputation.write().await; 621 reputation.record(circle_id, record); 622 } 623 624 /// Get reputation score for a discovered circle. 625 pub async fn get_circle_reputation(&self, circle_id: &[u8; 32]) -> Option<f32> { 626 let reputation = self.circle_reputation.read().await; 627 reputation.get(circle_id).map(|s| s.score) 628 } 629 630 /// Get top circles by reputation. 631 pub async fn top_circles(&self, n: usize) -> Vec<([u8; 32], f32)> { 632 let reputation = self.circle_reputation.read().await; 633 reputation.top_agents(n) 634 .into_iter() 635 .map(|(id, score)| (id, score.score)) 636 .collect() 637 } 638 639 /// Get circles that should be avoided (low reputation). 640 pub async fn blacklisted_circles(&self) -> Vec<[u8; 32]> { 641 let reputation = self.circle_reputation.read().await; 642 reputation.blacklisted_agents() 643 } 644 645 // ------------------------------------------------------------------------ 646 // Announcement Triggers 647 // ------------------------------------------------------------------------ 648 649 /// Trigger an announcement when a circle's state changes. 650 /// Returns DhtOps to publish if an announcement is warranted. 
651 pub async fn on_circle_state_change( 652 &self, 653 circle_id: [u8; 32], 654 now: u64, 655 _ttl: u64, 656 ) -> Vec<DhtOp> { 657 let circles = self.local_circles.read().await; 658 659 if let Some(circle) = circles.get(&circle_id) { 660 // Create announcement for updated circle state 661 let announcement = CircleAnnouncement::from_circle(circle); 662 let published = PublishedCircle::new( 663 circle_id, 664 announcement.clone(), 665 now, 666 ); 667 668 // Update our cached announcement 669 drop(circles); // Release read lock 670 let mut our_announcements = self.our_announcements.write().await; 671 our_announcements.insert(circle_id, published.clone()); 672 673 // Plan the DHT store - returns single op wrapped in vec 674 if let Ok(op) = plan_publish(&published) { 675 return vec![op]; 676 } 677 } 678 679 vec![] 680 } 681 682 /// Trigger announcements for all circles where members have changed. 683 /// Call this after agent joins/leaves or status changes. 684 pub async fn on_member_change( 685 &self, 686 circle_id: [u8; 32], 687 now: u64, 688 ttl: u64, 689 ) -> Vec<DhtOp> { 690 self.on_circle_state_change(circle_id, now, ttl).await 691 } 692 693 /// Trigger announcements for all circles where capabilities have changed. 694 pub async fn on_capability_change( 695 &self, 696 circle_id: [u8; 32], 697 now: u64, 698 ttl: u64, 699 ) -> Vec<DhtOp> { 700 self.on_circle_state_change(circle_id, now, ttl).await 701 } 702 703 // ------------------------------------------------------------------------ 704 // Routing (Local + Discovered) 705 // ------------------------------------------------------------------------ 706 707 /// Find all circles (local + discovered) with a capability. 
708 pub async fn find_capable_circles(&self, capability: &str, now: u64) -> MeshDiscoveryResult { 709 let mut result = MeshDiscoveryResult::default(); 710 711 // Local circles 712 { 713 let index = self.local_capability_index.read().await; 714 if let Some(ids) = index.get(capability) { 715 let circles = self.local_circles.read().await; 716 for id in ids { 717 if let Some(circle) = circles.get(id) { 718 result.local_circles.push(circle.clone()); 719 } 720 } 721 } 722 } 723 724 // Discovered circles 725 { 726 let cache = self.discovered_cache.read().await; 727 let query = DiscoveryQuery::ByCapability(capability.to_string()); 728 result.discovered_circles = cache.query(&query, now); 729 } 730 731 result 732 } 733 734 /// Route a task to the best available target. 735 /// Returns either a local agent or a remote circle endpoint. 736 pub async fn route_task(&self, task: &Task, now: u64) -> Result<RoutingTarget, String> { 737 let discovery = self.find_capable_circles(&task.capability, now).await; 738 739 // First, try local circles (lower latency) 740 if !discovery.local_circles.is_empty() { 741 let mut candidates: Vec<(&AgentCircle, &AgentProfile)> = vec![]; 742 743 for circle in &discovery.local_circles { 744 for member in &circle.members { 745 if member.status == AgentStatus::Available { 746 let has_cap = member.capabilities.iter() 747 .any(|c| capability_matches(c, &task.capability)); 748 if has_cap { 749 candidates.push((circle, member)); 750 } 751 } 752 } 753 } 754 755 if !candidates.is_empty() { 756 let selected = self.select_candidate(&candidates, task).await; 757 if let Some((circle, agent)) = selected { 758 return Ok(RoutingTarget::LocalAgent { 759 circle_id: circle.id, 760 agent_pubkey: agent.pubkey, 761 agent_id: agent.id.clone(), 762 }); 763 } 764 } 765 } 766 767 // Fall back to discovered circles (cross-mesh) 768 if !discovery.discovered_circles.is_empty() { 769 // Pick best discovered circle based on strategy 770 let reputation = 
self.circle_reputation.read().await; 771 let selected = match self.strategy { 772 RoutingStrategy::LowestLatency | RoutingStrategy::LeastLoaded => { 773 // Prefer circles with more available capacity 774 discovery.discovered_circles.iter() 775 .max_by_key(|c| c.announcement.available_count) 776 } 777 RoutingStrategy::HighestReputation => { 778 // Select circle with highest reputation score 779 discovery.discovered_circles.iter() 780 .max_by(|a, b| { 781 let rep_a = reputation.get(&a.circle_id).map(|s| s.score).unwrap_or(0.5); 782 let rep_b = reputation.get(&b.circle_id).map(|s| s.score).unwrap_or(0.5); 783 rep_a.partial_cmp(&rep_b).unwrap_or(std::cmp::Ordering::Equal) 784 }) 785 } 786 _ => discovery.discovered_circles.first(), 787 }; 788 789 if let Some(circle) = selected { 790 return Ok(RoutingTarget::RemoteCircle { 791 circle_id: circle.circle_id, 792 announcement: circle.announcement.clone(), 793 }); 794 } 795 } 796 797 Err(format!("No circles found for capability: {}", task.capability)) 798 } 799 800 /// Select best candidate from local options. 801 async fn select_candidate<'a>( 802 &self, 803 candidates: &[(&'a AgentCircle, &'a AgentProfile)], 804 _task: &Task, 805 ) -> Option<(&'a AgentCircle, &'a AgentProfile)> { 806 match self.strategy { 807 RoutingStrategy::LowestLatency => candidates.first().cloned(), 808 RoutingStrategy::Random => { 809 use rand::seq::SliceRandom; 810 candidates.choose(&mut rand::thread_rng()).cloned() 811 } 812 RoutingStrategy::HighestReputation => { 813 candidates.iter() 814 .max_by(|a, b| { 815 a.1.reputation.partial_cmp(&b.1.reputation) 816 .unwrap_or(std::cmp::Ordering::Equal) 817 }) 818 .cloned() 819 } 820 RoutingStrategy::LeastLoaded => { 821 candidates.iter() 822 .min_by_key(|(circle, _)| circle.members.len()) 823 .cloned() 824 } 825 RoutingStrategy::RoundRobin => { 826 // Simplified: just pick first for now 827 candidates.first().cloned() 828 } 829 } 830 } 831 832 /// Get mesh statistics. 
833 pub async fn stats(&self, now: u64) -> MeshStats { 834 let local_circles = self.local_circles.read().await; 835 let discovered_cache = self.discovered_cache.read().await; 836 let local_index = self.local_capability_index.read().await; 837 838 let local_members: usize = local_circles.values().map(|c| c.members.len()).sum(); 839 let local_available: usize = local_circles.values() 840 .flat_map(|c| c.members.iter()) 841 .filter(|m| m.status == AgentStatus::Available) 842 .count(); 843 844 let discovered_circles = discovered_cache.query(&DiscoveryQuery::AllKnown, now); 845 let discovered_capacity: usize = discovered_circles.iter() 846 .map(|c| c.announcement.available_count) 847 .sum(); 848 849 MeshStats { 850 local_circle_count: local_circles.len(), 851 local_capability_count: local_index.len(), 852 local_member_count: local_members, 853 local_available_count: local_available, 854 discovered_circle_count: discovered_circles.len(), 855 discovered_capacity, 856 } 857 } 858 } 859 860 /// Result of mesh discovery. 861 #[derive(Debug, Clone, Default)] 862 pub struct MeshDiscoveryResult { 863 /// Circles we're a member of 864 pub local_circles: Vec<AgentCircle>, 865 /// Circles discovered via DHT 866 pub discovered_circles: Vec<PublishedCircle>, 867 } 868 869 /// Routing target for a task. 870 #[derive(Debug, Clone)] 871 pub enum RoutingTarget { 872 /// Route to a local agent (same mesh) 873 LocalAgent { 874 circle_id: [u8; 32], 875 agent_pubkey: [u8; 32], 876 agent_id: String, 877 }, 878 /// Route to a remote circle (cross-mesh via DHT) 879 RemoteCircle { 880 circle_id: [u8; 32], 881 announcement: CircleAnnouncement, 882 }, 883 } 884 885 /// Mesh statistics. 
#[derive(Debug, Clone, Default)]
pub struct MeshStats {
    /// Number of locally registered circles.
    pub local_circle_count: usize,
    /// Number of entries in the local capability index.
    pub local_capability_count: usize,
    /// Total members across all local circles.
    pub local_member_count: usize,
    /// Local members whose status is `AgentStatus::Available`.
    pub local_available_count: usize,
    /// Number of discovered circles returned by the cache at the queried time.
    pub discovered_circle_count: usize,
    /// Sum of `available_count` announced by those discovered circles.
    pub discovered_capacity: usize,
}


#[cfg(test)]
mod tests {
    use super::*;

    // An announcement built from a one-member circle mirrors the circle's
    // identity and counts its founder as available.
    #[test]
    fn test_circle_announcement() {
        let founder = AgentProfile::new("founder").with_pubkey([1u8; 32]);
        let circle = AgentCircle::new("Test Circle", founder);

        let announcement = CircleAnnouncement::from_circle(&circle);

        assert_eq!(announcement.circle_id, circle.id);
        assert_eq!(announcement.name, "Test Circle");
        assert_eq!(announcement.member_count, 1);
        assert_eq!(announcement.available_count, 1);
    }

    // One advertised capability yields exactly one capability topic key.
    #[test]
    fn test_capability_keys() {
        let mut founder = AgentProfile::new("founder").with_pubkey([1u8; 32]);
        founder.capabilities.push(Capability::inference("llama-7b"));

        let circle = AgentCircle::new("Test", founder);
        let announcement = CircleAnnouncement::from_circle(&circle);

        let keys = announcement.capability_keys();
        assert_eq!(keys.len(), 1);
    }

    // Builder methods set capability, priority and latency as given.
    #[test]
    fn test_task_creation() {
        let task = Task::new("inference:llama-7b", vec![1, 2, 3], [1u8; 32])
            .with_priority(8)
            .with_max_latency(5000);

        assert_eq!(task.capability, "inference:llama-7b");
        assert_eq!(task.priority, 8);
        assert_eq!(task.max_latency_ms, 5000);
    }

    // Registering a circle is reflected in the router statistics.
    #[tokio::test]
    async fn test_router_registration() {
        let router = TaskRouter::new([1u8; 32]);

        let founder = AgentProfile::new("founder")
            .with_pubkey([2u8; 32])
            .with_capability(Capability::inference("llama-7b"));

        let circle = AgentCircle::new("Test Circle", founder);
        router.register_circle(circle).await;

        let stats = router.stats().await;
        assert_eq!(stats.circle_count, 1);
        assert_eq!(stats.total_members, 1);
    }

    // Lookup by capability finds the registered circle and nothing for an
    // unrelated capability.
    #[tokio::test]
    async fn test_router_find_capable() {
        let router = TaskRouter::new([1u8; 32]);

        let founder = AgentProfile::new("founder")
            .with_pubkey([2u8; 32])
            .with_capability(Capability::inference("llama-7b"));

        let circle = AgentCircle::new("LLM Circle", founder);
        router.register_circle(circle).await;

        let llm_circles = router.find_capable("inference:llama-7b").await;
        assert_eq!(llm_circles.len(), 1);

        let empty = router.find_capable("embedding:bge").await;
        assert!(empty.is_empty());
    }

    // With a single capable agent, routing returns that agent.
    #[tokio::test]
    async fn test_router_route_task() {
        let router = TaskRouter::new([1u8; 32]);

        let founder = AgentProfile::new("founder")
            .with_pubkey([2u8; 32])
            .with_capability(Capability::inference("llama-7b"));

        let circle = AgentCircle::new("Test", founder);
        router.register_circle(circle).await;

        let task = Task::new("inference:llama-7b", vec![], [3u8; 32]);
        let agent = router.route(&task).await.unwrap();

        assert_eq!(agent.id, "founder");
    }

    // Exact match, `kind:*` wildcard match, and a non-match.
    #[test]
    fn test_capability_matches() {
        assert!(capability_matches(
            &Capability::inference("llama-7b"),
            "inference:llama-7b"
        ));

        assert!(capability_matches(
            &Capability::inference("llama-7b"),
            "inference:*"
        ));

        assert!(!capability_matches(
            &Capability::inference("llama-7b"),
            "embedding:bge"
        ));
    }

    // ========================================================================
    // MeshDiscoveryRouter Tests
    // ========================================================================

    // Registering a local circle shows up in the local mesh statistics.
    #[tokio::test]
    async fn test_mesh_router_local_registration() {
        let mesh = MeshDiscoveryRouter::new([0xAA; 32]);

        let founder = AgentProfile::new("founder")
            .with_pubkey([0xBB; 32])
            .with_capability(Capability::inference("llama-7b"));

        let circle = AgentCircle::new("LLM Circle", founder);
        mesh.register_local_circle(circle).await;

        let now = 1700000000;
        let stats = mesh.stats(now).await;

        assert_eq!(stats.local_circle_count, 1);
        assert_eq!(stats.local_member_count, 1);
        assert_eq!(stats.local_available_count, 1);
    }

    // A task served by a local circle routes to the local agent.
    #[tokio::test]
    async fn test_mesh_router_local_routing() {
        let mesh = MeshDiscoveryRouter::new([0xAA; 32]);

        let founder = AgentProfile::new("founder")
            .with_pubkey([0xBB; 32])
            .with_capability(Capability::inference("llama-7b"));

        let circle = AgentCircle::new("LLM Circle", founder);
        mesh.register_local_circle(circle).await;

        let task = Task::new("inference:llama-7b", vec![], [0xCC; 32]);
        let now = 1700000000;

        let target = mesh.route_task(&task, now).await.unwrap();

        match target {
            RoutingTarget::LocalAgent { agent_id, .. } => {
                assert_eq!(agent_id, "founder");
            }
            _ => panic!("Expected local agent routing"),
        }
    }

    // A serialized remote announcement can be ingested and is counted in the
    // discovered statistics.
    #[tokio::test]
    async fn test_mesh_router_discovery_ingestion() {
        let mesh = MeshDiscoveryRouter::new([0xAA; 32]);

        // Create a "discovered" circle announcement
        let announcement = CircleAnnouncement {
            circle_id: [0x01; 32],
            name: "Remote LLM Circle".to_string(),
            description: None,
            capabilities: vec!["inference:claude".to_string()],
            member_count: 5,
            available_count: 3,
            open_membership: true,
            timestamp: 1700000000,
            signature: vec![],
        };

        let published = PublishedCircle::new([0x01; 32], announcement, 1700000000);
        let payload = published.to_bytes().unwrap();

        // Ingest discovery result
        let count = mesh.ingest_discovery(vec![payload]).await;
        assert_eq!(count, 1);

        let now = 1700000000;
        let stats = mesh.stats(now).await;

        assert_eq!(stats.discovered_circle_count, 1);
        assert_eq!(stats.discovered_capacity, 3);
    }

    // With no local circles, routing falls back to a discovered circle.
    #[tokio::test]
    async fn test_mesh_router_cross_mesh_fallback() {
        let mesh = MeshDiscoveryRouter::new([0xAA; 32]);

        // No local circles - but we have a discovered one
        let announcement = CircleAnnouncement {
            circle_id: [0x02; 32],
            name: "Remote Embedding Circle".to_string(),
            description: None,
            capabilities: vec!["embedding:bge".to_string()],
            member_count: 3,
            available_count: 2,
            open_membership: false,
            timestamp: 1700000000,
            signature: vec![],
        };

        let published = PublishedCircle::new([0x02; 32], announcement.clone(), 1700000000);
        mesh.ingest_discovery(vec![published.to_bytes().unwrap()]).await;

        let task = Task::new("embedding:bge", vec![], [0xCC; 32]);
        let now = 1700000000;

        let target = mesh.route_task(&task, now).await.unwrap();

        match target {
            RoutingTarget::RemoteCircle { circle_id, announcement: ann } => {
                assert_eq!(circle_id, [0x02; 32]);
                assert_eq!(ann.name, "Remote Embedding Circle");
            }
            _ => panic!("Expected remote circle routing"),
        }
    }

    // One registered circle produces one Store op carrying one capability key.
    #[tokio::test]
    async fn test_mesh_router_plan_announce() {
        let mesh = MeshDiscoveryRouter::new([0xAA; 32]);

        let founder = AgentProfile::new("founder")
            .with_pubkey([0xBB; 32])
            .with_capability(Capability::inference("mistral-7b"));

        let circle = AgentCircle::new("Mistral Circle", founder);
        mesh.register_local_circle(circle).await;

        let ops = mesh.plan_announce_all().await;

        assert_eq!(ops.len(), 1);
        match &ops[0] {
            DhtOp::Store { capability_keys, .. } => {
                assert_eq!(capability_keys.len(), 1);
            }
            _ => panic!("Expected Store op"),
        }
    }

    // Routing with nothing registered or discovered reports an error.
    #[tokio::test]
    async fn test_mesh_router_no_capable_circles() {
        let mesh = MeshDiscoveryRouter::new([0xAA; 32]);

        let task = Task::new("transcription:whisper", vec![], [0xCC; 32]);
        let now = 1700000000;

        let result = mesh.route_task(&task, now).await;

        assert!(result.is_err());
        assert!(result.unwrap_err().contains("No circles found"));
    }
}