/ abzu-inference / src / discovery.rs
discovery.rs
  1  //! Circle Discovery over DHT
  2  //!
  3  //! Bridges Agent Circles with the Abzu DHT for decentralized discovery.
  4  //! Circles publish their presence and capabilities to the mesh,
  5  //! allowing other agents to find collaborators.
  6  //!
  7  //! ## DHT Key Structure
  8  //!
  9  //! | Query Type | DHT Key |
 10  //! |------------|---------|
 11  //! | Find circles by capability | `BLAKE3("capability:" ∥ capability_name)` |
//! | Find circle by ID | Circle ID directly (it is already a BLAKE3 hash) |
//! | Find circles by member agent | Agent's public key directly |
 14  
 15  use serde::{Deserialize, Serialize};
 16  use std::collections::HashMap;
 17  
 18  use crate::mesh::CircleAnnouncement;
 19  
 20  /// DHT value type for circle announcements
 21  /// Uses ValueType::Application (255) with this discriminator prefix
 22  pub const CIRCLE_ANNOUNCE_PREFIX: &[u8] = b"abzu.circle.v1:";
 23  
 24  /// Key prefixes for different discovery queries
 25  pub const CAPABILITY_PREFIX: &[u8] = b"capability:";
 26  pub const CIRCLE_PREFIX: &[u8] = b"circle:";
 27  
