mod.rs
  1  use diesel::BelongingToDsl;
  2  use diesel::ExpressionMethods;
  3  use diesel::QueryDsl;
  4  use diesel::RunQueryDsl;
  5  use diesel::SelectableHelper;
  6  use diesel::SqliteConnection;
  7  use diesel_migrations::MigrationHarness;
  8  use tracing::Level;
  9  
 10  use crate::database::error::BaseDatabaseError;
 11  use crate::error::Error;
 12  
 13  mod content;
 14  pub(crate) mod error;
 15  mod node;
 16  mod payload;
 17  
 18  pub use content::ContentPersistenceError;
 19  pub use node::NodePersistenceError;
 20  pub use payload::PayloadPersistenceError;
 21  
/// Schema migrations produced by `embed_migrations!` from the `./migrations` directory.
///
/// `Database::new` asks the connection which of these are still pending and applies them
/// one by one.
const MIGRATIONS: diesel_migrations::EmbeddedMigrations =
    diesel_migrations::embed_migrations!("./migrations");
 24  
/// Handle to the SQLite-backed database.
///
/// Bundles the r2d2 connection pool with the tracing span that instrumented operations
/// (for example `latest_node`) use as their parent span.
#[derive(Clone)]
pub struct Database {
    /// Connection pool. Despite the field name this is the whole pool; individual
    /// connections are checked out with `.get()` where they are needed.
    conn: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<SqliteConnection>>,
    /// Span named `db-sqlite`, created once in `Database::new`.
    span: tracing::Span,
}
 30  
 31  impl Database {
 32      pub fn load(path: camino::Utf8PathBuf) -> Result<Self, Error> {
 33          let conn = diesel::r2d2::Pool::builder()
 34              .test_on_check_out(true)
 35              .build(diesel::r2d2::ConnectionManager::new(path))?;
 36  
 37          Self::new(conn)
 38      }
 39  
 40      pub fn inmemory() -> Result<Self, Error> {
 41          let conn = diesel::r2d2::Pool::builder()
 42              .test_on_check_out(true)
 43              .build(diesel::r2d2::ConnectionManager::new(":memory:"))?;
 44  
 45          Self::new(conn)
 46      }
 47  
 48      fn new(
 49          pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<SqliteConnection>>,
 50      ) -> Result<Self, Error> {
 51          // TODO: Have a look at WAL journaling mode
 52          //
 53          // https://sqlite.org/wal.html
 54          // https://stackoverflow.com/a/57717533
 55          //
 56          // and also:
 57          //
 58          // https://www.sqlite.org/pragma.html#pragma_synchronous
 59          //
 60          let pragmas = [
 61              "PRAGMA journal_mode = WAL",
 62              "PRAGMA locking_mode = EXCLUSIVE",
 63              "PRAGMA synchronous = EXTRA",
 64              "PRAGMA auto_vacuum = FULL",
 65          ];
 66  
 67          let mut conn = pool.get().map_err(BaseDatabaseError::GettingConnection)?;
 68  
 69          for pragma in pragmas {
 70              diesel::sql_query(pragma)
 71                  .execute(&mut conn)
 72                  .map_err(Error::Pragma)?;
 73          }
 74  
 75          let pending_migrations = conn
 76              .pending_migrations(MIGRATIONS)
 77              .map_err(Error::Migration)?;
 78  
 79          tracing::info!(
 80              "There are {} migrations pending, applying them now",
 81              pending_migrations.len()
 82          );
 83  
 84          for migration in pending_migrations {
 85              tracing::debug!("Applying migration: {}", migration.name());
 86  
 87              conn.run_migration(&migration).map_err(Error::Migration)?;
 88          }
 89  
 90          drop(conn);
 91  
 92          Ok(Self {
 93              conn: pool,
 94              span: tracing::error_span!("db-sqlite"),
 95          })
 96      }
 97  
 98      /// Return the latest node, with its ID
 99      #[tracing::instrument(
100          name = "latest_node",
101          follows_from = [tracing::Span::current()],
102          level = Level::ERROR,
103          parent = &self.span,
104          skip(self),
105          ret(level = Level::DEBUG),
106          err(level = Level::WARN)
107      )]
108      pub async fn latest_node(
109          &self,
110      ) -> Result<
111          Option<(
112              distrox_model::node::NodeId,
113              distrox_model::node::Node,
114              nonempty::NonEmpty<distrox_model::node::Signature>,
115          )>,
116          NodePersistenceError,
117      > {
118          tracing::debug!("Fetching latest node from database");
119          let (database_node, signatures) = {
120              use crate::schema::nodes::dsl::*;
121  
122              let mut conn = self
123                  .conn
124                  .get()
125                  .map_err(BaseDatabaseError::GettingConnection)?;
126  
127              tracing::debug!("Fetching latest node id");
128              let node = match nodes
129                  .order_by(node_id.desc())
130                  .select(crate::models::node::Node::as_select())
131                  .first(&mut conn)
132              {
133                  Err(diesel::NotFound) => return Ok(None),
134                  Err(other) => return Err(BaseDatabaseError::Query(other).into()),
135                  Ok(node) => node,
136              };
137  
138              // TODO: Optimize this into the query above
139              let signatures = match crate::models::signature::Signature::belonging_to(&node)
140                  .select(crate::models::signature::Signature::as_select())
141                  .load(&mut conn)
142              {
143                  Err(diesel::NotFound) => return Ok(None),
144                  Err(other) => return Err(BaseDatabaseError::Query(other).into()),
145                  Ok(sigs) => sigs,
146              };
147  
148              let Some(signatures) = nonempty::NonEmpty::from_vec(signatures) else {
149                  todo!()
150              };
151  
152              (node, signatures)
153          };
154          tracing::debug!("Fetching latest node from database succeeded");
155  
156          let node_id = distrox_model::node::NodeId::from_bytes(&database_node.node_id)
157              .map_err(BaseDatabaseError::from)?;
158  
159          match self.build_model_node(database_node, signatures).await? {
160              Some((node, sigs)) => Ok(Some((node_id, node, sigs))),
161              None => Ok(None),
162          }
163      }
164  
165      async fn build_model_node(
166          &self,
167          database_node: crate::models::node::Node,
168          database_signatures: nonempty::NonEmpty<crate::models::signature::Signature>,
169      ) -> Result<
170          Option<(
171              distrox_model::node::Node,
172              nonempty::NonEmpty<distrox_model::node::Signature>,
173          )>,
174          BaseDatabaseError,
175      > {
176          let span = tracing::Span::current();
177          span.record(
178              "node_id",
179              tracing::field::debug(distrox_model::node::NodeId::from_bytes(
180                  &database_node.node_id,
181              )),
182          );
183          span.record("content_id", tracing::field::Empty);
184  
185          let parents = {
186              use crate::schema::node_parents::dsl::*;
187  
188              let mut conn = self
189                  .conn
190                  .get()
191                  .map_err(BaseDatabaseError::GettingConnection)?;
192  
193              tracing::debug!(parent: &span, "Fetching parents for node");
194              match node_parents
195                  .filter(node_id.eq(&database_node.node_id))
196                  .select(crate::models::node::NodeParents::as_select())
197                  .load(&mut conn)
198              {
199                  Err(diesel::NotFound) => Vec::new(),
200                  Err(other) => return Err(BaseDatabaseError::Query(other)),
201                  Ok(ps) => ps
202                      .into_iter()
203                      .map(|p| {
204                          tracing::debug!(
205                              parent: &span,
206                              node_id = tracing::field::debug(distrox_model::node::NodeId::from_bytes(&p.node_id)),
207                              parent = tracing::field::debug(distrox_model::node::NodeId::from_bytes(&p.parent_node_id)),
208                              "Found parent for node"
209                          );
210                          distrox_model::node::NodeId::from_bytes(&p.parent_node_id)
211                      })
212                      .collect::<Result<Vec<_>, _>>()?,
213              }
214          };
215  
216          let content_id = database_node
217              .content_id
218              .as_ref()
219              .map(|cid| {
220                  span.record("content_id", tracing::field::debug(cid));
221                  tracing::debug!(parent: &span, "Found ContentId for Node");
222                  distrox_model::content::ContentId::from_bytes(cid)
223              })
224              .transpose()?;
225  
226          tracing::debug!(parent: &span, "Successfully loaded node");
227          let signatures =
228              database_signatures.map(|sig| distrox_model::node::Signature::from(sig.signature));
229  
230          let node = distrox_model::node::Node::from_raw_parts(
231              database_node.version as u64,
232              parents,
233              content_id,
234          );
235  
236          Ok(Some((node, signatures)))
237      }
238  
239      pub async fn execute_raw_sql(&self, sql: String) -> Result<usize, Error> {
240          let mut conn = self
241              .conn
242              .get()
243              .map_err(BaseDatabaseError::GettingConnection)?;
244          let rows_affected = diesel::sql_query(sql).execute(&mut conn).map_err(|error| {
245              tracing::error!(?error, "SQL execution errored");
246              Error::RawSqlError(error.to_string())
247          })?;
248  
249          tracing::info!(?rows_affected, "Execution of SQL succeeded");
250          Ok(rows_affected)
251      }
252  }
253  
#[cfg(test)]
mod tests {
    use distrox_api::node::AddNodeRequest;
    use distrox_model::node::Node;
    use distrox_model::node::NodeId;
    use distrox_model::node::Signature;
    use multihash_codetable::MultihashDigest;
    use tower::Service;
    use tower::ServiceExt;

    /// Adds three nodes one after another and checks after every insertion that
    /// `latest_node` reports the node that was just added.
    #[tokio::test]
    async fn test_getting_latest_node() {
        let mut database = super::Database::inmemory().unwrap();
        let code = multihash_codetable::Code::Blake3_256;

        // Each round derives the node ID from the Blake3 digest of a single seed byte.
        for seed in [1u8, 2, 3] {
            let expected_id = NodeId::from_bytes(&code.digest(&[seed]).to_bytes()).unwrap();
            let request = AddNodeRequest {
                id: expected_id,
                node: Node::from_raw_parts(1, vec![], None),
                signature: nonempty::NonEmpty::singleton(Signature::from(vec![])),
            };

            ServiceExt::<AddNodeRequest>::ready(&mut database)
                .await
                .unwrap()
                .call(request)
                .await
                .unwrap();

            let latest = database.latest_node().await.unwrap().unwrap();
            assert_eq!(latest.0, expected_id);
        }
    }
}
333  }