lib.rs
  1  use distrox_api::node::AddNodeRequest;
  2  use distrox_api::node::GetHeadRequest;
  3  use distrox_api::node::GetNodeRequest;
  4  use distrox_api::node::SetHeadRequest;
  5  use futures::FutureExt;
  6  use futures::StreamExt;
  7  use futures::future::BoxFuture;
  8  
  9  pub mod error;
 10  mod model;
 11  
 12  #[derive(Clone)]
 13  pub struct Database {
 14      path: camino::Utf8PathBuf,
 15  
 16      data: std::sync::Arc<tokio::sync::Mutex<crate::model::Model>>,
 17  }
 18  
 19  impl Database {
 20      pub async fn load(path: camino::Utf8PathBuf) -> Result<Self, crate::error::Error> {
 21          let s = tokio::fs::read_to_string(&path).await?;
 22          let data = serde_json::from_str(&s)?;
 23  
 24          Ok(Self {
 25              path,
 26              data: std::sync::Arc::new(tokio::sync::Mutex::new(data)),
 27          })
 28      }
 29  
 30      async fn write(&self) -> Result<(), crate::error::Error> {
 31          let data = self.data.lock().await;
 32          let s = serde_json::to_string(&*data)?;
 33          tokio::fs::write(&self.path, s).await?;
 34          Ok(())
 35      }
 36  }
 37  
 38  impl tower::Service<GetHeadRequest> for Database {
 39      type Response = Option<distrox_model::node::NodeId>;
 40      type Error = crate::error::Error;
 41      type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
 42  
 43      fn poll_ready(
 44          &mut self,
 45          _cx: &mut std::task::Context<'_>,
 46      ) -> std::task::Poll<Result<(), Self::Error>> {
 47          std::task::Poll::Ready(Ok(()))
 48      }
 49  
 50      fn call(&mut self, req: GetHeadRequest) -> Self::Future {
 51          let this = self.clone();
 52  
 53          async move {
 54              let pkey = req.key.into();
 55              let Some(nid) = this.data.lock().await.heads.get(&pkey).cloned() else {
 56                  return Ok(None);
 57              };
 58  
 59              let nid = distrox_model::node::NodeId::try_from(nid)?;
 60              Ok(Some(nid))
 61          }
 62          .boxed()
 63      }
 64  }
 65  
 66  impl tower::Service<SetHeadRequest> for Database {
 67      type Response = ();
 68      type Error = crate::error::Error;
 69      type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
 70  
 71      fn poll_ready(
 72          &mut self,
 73          _cx: &mut std::task::Context<'_>,
 74      ) -> std::task::Poll<Result<(), Self::Error>> {
 75          std::task::Poll::Ready(Ok(()))
 76      }
 77  
 78      fn call(&mut self, req: SetHeadRequest) -> Self::Future {
 79          let this = self.clone();
 80  
 81          async move {
 82              this.data
 83                  .lock()
 84                  .await
 85                  .heads
 86                  .insert(req.key.into(), req.id.into());
 87              Ok(())
 88          }
 89          .boxed()
 90      }
 91  }
 92  
 93  impl tower::Service<GetNodeRequest> for Database {
 94      type Response = Option<(
 95          distrox_model::node::Node,
 96          nonempty::NonEmpty<distrox_model::node::Signature>,
 97      )>;
 98      type Error = crate::error::Error;
 99      type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
100  
101      fn poll_ready(
102          &mut self,
103          _cx: &mut std::task::Context<'_>,
104      ) -> std::task::Poll<Result<(), Self::Error>> {
105          std::task::Poll::Ready(Ok(()))
106      }
107  
108      fn call(&mut self, req: GetNodeRequest) -> Self::Future {
109          let this = self.clone();
110          async move {
111              this.data
112                  .lock()
113                  .await
114                  .nodes
115                  .get(&crate::model::NodeId::from(req.id))
116                  .cloned()
117                  .map(|model::NodeAndSignatures { node, signatures }| {
118                      let s = signatures.map(distrox_model::node::Signature::from);
119                      distrox_model::node::Node::try_from(node).map(|node| (node, s))
120                  })
121                  .transpose()
122                  .map_err(Self::Error::from)
123          }
124          .boxed()
125      }
126  }
127  
128  impl tower::Service<AddNodeRequest> for Database {
129      type Response = ();
130      type Error = crate::error::Error;
131      type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
132  
133      fn poll_ready(
134          &mut self,
135          _cx: &mut std::task::Context<'_>,
136      ) -> std::task::Poll<Result<(), Self::Error>> {
137          std::task::Poll::Ready(Ok(()))
138      }
139  
140      fn call(&mut self, req: AddNodeRequest) -> Self::Future {
141          let this = self.clone();
142  
143          async move {
144              let node = req.node.try_into()?;
145              let signatures = req.signature.map(Into::into);
146              this.data
147                  .lock()
148                  .await
149                  .nodes
150                  .insert(req.id.into(), model::NodeAndSignatures { node, signatures });
151  
152              this.write().await
153          }
154          .boxed()
155      }
156  }
157  
158  impl tower::Service<distrox_api::content::ContentAddRequest> for Database {
159      type Response = ();
160      type Error = crate::error::Error;
161      type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
162  
163      fn poll_ready(
164          &mut self,
165          _cx: &mut std::task::Context<'_>,
166      ) -> std::task::Poll<Result<(), Self::Error>> {
167          std::task::Poll::Ready(Ok(()))
168      }
169  
170      fn call(&mut self, req: distrox_api::content::ContentAddRequest) -> Self::Future {
171          let this = self.clone();
172  
173          async move {
174              this.data.lock().await.contents.insert(
175                  crate::model::ContentId::from(req.id),
176                  crate::model::Content::from(req.content),
177              );
178  
179              this.write().await
180          }
181          .boxed()
182      }
183  }
184  
185  impl tower::Service<distrox_api::content::ContentGetRequest> for Database {
186      type Response = Option<distrox_model::content::Content>;
187      type Error = crate::error::Error;
188      type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
189  
190      fn poll_ready(
191          &mut self,
192          _cx: &mut std::task::Context<'_>,
193      ) -> std::task::Poll<Result<(), Self::Error>> {
194          std::task::Poll::Ready(Ok(()))
195      }
196  
197      fn call(&mut self, req: distrox_api::content::ContentGetRequest) -> Self::Future {
198          let this = self.clone();
199  
200          async move {
201              Ok(this
202                  .data
203                  .lock()
204                  .await
205                  .contents
206                  .get(&crate::model::ContentId::from(req.id))
207                  .cloned()
208                  .map(distrox_model::content::Content::from))
209          }
210          .boxed()
211      }
212  }
213  
214  impl tower::Service<distrox_api::payload::PayloadGetRequest> for Database {
215      type Response = Option<futures::stream::BoxStream<'static, Result<u8, Self::Error>>>;
216      type Error = crate::error::Error;
217      type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
218  
219      fn poll_ready(
220          &mut self,
221          _cx: &mut std::task::Context<'_>,
222      ) -> std::task::Poll<Result<(), Self::Error>> {
223          std::task::Poll::Ready(Ok(()))
224      }
225  
226      fn call(
227          &mut self,
228          distrox_api::payload::PayloadGetRequest { id }: distrox_api::payload::PayloadGetRequest,
229      ) -> Self::Future {
230          let this = self.clone();
231          let id = model::PayloadId::from(id);
232  
233          async move {
234              tracing::debug!("Fetching payload from database");
235              let Some(payload) = this.data.lock().await.payloads.get(&id).cloned() else {
236                  return Ok(None);
237              };
238              Ok(Some(futures::stream::iter(payload).map(Ok).boxed()))
239          }
240          .boxed()
241      }
242  }