discovery.rs
1 //! Circle Discovery over DHT 2 //! 3 //! Bridges Agent Circles with the Abzu DHT for decentralized discovery. 4 //! Circles publish their presence and capabilities to the mesh, 5 //! allowing other agents to find collaborators. 6 //! 7 //! ## DHT Key Structure 8 //! 9 //! | Query Type | DHT Key | 10 //! |------------|---------| 11 //! | Find circles by capability | `BLAKE3("capability:" ∥ capability_name)` | 12 //! | Find circle by ID | `BLAKE3("circle:" ∥ circle_id)` | 13 //! | Find agent by pubkey | Agent's public key directly | 14 15 use serde::{Deserialize, Serialize}; 16 use std::collections::HashMap; 17 18 use crate::mesh::CircleAnnouncement; 19 20 /// DHT value type for circle announcements 21 /// Uses ValueType::Application (255) with this discriminator prefix 22 pub const CIRCLE_ANNOUNCE_PREFIX: &[u8] = b"abzu.circle.v1:"; 23 24 /// Key prefixes for different discovery queries 25 pub const CAPABILITY_PREFIX: &[u8] = b"capability:"; 26 pub const CIRCLE_PREFIX: &[u8] = b"circle:"; 27 28 /// A published circle announcement ready for DHT storage 29 #[derive(Debug, Clone, Serialize, Deserialize)] 30 pub struct PublishedCircle { 31 /// Circle ID (BLAKE3 hash of founder + nonce) 32 pub circle_id: [u8; 32], 33 /// The announcement payload 34 pub announcement: CircleAnnouncement, 35 /// Signature over the announcement (by circle's signing key) 36 pub signature: Vec<u8>, 37 /// Timestamp of publication 38 pub published_at: u64, 39 /// TTL in seconds (circles should re-announce periodically) 40 pub ttl: u64, 41 } 42 43 impl PublishedCircle { 44 /// Default TTL: 1 hour (circles should heartbeat) 45 pub const DEFAULT_TTL: u64 = 3600; 46 47 /// Create a new published circle (signing happens at transport layer) 48 pub fn new(circle_id: [u8; 32], announcement: CircleAnnouncement, now: u64) -> Self { 49 Self { 50 circle_id, 51 announcement, 52 signature: vec![0u8; 64], // Placeholder - signed at transport 53 published_at: now, 54 ttl: Self::DEFAULT_TTL, 55 } 56 } 57 58 /// Serialize to bytes for DHT storage (using bincode) 59 pub fn to_bytes(&self) -> Result<Vec<u8>, String> { 60 let mut payload = CIRCLE_ANNOUNCE_PREFIX.to_vec(); 61 let data = bincode::serialize(&self).map_err(|e| e.to_string())?; 62 payload.extend(data); 63 Ok(payload) 64 } 65 66 /// Deserialize from DHT-stored bytes 67 pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> { 68 if !bytes.starts_with(CIRCLE_ANNOUNCE_PREFIX) { 69 return Err("Invalid prefix for circle announcement".to_string()); 70 } 71 let data = &bytes[CIRCLE_ANNOUNCE_PREFIX.len()..]; 72 bincode::deserialize(data).map_err(|e| e.to_string()) 73 } 74 75 /// Check if this announcement has expired 76 pub fn is_expired(&self, now: u64) -> bool { 77 now > self.published_at + self.ttl 78 } 79 } 80 81 /// Discovery query: what to search for in the DHT 82 #[derive(Debug, Clone)] 83 pub enum DiscoveryQuery { 84 /// Find circles that offer a specific capability (as string, e.g. "inference:llama-7b") 85 ByCapability(String), 86 /// Find a specific circle by ID 87 ByCircleId([u8; 32]), 88 /// Find circles where a specific agent is a member 89 ByAgentMember([u8; 32]), 90 /// Find all circles (local cache only - no DHT broadcast) 91 AllKnown, 92 } 93 94 impl DiscoveryQuery { 95 /// Generate the DHT key for this query 96 pub fn dht_key(&self) -> Option<[u8; 32]> { 97 match self { 98 DiscoveryQuery::ByCapability(cap) => { 99 let mut input = CAPABILITY_PREFIX.to_vec(); 100 input.extend(cap.as_bytes()); 101 let hash = blake3::hash(&input); 102 Some(*hash.as_bytes()) 103 } 104 DiscoveryQuery::ByCircleId(id) => { 105 // Circle ID is already a hash, use it directly 106 Some(*id) 107 } 108 DiscoveryQuery::ByAgentMember(pubkey) => { 109 // Agent membership discovery uses agent's key 110 Some(*pubkey) 111 } 112 DiscoveryQuery::AllKnown => None, // Local-only 113 } 114 } 115 } 116 117 /// Result from a discovery query 118 #[derive(Debug, Clone)] 119 pub struct DiscoveryResult { 120 /// Found circles matching the query 121 pub circles: Vec<PublishedCircle>, 122 /// Query latency in milliseconds 123 pub query_ms: u64, 124 /// Whether results came from cache or network 125 pub from_cache: bool, 126 } 127 128 /// Local cache for discovered circles 129 #[derive(Debug, Default)] 130 pub struct CircleCache { 131 /// Circles by ID 132 by_id: HashMap<[u8; 32], PublishedCircle>, 133 /// Index: capability -> circle IDs 134 by_capability: HashMap<String, Vec<[u8; 32]>>, 135 } 136 137 impl CircleCache { 138 pub fn new() -> Self { 139 Self::default() 140 } 141 142 /// Insert a discovered circle into cache 143 pub fn insert(&mut self, circle: PublishedCircle) { 144 let id = circle.circle_id; 145 146 // Index by capabilities (CircleAnnouncement.capabilities is Vec<String>) 147 for cap in &circle.announcement.capabilities { 148 self.by_capability 149 .entry(cap.clone()) 150 .or_default() 151 .push(id); 152 } 153 154 // Store the circle 155 self.by_id.insert(id, circle); 156 } 157 158 /// Query the cache 159 pub fn query(&self, query: &DiscoveryQuery, now: u64) -> Vec<PublishedCircle> { 160 match query { 161 DiscoveryQuery::ByCapability(cap) => { 162 self.by_capability 163 .get(cap) 164 .map(|ids| { 165 ids.iter() 166 .filter_map(|id| self.by_id.get(id)) 167 .filter(|c| !c.is_expired(now)) 168 .cloned() 169 .collect() 170 }) 171 .unwrap_or_default() 172 } 173 DiscoveryQuery::ByCircleId(id) => { 174 self.by_id 175 .get(id) 176 .filter(|c| !c.is_expired(now)) 177 .cloned() 178 .into_iter() 179 .collect() 180 } 181 DiscoveryQuery::ByAgentMember(_pubkey) => { 182 // CircleAnnouncement doesn't expose member list for privacy 183 // This would require a separate index or DHT query 184 vec![] 185 } 186 DiscoveryQuery::AllKnown => { 187 self.by_id 188 .values() 189 .filter(|c| !c.is_expired(now)) 190 .cloned() 191 .collect() 192 } 193 } 194 } 195 196 /// Remove expired entries 197 pub fn gc(&mut self, now: u64) { 198 // Collect expired IDs 199 let expired: Vec<[u8; 32]> = self.by_id 200 .iter() 201 .filter(|(_, c)| c.is_expired(now)) 202 .map(|(id, _)| *id) 203 .collect(); 204 205 // Remove from main storage 206 for id in &expired { 207 self.by_id.remove(id); 208 } 209 210 // Clean up indices 211 for ids in self.by_capability.values_mut() { 212 ids.retain(|id| !expired.contains(id)); 213 } 214 } 215 216 /// Number of cached circles 217 pub fn len(&self) -> usize { 218 self.by_id.len() 219 } 220 221 /// Is cache empty? 222 pub fn is_empty(&self) -> bool { 223 self.by_id.is_empty() 224 } 225 } 226 227 /// DHT operations returned to the caller (pure logic, no I/O) 228 #[derive(Debug, Clone)] 229 pub enum DhtOp { 230 /// Store a circle announcement at these keys 231 Store { 232 /// Primary key: circle ID 233 circle_key: [u8; 32], 234 /// Secondary keys: capability lookups 235 capability_keys: Vec<[u8; 32]>, 236 /// The payload to store 237 payload: Vec<u8>, 238 /// TTL for the DHT store 239 ttl: u64, 240 }, 241 /// Query the DHT for values at this key 242 Query { 243 key: [u8; 32], 244 }, 245 } 246 247 /// Plan circle publication (returns ops for the I/O layer to execute) 248 pub fn plan_publish(circle: &PublishedCircle) -> Result<DhtOp, String> { 249 let payload = circle.to_bytes()?; 250 251 // Generate capability keys so other agents can find us 252 let capability_keys: Vec<[u8; 32]> = circle.announcement.capabilities 253 .iter() 254 .map(|cap| { 255 let query = DiscoveryQuery::ByCapability(cap.clone()); 256 query.dht_key().unwrap() 257 }) 258 .collect(); 259 260 Ok(DhtOp::Store { 261 circle_key: circle.circle_id, 262 capability_keys, 263 payload, 264 ttl: circle.ttl, 265 }) 266 } 267 268 /// Plan a discovery query (returns ops for the I/O layer to execute) 269 pub fn plan_discovery(query: &DiscoveryQuery) -> Option<DhtOp> { 270 query.dht_key().map(|key| DhtOp::Query { key }) 271 } 272 273 #[cfg(test)] 274 mod tests { 275 use super::*; 276 277 fn now() -> u64 { 278 1700000000 279 } 280 281 fn make_announcement() -> CircleAnnouncement { 282 CircleAnnouncement { 283 circle_id: [0x01; 32], 284 name: "Test Circle".to_string(), 285 description: Some("A test circle".to_string()), 286 capabilities: vec![ 287 "inference:llama-7b".to_string(), 288 "custom:rust_dev".to_string(), 289 ], 290 member_count: 3, 291 available_count: 2, 292 open_membership: true, 293 timestamp: now(), 294 signature: vec![], 295 } 296 } 297 298 fn make_circle() -> PublishedCircle { 299 PublishedCircle::new([0x01; 32], make_announcement(), now()) 300 } 301 302 #[test] 303 fn test_serialization() { 304 let circle = make_circle(); 305 let bytes = circle.to_bytes().unwrap(); 306 307 assert!(bytes.starts_with(CIRCLE_ANNOUNCE_PREFIX)); 308 309 let decoded = PublishedCircle::from_bytes(&bytes).unwrap(); 310 assert_eq!(decoded.circle_id, circle.circle_id); 311 assert_eq!(decoded.announcement.name, "Test Circle"); 312 } 313 314 #[test] 315 fn test_expiry() { 316 let circle = make_circle(); 317 318 // Not expired at publication time 319 assert!(!circle.is_expired(now())); 320 321 // Not expired 30 minutes later 322 assert!(!circle.is_expired(now() + 1800)); 323 324 // Expired after TTL 325 assert!(circle.is_expired(now() + PublishedCircle::DEFAULT_TTL + 1)); 326 } 327 328 #[test] 329 fn test_cache_insert_and_query() { 330 let mut cache = CircleCache::new(); 331 let circle = make_circle(); 332 333 cache.insert(circle.clone()); 334 335 // Query by ID 336 let results = cache.query(&DiscoveryQuery::ByCircleId([0x01; 32]), now()); 337 assert_eq!(results.len(), 1); 338 339 // Query by capability (string-based) 340 let results = cache.query( 341 &DiscoveryQuery::ByCapability("inference:llama-7b".to_string()), 342 now(), 343 ); 344 assert_eq!(results.len(), 1); 345 346 // Query by custom capability 347 let results = cache.query( 348 &DiscoveryQuery::ByCapability("custom:rust_dev".to_string()), 349 now(), 350 ); 351 assert_eq!(results.len(), 1); 352 353 // Query unknown capability 354 let results = cache.query( 355 &DiscoveryQuery::ByCapability("unknown:foo".to_string()), 356 now(), 357 ); 358 assert_eq!(results.len(), 0); 359 } 360 361 #[test] 362 fn test_cache_gc() { 363 let mut cache = CircleCache::new(); 364 let circle = make_circle(); 365 366 cache.insert(circle.clone()); 367 assert_eq!(cache.len(), 1); 368 369 // GC before expiry - should keep circle 370 cache.gc(now() + 1000); 371 assert_eq!(cache.len(), 1); 372 373 // GC after expiry - should remove 374 cache.gc(now() + PublishedCircle::DEFAULT_TTL + 1); 375 assert_eq!(cache.len(), 0); 376 } 377 378 #[test] 379 fn test_discovery_query_keys() { 380 // Capability query generates deterministic key 381 let query1 = DiscoveryQuery::ByCapability("inference:llama".to_string()); 382 let query2 = DiscoveryQuery::ByCapability("inference:llama".to_string()); 383 assert_eq!(query1.dht_key(), query2.dht_key()); 384 385 // Different capabilities -> different keys 386 let query3 = DiscoveryQuery::ByCapability("storage:ipfs".to_string()); 387 assert_ne!(query1.dht_key(), query3.dht_key()); 388 389 // AllKnown has no DHT key (local only) 390 assert!(DiscoveryQuery::AllKnown.dht_key().is_none()); 391 } 392 393 #[test] 394 fn test_plan_publish() { 395 let circle = make_circle(); 396 let op = plan_publish(&circle).unwrap(); 397 398 match op { 399 DhtOp::Store { circle_key, capability_keys, payload, ttl } => { 400 assert_eq!(circle_key, [0x01; 32]); 401 assert_eq!(capability_keys.len(), 2); // inference + custom 402 assert!(!payload.is_empty()); 403 assert_eq!(ttl, PublishedCircle::DEFAULT_TTL); 404 } 405 _ => panic!("Expected Store op"), 406 } 407 } 408 409 #[test] 410 fn test_plan_discovery() { 411 let query = DiscoveryQuery::ByCapability("inference:llama".to_string()); 412 let op = plan_discovery(&query).unwrap(); 413 414 match op { 415 DhtOp::Query { key } => { 416 assert_eq!(key, query.dht_key().unwrap()); 417 } 418 _ => panic!("Expected Query op"), 419 } 420 } 421 }