store.rs
1 use std::path::Path; 2 use std::str::FromStr; 3 use std::{fmt, io}; 4 5 use localtime::LocalTime; 6 use sqlite as sql; 7 use thiserror::Error; 8 9 use crate::node; 10 use crate::node::address::{KnownAddress, Source}; 11 use crate::node::{Address, Alias, AliasError, AliasStore, NodeId}; 12 use crate::prelude::Timestamp; 13 use crate::sql::transaction; 14 15 use super::types; 16 use super::AddressType; 17 18 #[derive(Error, Debug)] 19 pub enum Error { 20 /// I/O error. 21 #[error("i/o error: {0}")] 22 Io(#[from] io::Error), 23 #[error("alias error: {0}")] 24 InvalidAlias(#[from] AliasError), 25 /// An Internal error. 26 #[error("internal error: {0}")] 27 Internal(#[from] sql::Error), 28 } 29 30 /// A file-backed address book. 31 pub struct Book { 32 db: sql::Connection, 33 } 34 35 impl fmt::Debug for Book { 36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 37 write!(f, "Book(..)") 38 } 39 } 40 41 impl Book { 42 const SCHEMA: &str = include_str!("schema.sql"); 43 44 /// Open an address book at the given path. Creates a new address book if it 45 /// doesn't exist. 46 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> { 47 let db = sql::Connection::open(path)?; 48 db.execute(Self::SCHEMA)?; 49 50 Ok(Self { db }) 51 } 52 53 /// Same as [`Self::open`], but in read-only mode. This is useful to have multiple 54 /// open databases, as no locking is required. 55 pub fn reader<P: AsRef<Path>>(path: P) -> Result<Self, Error> { 56 let db = sql::Connection::open_with_flags(path, sqlite::OpenFlags::new().set_read_only())?; 57 db.execute(Self::SCHEMA)?; 58 59 Ok(Self { db }) 60 } 61 62 /// Create a new in-memory address book. 63 pub fn memory() -> Result<Self, Error> { 64 let db = sql::Connection::open(":memory:")?; 65 db.execute(Self::SCHEMA)?; 66 67 Ok(Self { db }) 68 } 69 } 70 71 impl Store for Book { 72 fn get(&self, node: &NodeId) -> Result<Option<types::Node>, Error> { 73 let mut stmt = self 74 .db 75 .prepare("SELECT features, alias, pow, timestamp FROM nodes WHERE id = ?")?; 76 77 stmt.bind((1, node))?; 78 79 if let Some(Ok(row)) = stmt.into_iter().next() { 80 let features = row.read::<node::Features, _>("features"); 81 let alias = Alias::from_str(row.read::<&str, _>("alias"))?; 82 let timestamp = row.read::<i64, _>("timestamp") as Timestamp; 83 let pow = row.read::<i64, _>("pow") as u32; 84 let mut addrs = Vec::new(); 85 86 let mut stmt = self 87 .db 88 .prepare("SELECT type, value, source FROM addresses WHERE node = ?")?; 89 stmt.bind((1, node))?; 90 91 for row in stmt.into_iter() { 92 let row = row?; 93 let _typ = row.read::<AddressType, _>("type"); 94 let addr = row.read::<Address, _>("value"); 95 let source = row.read::<Source, _>("source"); 96 97 addrs.push(KnownAddress { 98 addr, 99 source, 100 last_success: None, 101 last_attempt: None, 102 }); 103 } 104 105 Ok(Some(types::Node { 106 features, 107 alias, 108 pow, 109 timestamp, 110 addrs, 111 })) 112 } else { 113 Ok(None) 114 } 115 } 116 117 fn len(&self) -> Result<usize, Error> { 118 let row = self 119 .db 120 .prepare("SELECT COUNT(*) FROM addresses")? 121 .into_iter() 122 .next() 123 .unwrap() 124 .unwrap(); 125 let count = row.read::<i64, _>(0) as usize; 126 127 Ok(count) 128 } 129 130 fn insert( 131 &mut self, 132 node: &NodeId, 133 features: node::Features, 134 alias: Alias, 135 pow: u32, 136 timestamp: Timestamp, 137 addrs: impl IntoIterator<Item = KnownAddress>, 138 ) -> Result<bool, Error> { 139 transaction(&self.db, move |db| { 140 let mut stmt = db.prepare( 141 "INSERT INTO nodes (id, features, alias, pow, timestamp) 142 VALUES (?1, ?2, ?3, ?4, ?5) 143 ON CONFLICT DO UPDATE 144 SET features = ?2, alias = ?3, pow = ?4, timestamp = ?5 145 WHERE timestamp < ?5", 146 )?; 147 148 stmt.bind((1, node))?; 149 stmt.bind((2, features))?; 150 stmt.bind((3, sql::Value::String(alias.into())))?; 151 stmt.bind((4, pow as i64))?; 152 stmt.bind((5, timestamp as i64))?; 153 stmt.next()?; 154 155 for addr in addrs { 156 let mut stmt = db.prepare( 157 "INSERT INTO addresses (node, type, value, source, timestamp) 158 VALUES (?1, ?2, ?3, ?4, ?5) 159 ON CONFLICT DO UPDATE 160 SET timestamp = ?5 161 WHERE timestamp < ?5", 162 )?; 163 stmt.bind((1, node))?; 164 stmt.bind((2, AddressType::from(&addr.addr)))?; 165 stmt.bind((3, &addr.addr))?; 166 stmt.bind((4, addr.source))?; 167 stmt.bind((5, timestamp as i64))?; 168 stmt.next()?; 169 } 170 Ok(db.change_count() > 0) 171 }) 172 .map_err(Error::from) 173 } 174 175 fn remove(&mut self, node: &NodeId) -> Result<bool, Error> { 176 transaction(&self.db, move |db| { 177 db.prepare("DELETE FROM nodes WHERE id = ?")? 178 .into_iter() 179 .bind(&[node][..])? 180 .next(); 181 182 db.prepare("DELETE FROM addresses WHERE node = ?")? 183 .into_iter() 184 .bind(&[node][..])? 185 .next(); 186 187 Ok(db.change_count() > 0) 188 }) 189 .map_err(Error::from) 190 } 191 192 fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error> { 193 let mut stmt = self 194 .db 195 .prepare("SELECT node, type, value, source, last_success, last_attempt FROM addresses ORDER BY node")? 196 .into_iter(); 197 let mut entries = Vec::new(); 198 199 while let Some(Ok(row)) = stmt.next() { 200 let node = row.read::<NodeId, _>("node"); 201 let _typ = row.read::<AddressType, _>("type"); 202 let addr = row.read::<Address, _>("value"); 203 let source = row.read::<Source, _>("source"); 204 let last_success = row.read::<Option<i64>, _>("last_success"); 205 let last_attempt = row.read::<Option<i64>, _>("last_attempt"); 206 let last_success = last_success.map(|t| LocalTime::from_millis(t as u128)); 207 let last_attempt = last_attempt.map(|t| LocalTime::from_millis(t as u128)); 208 209 entries.push(( 210 node, 211 KnownAddress { 212 addr, 213 source, 214 last_success, 215 last_attempt, 216 }, 217 )); 218 } 219 Ok(Box::new(entries.into_iter())) 220 } 221 222 fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error> { 223 let mut stmt = self.db.prepare( 224 "UPDATE `addresses` 225 SET last_attempt = ?1 226 WHERE node = ?2 227 AND type = ?3 228 AND value = ?4", 229 )?; 230 231 stmt.bind((1, time as i64))?; 232 stmt.bind((2, nid))?; 233 stmt.bind((3, AddressType::from(addr)))?; 234 stmt.bind((4, addr))?; 235 stmt.next()?; 236 237 Ok(()) 238 } 239 240 fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error> { 241 let mut stmt = self.db.prepare( 242 "UPDATE `addresses` 243 SET last_success = ?1 244 WHERE node = ?2 245 AND type = ?3 246 AND value = ?4", 247 )?; 248 249 stmt.bind((1, time as i64))?; 250 stmt.bind((2, nid))?; 251 stmt.bind((3, AddressType::from(addr)))?; 252 stmt.bind((4, addr))?; 253 stmt.next()?; 254 255 Ok(()) 256 } 257 } 258 259 impl AliasStore for Book { 260 /// Retrieve `alias` of given node. 261 /// Calls `Self::get` under the hood. 262 fn alias(&self, nid: &NodeId) -> Option<Alias> { 263 self.get(nid) 264 .map(|node| node.map(|n| n.alias)) 265 .unwrap_or(None) 266 } 267 } 268 269 /// Address store. 270 /// 271 /// Used to store node addresses and metadata. 272 pub trait Store { 273 /// Get a known peer address. 274 fn get(&self, id: &NodeId) -> Result<Option<types::Node>, Error>; 275 /// Insert a node with associated addresses into the store. 276 /// 277 /// Returns `true` if the node or addresses were updated, and `false` otherwise. 278 fn insert( 279 &mut self, 280 node: &NodeId, 281 features: node::Features, 282 alias: Alias, 283 pow: u32, 284 timestamp: Timestamp, 285 addrs: impl IntoIterator<Item = KnownAddress>, 286 ) -> Result<bool, Error>; 287 /// Remove an address from the store. 288 fn remove(&mut self, id: &NodeId) -> Result<bool, Error>; 289 /// Returns the number of addresses. 290 fn len(&self) -> Result<usize, Error>; 291 /// Returns true if there are no addresses. 292 fn is_empty(&self) -> Result<bool, Error> { 293 self.len().map(|l| l == 0) 294 } 295 /// Get the address entries in the store. 296 fn entries(&self) -> Result<Box<dyn Iterator<Item = (NodeId, KnownAddress)>>, Error>; 297 /// Mark a node as attempted at a certain time. 298 fn attempted(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>; 299 /// Mark a node as successfully connected at a certain time. 300 fn connected(&self, nid: &NodeId, addr: &Address, time: Timestamp) -> Result<(), Error>; 301 } 302 303 impl TryFrom<&sql::Value> for Source { 304 type Error = sql::Error; 305 306 fn try_from(value: &sql::Value) -> Result<Self, Self::Error> { 307 let err = sql::Error { 308 code: None, 309 message: Some("sql: invalid source".to_owned()), 310 }; 311 match value { 312 sql::Value::String(s) => match s.as_str() { 313 "bootstrap" => Ok(Source::Bootstrap), 314 "peer" => Ok(Source::Peer), 315 "imported" => Ok(Source::Imported), 316 _ => Err(err), 317 }, 318 _ => Err(err), 319 } 320 } 321 } 322 323 impl sql::BindableWithIndex for Source { 324 fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> { 325 match self { 326 Self::Bootstrap => "bootstrap".bind(stmt, i), 327 Self::Peer => "peer".bind(stmt, i), 328 Self::Imported => "imported".bind(stmt, i), 329 } 330 } 331 } 332 333 impl TryFrom<&sql::Value> for AddressType { 334 type Error = sql::Error; 335 336 fn try_from(value: &sql::Value) -> Result<Self, Self::Error> { 337 let err = sql::Error { 338 code: None, 339 message: Some("sql: invalid address type".to_owned()), 340 }; 341 match value { 342 sql::Value::String(s) => match s.as_str() { 343 "ipv4" => Ok(AddressType::Ipv4), 344 "ipv6" => Ok(AddressType::Ipv6), 345 "dns" => Ok(AddressType::Dns), 346 "onion" => Ok(AddressType::Onion), 347 _ => Err(err), 348 }, 349 _ => Err(err), 350 } 351 } 352 } 353 354 impl sql::BindableWithIndex for AddressType { 355 fn bind<I: sql::ParameterIndex>(self, stmt: &mut sql::Statement<'_>, i: I) -> sql::Result<()> { 356 match self { 357 Self::Ipv4 => "ipv4".bind(stmt, i), 358 Self::Ipv6 => "ipv6".bind(stmt, i), 359 Self::Dns => "dns".bind(stmt, i), 360 Self::Onion => "onion".bind(stmt, i), 361 } 362 } 363 } 364 365 #[cfg(test)] 366 mod test { 367 use std::net; 368 369 use super::*; 370 use crate::test::arbitrary; 371 use localtime::LocalTime; 372 373 #[test] 374 fn test_empty() { 375 let tmp = tempfile::tempdir().unwrap(); 376 let path = tmp.path().join("cache"); 377 let cache = Book::open(path).unwrap(); 378 379 assert!(cache.is_empty().unwrap()); 380 } 381 382 #[test] 383 fn test_get_none() { 384 let alice = arbitrary::gen::<NodeId>(1); 385 let cache = Book::memory().unwrap(); 386 let result = cache.get(&alice).unwrap(); 387 388 assert!(result.is_none()); 389 } 390 391 #[test] 392 fn test_remove_nothing() { 393 let alice = arbitrary::gen::<NodeId>(1); 394 let mut cache = Book::memory().unwrap(); 395 let removed = cache.remove(&alice).unwrap(); 396 397 assert!(!removed); 398 } 399 400 #[test] 401 fn test_alias() { 402 let alice = arbitrary::gen::<NodeId>(1); 403 let mut cache = Book::memory().unwrap(); 404 let features = node::Features::SEED; 405 let timestamp = LocalTime::now().as_millis(); 406 407 cache 408 .insert(&alice, features, Alias::new("alice"), 16, timestamp, []) 409 .unwrap(); 410 let node = cache.get(&alice).unwrap().unwrap(); 411 assert_eq!(node.alias.as_ref(), "alice"); 412 413 cache 414 .insert(&alice, features, Alias::new("bob"), 16, timestamp + 1, []) 415 .unwrap(); 416 let node = cache.get(&alice).unwrap().unwrap(); 417 assert_eq!(node.alias.as_ref(), "bob"); 418 } 419 420 #[test] 421 fn test_insert_and_get() { 422 let alice = arbitrary::gen::<NodeId>(1); 423 let mut cache = Book::memory().unwrap(); 424 let features = node::Features::SEED; 425 let timestamp = LocalTime::now().as_millis(); 426 427 let ka = KnownAddress { 428 addr: net::SocketAddr::from(([4, 4, 4, 4], 8776)).into(), 429 source: Source::Peer, 430 last_success: None, 431 last_attempt: None, 432 }; 433 let inserted = cache 434 .insert( 435 &alice, 436 features, 437 Alias::new("alice"), 438 16, 439 timestamp, 440 [ka.clone()], 441 ) 442 .unwrap(); 443 assert!(inserted); 444 445 let node = cache.get(&alice).unwrap().unwrap(); 446 447 assert_eq!(node.features, features); 448 assert_eq!(node.pow, 16); 449 assert_eq!(node.timestamp, timestamp); 450 assert_eq!(node.alias.as_ref(), "alice"); 451 assert_eq!(node.addrs, vec![ka]); 452 } 453 454 #[test] 455 fn test_insert_duplicate() { 456 let alice = arbitrary::gen::<NodeId>(1); 457 let mut cache = Book::memory().unwrap(); 458 let features = node::Features::SEED; 459 let timestamp = LocalTime::now().as_millis(); 460 let alias = Alias::new("alice"); 461 462 let ka = KnownAddress { 463 addr: net::SocketAddr::from(([4, 4, 4, 4], 8776)).into(), 464 source: Source::Peer, 465 last_success: None, 466 last_attempt: None, 467 }; 468 let inserted = cache 469 .insert(&alice, features, alias.clone(), 0, timestamp, [ka.clone()]) 470 .unwrap(); 471 assert!(inserted); 472 473 let inserted = cache 474 .insert(&alice, features, alias, 0, timestamp, [ka]) 475 .unwrap(); 476 assert!(!inserted); 477 478 assert_eq!(cache.len().unwrap(), 1); 479 } 480 481 #[test] 482 fn test_insert_and_update() { 483 let alice = arbitrary::gen::<NodeId>(1); 484 let mut cache = Book::memory().unwrap(); 485 let timestamp = LocalTime::now().as_millis(); 486 let features = node::Features::SEED; 487 let alias1 = Alias::new("alice"); 488 let alias2 = Alias::new("~alice~"); 489 let ka = KnownAddress { 490 addr: net::SocketAddr::from(([4, 4, 4, 4], 8776)).into(), 491 source: Source::Peer, 492 last_success: None, 493 last_attempt: None, 494 }; 495 496 let updated = cache 497 .insert(&alice, features, alias1, 0, timestamp, [ka.clone()]) 498 .unwrap(); 499 assert!(updated); 500 501 let updated = cache 502 .insert(&alice, features, alias2.clone(), 0, timestamp, []) 503 .unwrap(); 504 assert!(!updated, "Can't update using the same timestamp"); 505 506 let updated = cache 507 .insert(&alice, features, alias2.clone(), 0, timestamp - 1, []) 508 .unwrap(); 509 assert!(!updated, "Can't update using a smaller timestamp"); 510 511 let node = cache.get(&alice).unwrap().unwrap(); 512 assert_eq!(node.alias.as_ref(), "alice"); 513 assert_eq!(node.timestamp, timestamp); 514 assert_eq!(node.pow, 0); 515 516 let updated = cache 517 .insert(&alice, features, alias2.clone(), 0, timestamp + 1, []) 518 .unwrap(); 519 assert!(updated, "Can update with a larger timestamp"); 520 521 let updated = cache 522 .insert(&alice, node::Features::NONE, alias2, 1, timestamp + 2, []) 523 .unwrap(); 524 assert!(updated); 525 526 let node = cache.get(&alice).unwrap().unwrap(); 527 assert_eq!(node.features, node::Features::NONE); 528 assert_eq!(node.alias.as_ref(), "~alice~"); 529 assert_eq!(node.timestamp, timestamp + 2); 530 assert_eq!(node.pow, 1); 531 assert_eq!(node.addrs, vec![ka]); 532 } 533 534 #[test] 535 fn test_insert_and_remove() { 536 let alice = arbitrary::gen::<NodeId>(1); 537 let bob = arbitrary::gen::<NodeId>(1); 538 let mut cache = Book::memory().unwrap(); 539 let timestamp = LocalTime::now().as_millis(); 540 let features = node::Features::SEED; 541 let alice_alias = Alias::new("alice"); 542 let bob_alias = Alias::new("bob"); 543 544 for addr in [ 545 ([4, 4, 4, 4], 8776), 546 ([7, 7, 7, 7], 8776), 547 ([9, 9, 9, 9], 8776), 548 ] { 549 let ka = KnownAddress { 550 addr: net::SocketAddr::from(addr).into(), 551 source: Source::Peer, 552 last_success: None, 553 last_attempt: None, 554 }; 555 cache 556 .insert( 557 &alice, 558 features, 559 alice_alias.clone(), 560 0, 561 timestamp, 562 [ka.clone()], 563 ) 564 .unwrap(); 565 cache 566 .insert(&bob, features, bob_alias.clone(), 0, timestamp, [ka]) 567 .unwrap(); 568 } 569 assert_eq!(cache.len().unwrap(), 6); 570 571 let removed = cache.remove(&alice).unwrap(); 572 assert!(removed); 573 assert_eq!(cache.len().unwrap(), 3); 574 575 let removed = cache.remove(&bob).unwrap(); 576 assert!(removed); 577 assert_eq!(cache.len().unwrap(), 0); 578 } 579 580 #[test] 581 fn test_entries() { 582 let ids = arbitrary::vec::<NodeId>(16); 583 let mut rng = fastrand::Rng::new(); 584 let mut cache = Book::memory().unwrap(); 585 let mut expected = Vec::new(); 586 let timestamp = LocalTime::now().as_millis(); 587 let features = node::Features::SEED; 588 let alias = Alias::new("alice"); 589 590 for id in ids { 591 let ip = rng.u32(..); 592 let addr = net::SocketAddr::from((net::Ipv4Addr::from(ip), rng.u16(..))); 593 let ka = KnownAddress { 594 addr: addr.into(), 595 source: Source::Bootstrap, 596 // TODO: Test times as well. 597 last_success: None, 598 last_attempt: None, 599 }; 600 expected.push((id, ka.clone())); 601 cache 602 .insert(&id, features, alias.clone(), 0, timestamp, [ka]) 603 .unwrap(); 604 } 605 606 let mut actual = cache.entries().unwrap().collect::<Vec<_>>(); 607 608 actual.sort_by_key(|(i, _)| *i); 609 expected.sort_by_key(|(i, _)| *i); 610 611 assert_eq!(cache.len().unwrap(), actual.len()); 612 assert_eq!(actual, expected); 613 } 614 }