/ radicle / src / node / address / store.rs
store.rs
  1  use std::path::Path;
  2  use std::str::FromStr;
  3  use std::{fmt, io};
  4  
  5  use localtime::LocalTime;
  6  use sqlite as sql;
  7  use thiserror::Error;
  8  
  9  use crate::node;
 10  use crate::node::address::{KnownAddress, Source};
 11  use crate::node::{Address, Alias, AliasError, AliasStore, NodeId};
 12  use crate::prelude::Timestamp;
 13  use crate::sql::transaction;
 14  
 15  use super::types;
 16  use super::AddressType;
 17  
/// Errors returned by the address book and by [`Store`] operations.
#[derive(Error, Debug)]
pub enum Error {
    /// I/O error.
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
    /// An alias read back from the database could not be parsed as an [`Alias`].
    #[error("alias error: {0}")]
    InvalidAlias(#[from] AliasError),
    /// An internal error, raised by the underlying SQL database.
    #[error("internal error: {0}")]
    Internal(#[from] sql::Error),
}
 29  
/// An address book backed by an SQL database, stored either in a file
/// or in memory.
pub struct Book {
    /// Connection to the database holding the `nodes` and `addresses` tables.
    db: sql::Connection,
}
 34  
 35  impl fmt::Debug for Book {
 36      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 37          write!(f, "Book(..)")
 38      }
 39  }
 40  
 41  impl Book {
 42      const SCHEMA: &str = include_str!("schema.sql");
 43  
 44      /// Open an address book at the given path. Creates a new address book if it
 45      /// doesn't exist.
 46      pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
 47          let db = sql::Connection::open(path)?;
 48          db.execute(Self::SCHEMA)?;
 49  
 50          Ok(Self { db })
 51      }
 52  
 53      /// Same as [`Self::open`], but in read-only mode. This is useful to have multiple
 54      /// open databases, as no locking is required.
 55      pub fn reader<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
 56          let db = sql::Connection::open_with_flags(path, sqlite::OpenFlags::new().set_read_only())?;
 57          db.execute(Self::SCHEMA)?;
 58  
 59          Ok(Self { db })
 60      }
 61  
 62      /// Create a new in-memory address book.
 63      pub fn memory() -> Result<Self, Error> {
 64          let db = sql::Connection::open(":memory:")?;
 65          db.execute(Self::SCHEMA)?;
 66  
 67          Ok(Self { db })
 68      }
 69  }
 70  
 71  impl Store for Book {
 72      fn get(&self, node: &NodeId) -> Result<Option<types::Node>, Error> {
 73          let mut stmt = self
 74              .db
 75              .prepare("SELECT features, alias, pow, timestamp FROM nodes WHERE id = ?")?;
 76  
 77          stmt.bind((1, node))?;
 78  
 79          if let Some(Ok(row)) = stmt.into_iter().next() {
 80              let features = row.read::<node::Features, _>("features");
 81              let alias = Alias::from_str(row.read::<&str, _>("alias"))?;
 82              let timestamp = row.read::<i64, _>("timestamp") as Timestamp;
 83              let pow = row.read::<i64, _>("pow") as u32;
 84              let mut addrs = Vec::new();
 85  
 86              let mut stmt = self
 87                  .db
 88                  .prepare("SELECT type, value, source FROM addresses WHERE node = ?")?;
 89              stmt.bind((1, node))?;
 90  
 91              for row in stmt.into_iter() {
 92                  let row = row?;
 93                  let _typ = row.read::<AddressType, _>("type");
 94                  let addr = row.read::<Address, _>("value");
 95                  let source = row.read::<Source, _>("source");
 96  
 97                  addrs.push(KnownAddress {
 98                      addr,
 99                      source,
100                      last_success: None,
101                      last_attempt: None,
102                  });
103              }
104  
105              Ok(Some(types::Node {
106                  features,
107                  alias,
108                  pow,
109                  timestamp,
110                  addrs,
111              }))
112          } else {
113              Ok(None)
114          }
115      }
116  
117      fn len(&self) -> Result<usize, Error> {
118          let row = self
119              .db
120              .prepare("SELECT COUNT(*) FROM addresses")?
121              .into_iter()
122              .next()
123              .unwrap()
124              .unwrap();
125          let count = row.read::<i64, _>(0) as usize;
126  
127          Ok(count)
128      }
129  
130      fn insert(
131          &mut self,
132          node: &NodeId,
133          features: node::Features,
134          alias: Alias,
135          pow: u32,
136          timestamp: Timestamp,
137          addrs: impl IntoIterator<Item = KnownAddress>,
138      ) -> Result<bool, Error> {
139          transaction(&self.db, move |db| {
140              let mut stmt = db.prepare(
141                  "INSERT INTO nodes (id, features, alias, pow, timestamp)
142                   VALUES (?1, ?2, ?3, ?4, ?5)
143                   ON CONFLICT DO UPDATE
144                   SET features = ?2, alias = ?3, pow = ?4, timestamp = ?5
145                   WHERE timestamp < ?5",
146              )?;
147  
148              stmt.bind((1, node))?;
149              stmt.bind((2, features))?;
150              stmt.bind((3, sql::Value::String(alias.into())))?;
151              stmt.bind((4, pow as i64))?;
152              stmt.bind((5, timestamp as i64))?;
153              stmt.next()?;
154  
155              for addr in addrs {
156                  let mut stmt = db.prepare(
157                      "INSERT INTO addresses (node, type, value, source, timestamp)
158                       VALUES (?1, ?2, ?3, ?4, ?5)
159                       ON CONFLICT DO UPDATE
160                       SET timestamp = ?5
161                       WHERE timestamp < ?5",
162                  )?;
163                  stmt.bind((1, node))?;
164                  stmt.bind((2, AddressType::from(&addr.addr)))?;
165                  stmt.bind((3, &addr.addr))?;
166                  stmt.bind((4, addr.source))?;
167                  stmt.bind((5, timestamp as i64))?;
168                  stmt.next()?;
169              }
170              Ok(db.change_count() > 0)
171          })
172          .map_err(Error::from)
173      }
174  
175      fn remove(&mut self, node: &NodeId) -> Result<bool, Error> {
176          transaction(&self.db, move |db| {
177              db.prepare("DELETE FROM nodes WHERE id = ?")?
178                  .into_iter()
179                  .bind(&[node][..])?
180                  .next();
181  
182              db.prepare("DELETE FROM addresses WHERE node = ?")?
183                  .into_iter()
184                  .bind(&[node][..])?
185                  .next();
186  
187              Ok(db.change_count() > 0)
188          })
189          .map_err(Error::from)
190      }
191  
192      fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error> {
193          let mut stmt = self
194              .db
195              .prepare("SELECT node, type, value, source, last_success, last_attempt FROM addresses ORDER BY node")?
196              .into_iter();
197          let mut entries = Vec::new();
198  
199          while let Some(Ok(row)) = stmt.next() {
200              let node = row.read::<NodeId, _>("node");
201              let _typ = row.read::<AddressType, _>("type");
202              let addr = row.read::<Address, _>("value");
203              let source = row.read::<Source, _>("source");
204              let last_success = row.read::<Option<i64>, _>("last_success");
205              let last_attempt = row.read::<Option<i64>, _>("last_attempt");
206              let last_success = last_success.map(|t| LocalTime::from_millis(t as u128));
207              let last_attempt = last_attempt.map(|t| LocalTime::from_millis(t as u128));
208  
209              entries.push((
210                  node,
211                  KnownAddress {
212                      addr,
213                      source,
214                      last_success,
215                      last_attempt,
216                  },
217              ));
218          }
219          Ok(Box::new(entries.into_iter()))
220      }
221  
222      fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error> {
223          let mut stmt = self.db.prepare(
224              "UPDATE `addresses`
225               SET last_attempt = ?1
226               WHERE node = ?2
227               AND type = ?3
228               AND value = ?4",
229          )?;
230  
231          stmt.bind((1, time as i64))?;
232          stmt.bind((2, nid))?;
233          stmt.bind((3, AddressType::from(addr)))?;
234          stmt.bind((4, addr))?;
235          stmt.next()?;
236  
237          Ok(())
238      }
239  
240      fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error> {
241          let mut stmt = self.db.prepare(
242              "UPDATE `addresses`
243               SET last_success = ?1
244               WHERE node = ?2
245               AND type = ?3
246               AND value = ?4",
247          )?;
248  
249          stmt.bind((1, time as i64))?;
250          stmt.bind((2, nid))?;
251          stmt.bind((3, AddressType::from(addr)))?;
252          stmt.bind((4, addr))?;
253          stmt.next()?;
254  
255          Ok(())
256      }
257  }
258  
259  impl AliasStore for Book {
260      /// Retrieve `alias` of given node.
261      /// Calls `Self::get` under the hood.
262      fn alias(&self, nid: &NodeId) -> Option<Alias> {
263          self.get(nid)
264              .map(|node| node.map(|n| n.alias))
265              .unwrap_or(None)
266      }
267  }
268  
269  /// Address store.
270  ///
271  /// Used to store node addresses and metadata.
272  pub trait Store {
273      /// Get a known peer address.
274      fn get(&self, id: &NodeId) -> Result<Option<types::Node>, Error>;
275      /// Insert a node with associated addresses into the store.
276      ///
277      /// Returns `true` if the node or addresses were updated, and `false` otherwise.
278      fn insert(
279          &mut self,
280          node: &NodeId,
281          features: node::Features,
282          alias: Alias,
283          pow: u32,
284          timestamp: Timestamp,
285          addrs: impl IntoIterator<Item = KnownAddress>,
286      ) -> Result<bool, Error>;
287      /// Remove an address from the store.
288      fn remove(&mut self, id: &NodeId) -> Result<bool, Error>;
289      /// Returns the number of addresses.
290      fn len(&self) -> Result<usize, Error>;
291      /// Returns true if there are no addresses.
292      fn is_empty(&self) -> Result<bool, Error> {
293          self.len().map(|l| l == 0)
294      }
295      /// Get the address entries in the store.
296      fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error>;
297      /// Mark a node as attempted at a certain time.
298      fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
299      /// Mark a node as successfully connected at a certain time.
300      fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>;
301  }
302  
303  impl TryFrom<&sql::Value> for Source {
304      type Error = sql::Error;
305  
306      fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
307          let err = sql::Error {
308              code: None,
309              message: Some("sql: invalid source".to_owned()),
310          };
311          match value {
312              sql::Value::String(s) => match s.as_str() {
313                  "bootstrap" => Ok(Source::Bootstrap),
314                  "peer" => Ok(Source::Peer),
315                  "imported" => Ok(Source::Imported),
316                  _ => Err(err),
317              },
318              _ => Err(err),
319          }
320      }
321  }
322  
323  impl sql::BindableWithIndex for Source {
324      fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
325          match self {
326              Self::Bootstrap => "bootstrap".bind(stmt, i),
327              Self::Peer => "peer".bind(stmt, i),
328              Self::Imported => "imported".bind(stmt, i),
329          }
330      }
331  }
332  
333  impl TryFrom<&sql::Value> for AddressType {
334      type Error = sql::Error;
335  
336      fn try_from(value: &sql::Value) -> Result<Self, Self::Error> {
337          let err = sql::Error {
338              code: None,
339              message: Some("sql: invalid address type".to_owned()),
340          };
341          match value {
342              sql::Value::String(s) => match s.as_str() {
343                  "ipv4" => Ok(AddressType::Ipv4),
344                  "ipv6" => Ok(AddressType::Ipv6),
345                  "dns" => Ok(AddressType::Dns),
346                  "onion" => Ok(AddressType::Onion),
347                  _ => Err(err),
348              },
349              _ => Err(err),
350          }
351      }
352  }
353  
354  impl sql::BindableWithIndex for AddressType {
355      fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> {
356          match self {
357              Self::Ipv4 => "ipv4".bind(stmt, i),
358              Self::Ipv6 => "ipv6".bind(stmt, i),
359              Self::Dns => "dns".bind(stmt, i),
360              Self::Onion => "onion".bind(stmt, i),
361          }
362      }
363  }
364  
#[cfg(test)]
mod test {
    use std::net;

