lib.rs
1 use distrox_api::node::AddNodeRequest; 2 use distrox_api::node::GetHeadRequest; 3 use distrox_api::node::GetNodeRequest; 4 use distrox_api::node::SetHeadRequest; 5 use futures::FutureExt; 6 use futures::StreamExt; 7 use futures::future::BoxFuture; 8 9 pub mod error; 10 mod model; 11 12 #[derive(Clone)] 13 pub struct Database { 14 path: camino::Utf8PathBuf, 15 16 data: std::sync::Arc<tokio::sync::Mutex<crate::model::Model>>, 17 } 18 19 impl Database { 20 pub async fn load(path: camino::Utf8PathBuf) -> Result<Self, crate::error::Error> { 21 let s = tokio::fs::read_to_string(&path).await?; 22 let data = serde_json::from_str(&s)?; 23 24 Ok(Self { 25 path, 26 data: std::sync::Arc::new(tokio::sync::Mutex::new(data)), 27 }) 28 } 29 30 async fn write(&self) -> Result<(), crate::error::Error> { 31 let data = self.data.lock().await; 32 let s = serde_json::to_string(&*data)?; 33 tokio::fs::write(&self.path, s).await?; 34 Ok(()) 35 } 36 } 37 38 impl tower::Service<GetHeadRequest> for Database { 39 type Response = Option<distrox_model::node::NodeId>; 40 type Error = crate::error::Error; 41 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; 42 43 fn poll_ready( 44 &mut self, 45 _cx: &mut std::task::Context<'_>, 46 ) -> std::task::Poll<Result<(), Self::Error>> { 47 std::task::Poll::Ready(Ok(())) 48 } 49 50 fn call(&mut self, req: GetHeadRequest) -> Self::Future { 51 let this = self.clone(); 52 53 async move { 54 let pkey = req.key.into(); 55 let Some(nid) = this.data.lock().await.heads.get(&pkey).cloned() else { 56 return Ok(None); 57 }; 58 59 let nid = distrox_model::node::NodeId::try_from(nid)?; 60 Ok(Some(nid)) 61 } 62 .boxed() 63 } 64 } 65 66 impl tower::Service<SetHeadRequest> for Database { 67 type Response = (); 68 type Error = crate::error::Error; 69 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; 70 71 fn poll_ready( 72 &mut self, 73 _cx: &mut std::task::Context<'_>, 74 ) -> std::task::Poll<Result<(), Self::Error>> { 75 std::task::Poll::Ready(Ok(())) 76 } 77 78 fn call(&mut self, req: SetHeadRequest) -> Self::Future { 79 let this = self.clone(); 80 81 async move { 82 this.data 83 .lock() 84 .await 85 .heads 86 .insert(req.key.into(), req.id.into()); 87 Ok(()) 88 } 89 .boxed() 90 } 91 } 92 93 impl tower::Service<GetNodeRequest> for Database { 94 type Response = Option<( 95 distrox_model::node::Node, 96 nonempty::NonEmpty<distrox_model::node::Signature>, 97 )>; 98 type Error = crate::error::Error; 99 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; 100 101 fn poll_ready( 102 &mut self, 103 _cx: &mut std::task::Context<'_>, 104 ) -> std::task::Poll<Result<(), Self::Error>> { 105 std::task::Poll::Ready(Ok(())) 106 } 107 108 fn call(&mut self, req: GetNodeRequest) -> Self::Future { 109 let this = self.clone(); 110 async move { 111 this.data 112 .lock() 113 .await 114 .nodes 115 .get(&crate::model::NodeId::from(req.id)) 116 .cloned() 117 .map(|model::NodeAndSignatures { node, signatures }| { 118 let s = signatures.map(distrox_model::node::Signature::from); 119 distrox_model::node::Node::try_from(node).map(|node| (node, s)) 120 }) 121 .transpose() 122 .map_err(Self::Error::from) 123 } 124 .boxed() 125 } 126 } 127 128 impl tower::Service<AddNodeRequest> for Database { 129 type Response = (); 130 type Error = crate::error::Error; 131 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; 132 133 fn poll_ready( 134 &mut self, 135 _cx: &mut std::task::Context<'_>, 136 ) -> std::task::Poll<Result<(), Self::Error>> { 137 std::task::Poll::Ready(Ok(())) 138 } 139 140 fn call(&mut self, req: AddNodeRequest) -> Self::Future { 141 let this = self.clone(); 142 143 async move { 144 let node = req.node.try_into()?; 145 let signatures = req.signature.map(Into::into); 146 this.data 147 .lock() 148 .await 149 .nodes 150 .insert(req.id.into(), model::NodeAndSignatures { node, signatures }); 151 152 this.write().await 153 } 154 .boxed() 155 } 156 } 157 158 impl tower::Service<distrox_api::content::ContentAddRequest> for Database { 159 type Response = (); 160 type Error = crate::error::Error; 161 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>; 162 163 fn poll_ready( 164 &mut self, 165 _cx: &mut std::task::Context<'_>, 166 ) -> std::task::Poll<Result<(), Self::Error>> { 167 std::task::Poll::Ready(Ok(())) 168 } 169 170 fn call(&mut self, req: distrox_api::content::ContentAddRequest) -> Self::Future { 171 let this = self.clone(); 172 173 async move { 174 this.data.lock().await.contents.insert( 175 crate::model::ContentId::from(req.id), 176 crate::model::Content::from(req.content), 177 ); 178 179 this.write().await 180 } 181 .boxed() 182 } 183 } 184 185 impl tower::Service<distrox_api::content::ContentGetRequest> for Database { 186 type Response = Option<distrox_model::content::Content>; 187 type Error = crate::error::Error; 188 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>; 189 190 fn poll_ready( 191 &mut self, 192 _cx: &mut std::task::Context<'_>, 193 ) -> std::task::Poll<Result<(), Self::Error>> { 194 std::task::Poll::Ready(Ok(())) 195 } 196 197 fn call(&mut self, req: distrox_api::content::ContentGetRequest) -> Self::Future { 198 let this = self.clone(); 199 200 async move { 201 Ok(this 202 .data 203 .lock() 204 .await 205 .contents 206 .get(&crate::model::ContentId::from(req.id)) 207 .cloned() 208 .map(distrox_model::content::Content::from)) 209 } 210 .boxed() 211 } 212 } 213 214 impl tower::Service<distrox_api::payload::PayloadGetRequest> for Database { 215 type Response = Option<futures::stream::BoxStream<'static, Result<u8, Self::Error>>>; 216 type Error = crate::error::Error; 217 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>; 218 219 fn poll_ready( 220 &mut self, 221 _cx: &mut std::task::Context<'_>, 222 ) -> std::task::Poll<Result<(), Self::Error>> { 223 std::task::Poll::Ready(Ok(())) 224 } 225 226 fn call( 227 &mut self, 228 distrox_api::payload::PayloadGetRequest { id }: distrox_api::payload::PayloadGetRequest, 229 ) -> Self::Future { 230 let this = self.clone(); 231 let id = model::PayloadId::from(id); 232 233 async move { 234 tracing::debug!("Fetching payload from database"); 235 let Some(payload) = this.data.lock().await.payloads.get(&id).cloned() else { 236 return Ok(None); 237 }; 238 Ok(Some(futures::stream::iter(payload).map(Ok).boxed())) 239 } 240 .boxed() 241 } 242 }