/// A circle announcement wrapped with the metadata needed to store it in the DHT.
///
/// Encoded with bincode by `PublishedCircle::to_bytes`. bincode is not self-describing,
/// so the order of the fields below is part of the stored format. Do not reorder them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PublishedCircle {
    /// Circle ID. Used as-is as the circle's primary DHT key (`plan_publish`,
    /// `DiscoveryQuery::ByCircleId`). Original note: BLAKE3 hash of founder + nonce;
    /// that derivation happens outside this module.
    pub circle_id: [u8; 32],
    /// The announcement payload. Its `capabilities` drive the cache's capability
    /// index and the capability keys produced by `plan_publish`.
    pub announcement: CircleAnnouncement,
    /// Signature over the announcement. `new` fills this with a 64-byte all-zero
    /// placeholder; presumably the transport layer replaces it. TODO confirm.
    pub signature: Vec<u8>,
    /// Publication timestamp (the `now` passed to `new`), compared against `now` in
    /// `is_expired`. Presumably Unix seconds to match `ttl`. TODO confirm the caller's clock.
    pub published_at: u64,
    /// Time-to-live in seconds. Circles should re-announce before it lapses.
    pub ttl: u64,
}
 42  
 43  impl PublishedCircle {
 44      /// Default TTL: 1 hour (circles should heartbeat)
 45      pub const DEFAULT_TTL: u64 = 3600;
 46      
 47      /// Create a new published circle (signing happens at transport layer)
 48      pub fn new(circle_id: [u8; 32], announcement: CircleAnnouncement, now: u64) -> Self {
 49          Self {
 50              circle_id,
 51              announcement,
 52              signature: vec![0u8; 64], // Placeholder - signed at transport
 53              published_at: now,
 54              ttl: Self::DEFAULT_TTL,
 55          }
 56      }
 57      
 58      /// Serialize to bytes for DHT storage (using bincode)
 59      pub fn to_bytes(&self) -> Result<Vec<u8>, String> {
 60          let mut payload = CIRCLE_ANNOUNCE_PREFIX.to_vec();
 61          let data = bincode::serialize(&self).map_err(|e| e.to_string())?;
 62          payload.extend(data);
 63          Ok(payload)
 64      }
 65      
 66      /// Deserialize from DHT-stored bytes
 67      pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
 68          if !bytes.starts_with(CIRCLE_ANNOUNCE_PREFIX) {
 69              return Err("Invalid prefix for circle announcement".to_string());
 70          }
 71          let data = &bytes[CIRCLE_ANNOUNCE_PREFIX.len()..];
 72          bincode::deserialize(data).map_err(|e| e.to_string())
 73      }
 74      
 75      /// Check if this announcement has expired
 76      pub fn is_expired(&self, now: u64) -> bool {
 77          now > self.published_at + self.ttl
 78      }
 79  }
 80  
 81  /// Discovery query: what to search for in the DHT
 82  #[derive(Debug, Clone)]
 83  pub enum DiscoveryQuery {
 84      /// Find circles that offer a specific capability (as string, e.g. "inference:llama-7b")
 85      ByCapability(String),
 86      /// Find a specific circle by ID
 87      ByCircleId([u8; 32]),
 88      /// Find circles where a specific agent is a member
 89      ByAgentMember([u8; 32]),
 90      /// Find all circles (local cache only - no DHT broadcast)
 91      AllKnown,
 92  }
 93  
 94  impl DiscoveryQuery {
 95      /// Generate the DHT key for this query
 96      pub fn dht_key(&self) -> Option<[u8; 32]> {
 97          match self {
 98              DiscoveryQuery::ByCapability(cap) => {
 99                  let mut input = CAPABILITY_PREFIX.to_vec();
100                  input.extend(cap.as_bytes());
101                  let hash = blake3::hash(&input);
102                  Some(*hash.as_bytes())
103              }
104              DiscoveryQuery::ByCircleId(id) => {
105                  // Circle ID is already a hash, use it directly
106                  Some(*id)
107              }
108              DiscoveryQuery::ByAgentMember(pubkey) => {
109                  // Agent membership discovery uses agent's key
110                  Some(*pubkey)
111              }
112              DiscoveryQuery::AllKnown => None, // Local-only
113          }
114      }
115  }
116  
/// Result from a discovery query.
///
/// NOTE(review): not constructed anywhere in this module. Presumably the I/O layer
/// that executes `DhtOp::Query` fills it in. Confirm.
#[derive(Debug, Clone)]
pub struct DiscoveryResult {
    /// Circles matching the query.
    pub circles: Vec<PublishedCircle>,
    /// Query latency in milliseconds.
    pub query_ms: u64,
    /// `true` if the results were served from the local cache, `false` if from the network.
    pub from_cache: bool,
}
127  
/// Local cache for discovered circles.
///
/// Holds at most one entry per circle ID, plus a secondary index from capability name
/// to the IDs advertising it. Expired entries are filtered out at query time and
/// physically removed by `gc`.
#[derive(Debug, Default)]
pub struct CircleCache {
    /// Circles keyed by circle ID.
    by_id: HashMap<[u8; 32], PublishedCircle>,
    /// Secondary index: capability name -> IDs of cached circles advertising it.
    by_capability: HashMap<String, Vec<[u8; 32]>>,
}
136  
137  impl CircleCache {
138      pub fn new() -> Self {
139          Self::default()
140      }
141      
142      /// Insert a discovered circle into cache
143      pub fn insert(&mut self, circle: PublishedCircle) {
144          let id = circle.circle_id;
145          
146          // Index by capabilities (CircleAnnouncement.capabilities is Vec<String>)
147          for cap in &circle.announcement.capabilities {
148              self.by_capability
149                  .entry(cap.clone())
150                  .or_default()
151                  .push(id);
152          }
153          
154          // Store the circle
155          self.by_id.insert(id, circle);
156      }
157      
158      /// Query the cache
159      pub fn query(&self, query: &DiscoveryQuery, now: u64) -> Vec<PublishedCircle> {
160          match query {
161              DiscoveryQuery::ByCapability(cap) => {
162                  self.by_capability
163                      .get(cap)
164                      .map(|ids| {
165                          ids.iter()
166                              .filter_map(|id| self.by_id.get(id))
167                              .filter(|c| !c.is_expired(now))
168                              .cloned()
169                              .collect()
170                      })
171                      .unwrap_or_default()
172              }
173              DiscoveryQuery::ByCircleId(id) => {
174                  self.by_id
175                      .get(id)
176                      .filter(|c| !c.is_expired(now))
177                      .cloned()
178                      .into_iter()
179                      .collect()
180              }
181              DiscoveryQuery::ByAgentMember(_pubkey) => {
182                  // CircleAnnouncement doesn't expose member list for privacy
183                  // This would require a separate index or DHT query
184                  vec![]
185              }
186              DiscoveryQuery::AllKnown => {
187                  self.by_id
188                      .values()
189                      .filter(|c| !c.is_expired(now))
190                      .cloned()
191                      .collect()
192              }
193          }
194      }
195      
196      /// Remove expired entries
197      pub fn gc(&mut self, now: u64) {
198          // Collect expired IDs
199          let expired: Vec<[u8; 32]> = self.by_id
200              .iter()
201              .filter(|(_, c)| c.is_expired(now))
202              .map(|(id, _)| *id)
203              .collect();
204          
205          // Remove from main storage
206          for id in &expired {
207              self.by_id.remove(id);
208          }
209          
210          // Clean up indices
211          for ids in self.by_capability.values_mut() {
212              ids.retain(|id| !expired.contains(id));
213          }
214      }
215      
216      /// Number of cached circles
217      pub fn len(&self) -> usize {
218          self.by_id.len()
219      }
220      
221      /// Is cache empty?
222      pub fn is_empty(&self) -> bool {
223          self.by_id.is_empty()
224      }
225  }
226  
/// DHT operations returned to the caller (pure logic, no I/O).
///
/// The planning functions in this module only describe what to do; an I/O layer
/// executes these operations. Derives `PartialEq`/`Eq` so planned ops can be
/// compared directly, for example in tests or when deduplicating work.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DhtOp {
    /// Store a circle announcement at these keys.
    Store {
        /// Primary key: the circle ID, used as-is.
        circle_key: [u8; 32],
        /// Secondary keys derived from the advertised capabilities, for capability lookups.
        capability_keys: Vec<[u8; 32]>,
        /// The payload to store (a serialized `PublishedCircle`).
        payload: Vec<u8>,
        /// TTL for the DHT store, in seconds.
        ttl: u64,
    },
    /// Query the DHT for values at this key.
    Query {
        /// Key to look up, as produced by `DiscoveryQuery::dht_key`.
        key: [u8; 32],
    },
}
246  
247  /// Plan circle publication (returns ops for the I/O layer to execute)
248  pub fn plan_publish(circle: &PublishedCircle) -> Result<DhtOp, String> {
249      let payload = circle.to_bytes()?;
250      
251      // Generate capability keys so other agents can find us
252      let capability_keys: Vec<[u8; 32]> = circle.announcement.capabilities
253          .iter()
254          .map(|cap| {
255              let query = DiscoveryQuery::ByCapability(cap.clone());
256              query.dht_key().unwrap()
257          })
258          .collect();
259      
260      Ok(DhtOp::Store {
261          circle_key: circle.circle_id,
262          capability_keys,
263          payload,
264          ttl: circle.ttl,
265      })
266  }
267  
268  /// Plan a discovery query (returns ops for the I/O layer to execute)
269  pub fn plan_discovery(query: &DiscoveryQuery) -> Option<DhtOp> {
270      query.dht_key().map(|key| DhtOp::Query { key })
271  }
272  
// Unit tests for serialization, expiry, the local cache, and DHT op planning.
// Every timestamp is derived from the fixed `now()` so results are deterministic.
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed reference timestamp shared by every test.
    fn now() -> u64 {
        1700000000
    }

    /// Announcement advertising two capabilities: `inference:llama-7b` and `custom:rust_dev`.
    fn make_announcement() -> CircleAnnouncement {
        CircleAnnouncement {
            circle_id: [0x01; 32],
            name: "Test Circle".to_string(),
            description: Some("A test circle".to_string()),
            capabilities: vec![
                "inference:llama-7b".to_string(),
                "custom:rust_dev".to_string(),
            ],
            member_count: 3,
            available_count: 2,
            open_membership: true,
            timestamp: now(),
            signature: vec![],
        }
    }

    /// Circle with ID `[0x01; 32]`, published at `now()` with the default TTL.
    fn make_circle() -> PublishedCircle {
        PublishedCircle::new([0x01; 32], make_announcement(), now())
    }

    /// Round-trip through the prefixed DHT encoding.
    #[test]
    fn test_serialization() {
        let circle = make_circle();
        let bytes = circle.to_bytes().unwrap();

        assert!(bytes.starts_with(CIRCLE_ANNOUNCE_PREFIX));

        let decoded = PublishedCircle::from_bytes(&bytes).unwrap();
        assert_eq!(decoded.circle_id, circle.circle_id);
        assert_eq!(decoded.announcement.name, "Test Circle");
    }

    /// Expiry boundary: live within the TTL, expired strictly after it.
    #[test]
    fn test_expiry() {
        let circle = make_circle();

        // Not expired at publication time
        assert!(!circle.is_expired(now()));

        // Not expired 30 minutes later
        assert!(!circle.is_expired(now() + 1800));

        // Expired after TTL
        assert!(circle.is_expired(now() + PublishedCircle::DEFAULT_TTL + 1));
    }

    /// Lookups by ID and by each advertised capability; an unknown capability finds nothing.
    #[test]
    fn test_cache_insert_and_query() {
        let mut cache = CircleCache::new();
        let circle = make_circle();

        cache.insert(circle.clone());

        // Query by ID
        let results = cache.query(&DiscoveryQuery::ByCircleId([0x01; 32]), now());
        assert_eq!(results.len(), 1);

        // Query by capability (string-based)
        let results = cache.query(
            &DiscoveryQuery::ByCapability("inference:llama-7b".to_string()),
            now(),
        );
        assert_eq!(results.len(), 1);

        // Query by custom capability
        let results = cache.query(
            &DiscoveryQuery::ByCapability("custom:rust_dev".to_string()),
            now(),
        );
        assert_eq!(results.len(), 1);

        // Query unknown capability
        let results = cache.query(
            &DiscoveryQuery::ByCapability("unknown:foo".to_string()),
            now(),
        );
        assert_eq!(results.len(), 0);
    }

    /// `gc` keeps live entries and drops entries past their TTL.
    #[test]
    fn test_cache_gc() {
        let mut cache = CircleCache::new();
        let circle = make_circle();

        cache.insert(circle.clone());
        assert_eq!(cache.len(), 1);

        // GC before expiry - should keep circle
        cache.gc(now() + 1000);
        assert_eq!(cache.len(), 1);

        // GC after expiry - should remove
        cache.gc(now() + PublishedCircle::DEFAULT_TTL + 1);
        assert_eq!(cache.len(), 0);
    }

    /// Capability keys are deterministic and distinct; `AllKnown` has no DHT key.
    #[test]
    fn test_discovery_query_keys() {
        // Capability query generates deterministic key
        let query1 = DiscoveryQuery::ByCapability("inference:llama".to_string());
        let query2 = DiscoveryQuery::ByCapability("inference:llama".to_string());
        assert_eq!(query1.dht_key(), query2.dht_key());

        // Different capabilities -> different keys
        let query3 = DiscoveryQuery::ByCapability("storage:ipfs".to_string());
        assert_ne!(query1.dht_key(), query3.dht_key());

        // AllKnown has no DHT key (local only)
        assert!(DiscoveryQuery::AllKnown.dht_key().is_none());
    }

    /// Publishing yields a Store op keyed by circle ID, with one key per capability.
    #[test]
    fn test_plan_publish() {
        let circle = make_circle();
        let op = plan_publish(&circle).unwrap();

        match op {
            DhtOp::Store { circle_key, capability_keys, payload, ttl } => {
                assert_eq!(circle_key, [0x01; 32]);
                assert_eq!(capability_keys.len(), 2); // inference + custom
                assert!(!payload.is_empty());
                assert_eq!(ttl, PublishedCircle::DEFAULT_TTL);
            }
            _ => panic!("Expected Store op"),
        }
    }

    /// A keyed query plans a Query op at exactly that query's DHT key.
    #[test]
    fn test_plan_discovery() {
        let query = DiscoveryQuery::ByCapability("inference:llama".to_string());
        let op = plan_discovery(&query).unwrap();

        match op {
            DhtOp::Query { key } => {
                assert_eq!(key, query.dht_key().unwrap());
            }
            _ => panic!("Expected Query op"),
        }
    }
}