    use super::*;
    use crate::test::arbitrary;
    use localtime::LocalTime;

    /// Build a peer-sourced IPv4 address that has never been attempted or
    /// connected to.
    fn peer_addr(ip: [u8; 4], port: u16) -> KnownAddress {
        KnownAddress {
            addr: net::SocketAddr::from((ip, port)).into(),
            source: Source::Peer,
            last_success: None,
            last_attempt: None,
        }
    }

    /// A freshly created on-disk book contains no addresses.
    #[test]
    fn test_empty() {
        let dir = tempfile::tempdir().unwrap();
        let book = Book::open(dir.path().join("cache")).unwrap();

        assert!(book.is_empty().unwrap());
    }

    /// Looking up a node that was never inserted yields `None`.
    #[test]
    fn test_get_none() {
        let alice = arbitrary::gen::<NodeId>(1);
        let book = Book::memory().unwrap();

        assert!(book.get(&alice).unwrap().is_none());
    }

    /// Removing a node that was never inserted reports no change.
    #[test]
    fn test_remove_nothing() {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut book = Book::memory().unwrap();

        assert!(!book.remove(&alice).unwrap());
    }

    /// The alias follows the most recent insert.
    #[test]
    fn test_alias() {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut book = Book::memory().unwrap();
        let features = node::Features::SEED;
        let timestamp = LocalTime::now().as_millis();

        for (name, time) in [("alice", timestamp), ("bob", timestamp + 1)] {
            book.insert(&alice, features, Alias::new(name), 16, time, [])
                .unwrap();

            let node = book.get(&alice).unwrap().unwrap();
            assert_eq!(node.alias.as_ref(), name);
        }
    }

