content.rs
1 use std::str::FromStr; 2 3 use diesel::Connection; 4 use diesel::ExpressionMethods; 5 use diesel::QueryDsl; 6 use diesel::RunQueryDsl; 7 use diesel::SelectableHelper; 8 use futures::FutureExt; 9 use tracing::Instrument; 10 11 use crate::database::error::BaseDatabaseError; 12 13 impl tower::Service<distrox_api::content::ContentGetRequest> for super::Database { 14 type Response = distrox_api::content::ContentGetResponse; 15 type Error = distrox_api::content::ContentError; 16 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>; 17 18 fn poll_ready( 19 &mut self, 20 _cx: &mut std::task::Context<'_>, 21 ) -> std::task::Poll<Result<(), Self::Error>> { 22 std::task::Poll::Ready(Ok(())) 23 } 24 25 fn call(&mut self, req: distrox_api::content::ContentGetRequest) -> Self::Future { 26 let this = self.clone(); 27 28 async move { 29 let content = { 30 use crate::schema::contents::dsl::*; 31 32 let mut conn = this 33 .conn 34 .get() 35 .map_err(BaseDatabaseError::GettingConnection) 36 .map_err(ContentPersistenceError::from)?; 37 38 tracing::debug!(content_id = ?req.id, "Fetching content"); 39 match contents 40 .filter(content_id.eq(req.id.as_ref().to_bytes())) 41 .select(crate::models::content::Content::as_select()) 42 .first(&mut conn) 43 { 44 Err(diesel::NotFound) => { 45 return Ok(distrox_api::content::ContentGetResponse(None)); 46 } 47 Err(other) => { 48 return Err( 49 ContentPersistenceError::from(BaseDatabaseError::Query(other)).into(), 50 ); 51 } 52 Ok(content) => content, 53 } 54 }; 55 debug_assert!(content.content_id == req.id.inner().to_bytes()); 56 57 tracing::debug!(content_id = ?req.id, "Successfully loaded content"); 58 let content = distrox_model::content::Content { 59 version: content.version as u64, 60 timestamp: content.timestamp, 61 mime: content 62 .mimetype 63 .as_ref() 64 .map(|m| mime::Mime::from_str(m)) 65 .transpose() 66 .map_err(ContentPersistenceError::MimeFromStr)?, 67 payload: content 68 .payload_id 69 .as_ref() 70 .map(|pid| distrox_model::payload::PayloadId::from_bytes(pid)) 71 .transpose() 72 .map_err(ContentPersistenceError::from)?, 73 }; 74 75 Ok(distrox_api::content::ContentGetResponse(Some(content))) 76 } 77 .instrument(tracing::error_span!(parent: &self.span, "get-content")) 78 .boxed() 79 } 80 } 81 82 impl tower::Service<distrox_api::content::ContentAddRequest> for super::Database { 83 type Response = distrox_api::content::ContentAddResponse; 84 type Error = distrox_api::content::ContentError; 85 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>; 86 87 fn poll_ready( 88 &mut self, 89 _cx: &mut std::task::Context<'_>, 90 ) -> std::task::Poll<Result<(), Self::Error>> { 91 std::task::Poll::Ready(Ok(())) 92 } 93 94 fn call(&mut self, req: distrox_api::content::ContentAddRequest) -> Self::Future { 95 let this = self.clone(); 96 97 async move { 98 let content_id = req.id.inner().to_bytes(); 99 let payload_id = req.content.payload.map(|pid| pid.inner().to_bytes()); 100 let mimetype = req.content.mime.map(|m| m.to_string()); 101 102 let new_content = crate::models::content::NewContent { 103 content_id: &content_id, 104 version: req.content.version as i32, 105 timestamp: req.content.timestamp, 106 mimetype: mimetype.as_deref(), 107 payload_id: payload_id.as_deref(), 108 }; 109 110 tracing::debug!("Starting transaction to insert content"); 111 this.conn 112 .get() 113 .map_err(BaseDatabaseError::GettingConnection) 114 .map_err(ContentPersistenceError::from)? 115 .transaction(|conn| { 116 { 117 tracing::debug!(content_id = ?content_id, "Inserting Content"); 118 if let Err(error) = diesel::insert_into(crate::schema::contents::table) 119 .values(new_content) 120 .on_conflict(crate::schema::contents::content_id) 121 .do_nothing() 122 .execute(conn) 123 { 124 tracing::warn!(?error, "Failed to insert, rolling back transaction"); 125 return Err(diesel::result::Error::RollbackTransaction); 126 } 127 } 128 Ok(distrox_api::content::ContentAddResponse(())) 129 }) 130 .map_err(BaseDatabaseError::Transaction) 131 .map_err(ContentPersistenceError::from) 132 .map_err(distrox_api::content::ContentError::from) 133 } 134 .instrument(tracing::error_span!(parent: &self.span, "add-content")) 135 .boxed() 136 } 137 } 138 139 #[derive(Debug, thiserror::Error)] 140 pub enum ContentPersistenceError { 141 #[error(transparent)] 142 Base(#[from] super::error::BaseDatabaseError), 143 144 #[error("Error while parsing MIME from database")] 145 MimeFromStr(#[source] mime::FromStrError), 146 147 #[error("Failed to deserialize JSON from database")] 148 ContentDeserError(#[source] crate::models::content::ContentDeserError), 149 150 #[error("Failed to serialize Content type to JSON for putting into database")] 151 ContentSerError(#[source] crate::models::content::ContentSerError), 152 153 #[error(transparent)] 154 PayloadIdError(#[from] distrox_model::payload::PayloadIdError), 155 } 156 157 impl From<ContentPersistenceError> for distrox_api::content::ContentError { 158 fn from(value: ContentPersistenceError) -> Self { 159 distrox_api::content::ContentError(Box::new(value)) 160 } 161 }