mod.rs
1 use diesel::BelongingToDsl; 2 use diesel::ExpressionMethods; 3 use diesel::QueryDsl; 4 use diesel::RunQueryDsl; 5 use diesel::SelectableHelper; 6 use diesel::SqliteConnection; 7 use diesel_migrations::MigrationHarness; 8 use tracing::Level; 9 10 use crate::database::error::BaseDatabaseError; 11 use crate::error::Error; 12 13 mod content; 14 pub(crate) mod error; 15 mod node; 16 mod payload; 17 18 pub use content::ContentPersistenceError; 19 pub use node::NodePersistenceError; 20 pub use payload::PayloadPersistenceError; 21 22 const MIGRATIONS: diesel_migrations::EmbeddedMigrations = 23 diesel_migrations::embed_migrations!("./migrations"); 24 25 #[derive(Clone)] 26 pub struct Database { 27 conn: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<SqliteConnection>>, 28 span: tracing::Span, 29 } 30 31 impl Database { 32 pub fn load(path: camino::Utf8PathBuf) -> Result<Self, Error> { 33 let conn = diesel::r2d2::Pool::builder() 34 .test_on_check_out(true) 35 .build(diesel::r2d2::ConnectionManager::new(path))?; 36 37 Self::new(conn) 38 } 39 40 pub fn inmemory() -> Result<Self, Error> { 41 let conn = diesel::r2d2::Pool::builder() 42 .test_on_check_out(true) 43 .build(diesel::r2d2::ConnectionManager::new(":memory:"))?; 44 45 Self::new(conn) 46 } 47 48 fn new( 49 pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<SqliteConnection>>, 50 ) -> Result<Self, Error> { 51 // TODO: Have a look at WAL journaling mode 52 // 53 // https://sqlite.org/wal.html 54 // https://stackoverflow.com/a/57717533 55 // 56 // and also: 57 // 58 // https://www.sqlite.org/pragma.html#pragma_synchronous 59 // 60 let pragmas = [ 61 "PRAGMA journal_mode = WAL", 62 "PRAGMA locking_mode = EXCLUSIVE", 63 "PRAGMA synchronous = EXTRA", 64 "PRAGMA auto_vacuum = FULL", 65 ]; 66 67 let mut conn = pool.get().map_err(BaseDatabaseError::GettingConnection)?; 68 69 for pragma in pragmas { 70 diesel::sql_query(pragma) 71 .execute(&mut conn) 72 .map_err(Error::Pragma)?; 73 } 74 75 let pending_migrations = conn 76 .pending_migrations(MIGRATIONS) 77 .map_err(Error::Migration)?; 78 79 tracing::info!( 80 "There are {} migrations pending, applying them now", 81 pending_migrations.len() 82 ); 83 84 for migration in pending_migrations { 85 tracing::debug!("Applying migration: {}", migration.name()); 86 87 conn.run_migration(&migration).map_err(Error::Migration)?; 88 } 89 90 drop(conn); 91 92 Ok(Self { 93 conn: pool, 94 span: tracing::error_span!("db-sqlite"), 95 }) 96 } 97 98 /// Return the latest node, with its ID 99 #[tracing::instrument( 100 name = "latest_node", 101 follows_from = [tracing::Span::current()], 102 level = Level::ERROR, 103 parent = &self.span, 104 skip(self), 105 ret(level = Level::DEBUG), 106 err(level = Level::WARN) 107 )] 108 pub async fn latest_node( 109 &self, 110 ) -> Result< 111 Option<( 112 distrox_model::node::NodeId, 113 distrox_model::node::Node, 114 nonempty::NonEmpty<distrox_model::node::Signature>, 115 )>, 116 NodePersistenceError, 117 > { 118 tracing::debug!("Fetching latest node from database"); 119 let (database_node, signatures) = { 120 use crate::schema::nodes::dsl::*; 121 122 let mut conn = self 123 .conn 124 .get() 125 .map_err(BaseDatabaseError::GettingConnection)?; 126 127 tracing::debug!("Fetching latest node id"); 128 let node = match nodes 129 .order_by(node_id.desc()) 130 .select(crate::models::node::Node::as_select()) 131 .first(&mut conn) 132 { 133 Err(diesel::NotFound) => return Ok(None), 134 Err(other) => return Err(BaseDatabaseError::Query(other).into()), 135 Ok(node) => node, 136 }; 137 138 // TODO: Optimize this into the query above 139 let signatures = match crate::models::signature::Signature::belonging_to(&node) 140 .select(crate::models::signature::Signature::as_select()) 141 .load(&mut conn) 142 { 143 Err(diesel::NotFound) => return Ok(None), 144 Err(other) => return Err(BaseDatabaseError::Query(other).into()), 145 Ok(sigs) => sigs, 146 }; 147 148 let Some(signatures) = nonempty::NonEmpty::from_vec(signatures) else { 149 todo!() 150 }; 151 152 (node, signatures) 153 }; 154 tracing::debug!("Fetching latest node from database succeeded"); 155 156 let node_id = distrox_model::node::NodeId::from_bytes(&database_node.node_id) 157 .map_err(BaseDatabaseError::from)?; 158 159 match self.build_model_node(database_node, signatures).await? { 160 Some((node, sigs)) => Ok(Some((node_id, node, sigs))), 161 None => Ok(None), 162 } 163 } 164 165 async fn build_model_node( 166 &self, 167 database_node: crate::models::node::Node, 168 database_signatures: nonempty::NonEmpty<crate::models::signature::Signature>, 169 ) -> Result< 170 Option<( 171 distrox_model::node::Node, 172 nonempty::NonEmpty<distrox_model::node::Signature>, 173 )>, 174 BaseDatabaseError, 175 > { 176 let span = tracing::Span::current(); 177 span.record( 178 "node_id", 179 tracing::field::debug(distrox_model::node::NodeId::from_bytes( 180 &database_node.node_id, 181 )), 182 ); 183 span.record("content_id", tracing::field::Empty); 184 185 let parents = { 186 use crate::schema::node_parents::dsl::*; 187 188 let mut conn = self 189 .conn 190 .get() 191 .map_err(BaseDatabaseError::GettingConnection)?; 192 193 tracing::debug!(parent: &span, "Fetching parents for node"); 194 match node_parents 195 .filter(node_id.eq(&database_node.node_id)) 196 .select(crate::models::node::NodeParents::as_select()) 197 .load(&mut conn) 198 { 199 Err(diesel::NotFound) => Vec::new(), 200 Err(other) => return Err(BaseDatabaseError::Query(other)), 201 Ok(ps) => ps 202 .into_iter() 203 .map(|p| { 204 tracing::debug!( 205 parent: &span, 206 node_id = tracing::field::debug(distrox_model::node::NodeId::from_bytes(&p.node_id)), 207 parent = tracing::field::debug(distrox_model::node::NodeId::from_bytes(&p.parent_node_id)), 208 "Found parent for node" 209 ); 210 distrox_model::node::NodeId::from_bytes(&p.parent_node_id) 211 }) 212 .collect::<Result<Vec<_>, _>>()?, 213 } 214 }; 215 216 let content_id = database_node 217 .content_id 218 .as_ref() 219 .map(|cid| { 220 span.record("content_id", tracing::field::debug(cid)); 221 tracing::debug!(parent: &span, "Found ContentId for Node"); 222 distrox_model::content::ContentId::from_bytes(cid) 223 }) 224 .transpose()?; 225 226 tracing::debug!(parent: &span, "Successfully loaded node"); 227 let signatures = 228 database_signatures.map(|sig| distrox_model::node::Signature::from(sig.signature)); 229 230 let node = distrox_model::node::Node::from_raw_parts( 231 database_node.version as u64, 232 parents, 233 content_id, 234 ); 235 236 Ok(Some((node, signatures))) 237 } 238 239 pub async fn execute_raw_sql(&self, sql: String) -> Result<usize, Error> { 240 let mut conn = self 241 .conn 242 .get() 243 .map_err(BaseDatabaseError::GettingConnection)?; 244 let rows_affected = diesel::sql_query(sql).execute(&mut conn).map_err(|error| { 245 tracing::error!(?error, "SQL execution errored"); 246 Error::RawSqlError(error.to_string()) 247 })?; 248 249 tracing::info!(?rows_affected, "Execution of SQL succeeded"); 250 Ok(rows_affected) 251 } 252 } 253 254 #[cfg(test)] 255 mod tests { 256 use distrox_api::node::AddNodeRequest; 257 use distrox_model::node::Node; 258 use distrox_model::node::NodeId; 259 use distrox_model::node::Signature; 260 use multihash_codetable::MultihashDigest; 261 use tower::Service; 262 use tower::ServiceExt; 263 264 #[tokio::test] 265 async fn test_getting_latest_node() { 266 let mut database = super::Database::inmemory().unwrap(); 267 let code = multihash_codetable::Code::Blake3_256; 268 269 let latest_node_id = { 270 let latest_node_id = NodeId::from_bytes(&code.digest(&[1]).to_bytes()).unwrap(); 271 let node = Node::from_raw_parts(1, vec![], None); 272 let sig = nonempty::NonEmpty::singleton(Signature::from(vec![])); 273 274 ServiceExt::<AddNodeRequest>::ready(&mut database) 275 .await 276 .unwrap() 277 .call(AddNodeRequest { 278 id: latest_node_id, 279 node, 280 signature: sig, 281 }) 282 .await 283 .unwrap(); 284 285 latest_node_id 286 }; 287 288 let latest = database.latest_node().await.unwrap().unwrap(); 289 assert_eq!(latest.0, latest_node_id); 290 291 let latest_node_id = { 292 let latest_node_id = NodeId::from_bytes(&code.digest(&[2]).to_bytes()).unwrap(); 293 let node = Node::from_raw_parts(1, vec![], None); 294 let sig = nonempty::NonEmpty::singleton(Signature::from(vec![])); 295 296 ServiceExt::<AddNodeRequest>::ready(&mut database) 297 .await 298 .unwrap() 299 .call(AddNodeRequest { 300 id: latest_node_id, 301 node, 302 signature: sig, 303 }) 304 .await 305 .unwrap(); 306 latest_node_id 307 }; 308 309 let latest = database.latest_node().await.unwrap().unwrap(); 310 assert_eq!(latest.0, latest_node_id); 311 312 let latest_node_id = { 313 let latest_node_id = NodeId::from_bytes(&code.digest(&[3]).to_bytes()).unwrap(); 314 let node = Node::from_raw_parts(1, vec![], None); 315 let sig = nonempty::NonEmpty::singleton(Signature::from(vec![])); 316 317 ServiceExt::<AddNodeRequest>::ready(&mut database) 318 .await 319 .unwrap() 320 .call(AddNodeRequest { 321 id: latest_node_id, 322 node, 323 signature: sig, 324 }) 325 .await 326 .unwrap(); 327 latest_node_id 328 }; 329 330 let latest = database.latest_node().await.unwrap().unwrap(); 331 assert_eq!(latest.0, latest_node_id); 332 } 333 }