    /// Everything passed to `insert` is returned by `get`.
    #[test]
    fn test_insert_and_get() {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut book = Book::memory().unwrap();
        let features = node::Features::SEED;
        let timestamp = LocalTime::now().as_millis();
        let ka = peer_addr([4, 4, 4, 4], 8776);

        let inserted = book
            .insert(
                &alice,
                features,
                Alias::new("alice"),
                16,
                timestamp,
                [ka.clone()],
            )
            .unwrap();
        assert!(inserted);

        let node = book.get(&alice).unwrap().unwrap();

        assert_eq!(node.features, features);
        assert_eq!(node.pow, 16);
        assert_eq!(node.timestamp, timestamp);
        assert_eq!(node.alias.as_ref(), "alice");
        assert_eq!(node.addrs, vec![ka]);
    }

    /// Inserting the very same data twice changes nothing the second time.
    #[test]
    fn test_insert_duplicate() {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut book = Book::memory().unwrap();
        let features = node::Features::SEED;
        let timestamp = LocalTime::now().as_millis();
        let alias = Alias::new("alice");
        let ka = peer_addr([4, 4, 4, 4], 8776);

        let first = book
            .insert(&alice, features, alias.clone(), 0, timestamp, [ka.clone()])
            .unwrap();
        assert!(first);

        let second = book
            .insert(&alice, features, alias, 0, timestamp, [ka])
            .unwrap();
        assert!(!second);

        assert_eq!(book.len().unwrap(), 1);
    }

    /// Only a strictly larger timestamp may overwrite a stored node.
    #[test]
    fn test_insert_and_update() {
        let alice = arbitrary::gen::<NodeId>(1);
        let mut book = Book::memory().unwrap();
        let timestamp = LocalTime::now().as_millis();
        let features = node::Features::SEED;
        let old_alias = Alias::new("alice");
        let new_alias = Alias::new("~alice~");
        let ka = peer_addr([4, 4, 4, 4], 8776);

        let updated = book
            .insert(&alice, features, old_alias, 0, timestamp, [ka.clone()])
            .unwrap();
        assert!(updated);

        let updated = book
            .insert(&alice, features, new_alias.clone(), 0, timestamp, [])
            .unwrap();
        assert!(!updated, "Can't update using the same timestamp");

        let updated = book
            .insert(&alice, features, new_alias.clone(), 0, timestamp - 1, [])
            .unwrap();
        assert!(!updated, "Can't update using a smaller timestamp");

        let node = book.get(&alice).unwrap().unwrap();
        assert_eq!(node.alias.as_ref(), "alice");
        assert_eq!(node.timestamp, timestamp);
        assert_eq!(node.pow, 0);

        let updated = book
            .insert(&alice, features, new_alias.clone(), 0, timestamp + 1, [])
            .unwrap();
        assert!(updated, "Can update with a larger timestamp");

        let updated = book
            .insert(
                &alice,
                node::Features::NONE,
                new_alias,
                1,
                timestamp + 2,
                [],
            )
            .unwrap();
        assert!(updated);

        let node = book.get(&alice).unwrap().unwrap();
        assert_eq!(node.features, node::Features::NONE);
        assert_eq!(node.alias.as_ref(), "~alice~");
        assert_eq!(node.timestamp, timestamp + 2);
        assert_eq!(node.pow, 1);
        assert_eq!(node.addrs, vec![ka]);
    }

    /// Removing a node also removes all of its addresses.
    #[test]
    fn test_insert_and_remove() {
        let alice = arbitrary::gen::<NodeId>(1);
        let bob = arbitrary::gen::<NodeId>(1);
        let mut book = Book::memory().unwrap();
        let timestamp = LocalTime::now().as_millis();
        let features = node::Features::SEED;
        let alice_alias = Alias::new("alice");
        let bob_alias = Alias::new("bob");

        for (ip, port) in [
            ([4, 4, 4, 4], 8776),
            ([7, 7, 7, 7], 8776),
            ([9, 9, 9, 9], 8776),
        ] {
            let ka = peer_addr(ip, port);

            book.insert(
                &alice,
                features,
                alice_alias.clone(),
                0,
                timestamp,
                [ka.clone()],
            )
            .unwrap();
            book.insert(&bob, features, bob_alias.clone(), 0, timestamp, [ka])
                .unwrap();
        }
        assert_eq!(book.len().unwrap(), 6);

        assert!(book.remove(&alice).unwrap());
        assert_eq!(book.len().unwrap(), 3);

        assert!(book.remove(&bob).unwrap());
        assert_eq!(book.len().unwrap(), 0);
    }

    /// `entries` returns exactly the addresses that were inserted.
    #[test]
    fn test_entries() {
        let ids = arbitrary::vec::<NodeId>(16);
        let mut rng = fastrand::Rng::new();
        let mut book = Book::memory().unwrap();
        let timestamp = LocalTime::now().as_millis();
        let features = node::Features::SEED;
        let alias = Alias::new("alice");
        let mut expected = Vec::new();

        for id in ids {
            let ip = net::Ipv4Addr::from(rng.u32(..));
            let port = rng.u16(..);
            let ka = KnownAddress {
                addr: net::SocketAddr::from((ip, port)).into(),
                source: Source::Bootstrap,
                // TODO: Test times as well.
                last_success: None,
                last_attempt: None,
            };
            book.insert(&id, features, alias.clone(), 0, timestamp, [ka.clone()])
                .unwrap();
            expected.push((id, ka));
        }

        let mut actual = book.entries().unwrap().collect::<Vec<_>>();

        // Order both sides by node id before comparing.
        actual.sort_by_key(|(id, _)| *id);
        expected.sort_by_key(|(id, _)| *id);

        assert_eq!(book.len().unwrap(), actual.len());
        assert_eq!(actual, expected);
    }
}