content.rs
  1  use std::str::FromStr;
  2  
  3  use diesel::Connection;
  4  use diesel::ExpressionMethods;
  5  use diesel::QueryDsl;
  6  use diesel::RunQueryDsl;
  7  use diesel::SelectableHelper;
  8  use futures::FutureExt;
  9  use tracing::Instrument;
 10  
 11  use crate::database::error::BaseDatabaseError;
 12  
 13  impl tower::Service<distrox_api::content::ContentGetRequest> for super::Database {
 14      type Response = distrox_api::content::ContentGetResponse;
 15      type Error = distrox_api::content::ContentError;
 16      type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
 17  
 18      fn poll_ready(
 19          &mut self,
 20          _cx: &mut std::task::Context<'_>,
 21      ) -> std::task::Poll<Result<(), Self::Error>> {
 22          std::task::Poll::Ready(Ok(()))
 23      }
 24  
 25      fn call(&mut self, req: distrox_api::content::ContentGetRequest) -> Self::Future {
 26          let this = self.clone();
 27  
 28          async move {
 29              let content = {
 30                  use crate::schema::contents::dsl::*;
 31  
 32                  let mut conn = this
 33                      .conn
 34                      .get()
 35                      .map_err(BaseDatabaseError::GettingConnection)
 36                      .map_err(ContentPersistenceError::from)?;
 37  
 38                  tracing::debug!(content_id = ?req.id, "Fetching content");
 39                  match contents
 40                      .filter(content_id.eq(req.id.as_ref().to_bytes()))
 41                      .select(crate::models::content::Content::as_select())
 42                      .first(&mut conn)
 43                  {
 44                      Err(diesel::NotFound) => {
 45                          return Ok(distrox_api::content::ContentGetResponse(None));
 46                      }
 47                      Err(other) => {
 48                          return Err(
 49                              ContentPersistenceError::from(BaseDatabaseError::Query(other)).into(),
 50                          );
 51                      }
 52                      Ok(content) => content,
 53                  }
 54              };
 55              debug_assert!(content.content_id == req.id.inner().to_bytes());
 56  
 57              tracing::debug!(content_id = ?req.id, "Successfully loaded content");
 58              let content = distrox_model::content::Content {
 59                  version: content.version as u64,
 60                  timestamp: content.timestamp,
 61                  mime: content
 62                      .mimetype
 63                      .as_ref()
 64                      .map(|m| mime::Mime::from_str(m))
 65                      .transpose()
 66                      .map_err(ContentPersistenceError::MimeFromStr)?,
 67                  payload: content
 68                      .payload_id
 69                      .as_ref()
 70                      .map(|pid| distrox_model::payload::PayloadId::from_bytes(pid))
 71                      .transpose()
 72                      .map_err(ContentPersistenceError::from)?,
 73              };
 74  
 75              Ok(distrox_api::content::ContentGetResponse(Some(content)))
 76          }
 77          .instrument(tracing::error_span!(parent: &self.span, "get-content"))
 78          .boxed()
 79      }
 80  }
 81  
 82  impl tower::Service<distrox_api::content::ContentAddRequest> for super::Database {
 83      type Response = distrox_api::content::ContentAddResponse;
 84      type Error = distrox_api::content::ContentError;
 85      type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
 86  
 87      fn poll_ready(
 88          &mut self,
 89          _cx: &mut std::task::Context<'_>,
 90      ) -> std::task::Poll<Result<(), Self::Error>> {
 91          std::task::Poll::Ready(Ok(()))
 92      }
 93  
 94      fn call(&mut self, req: distrox_api::content::ContentAddRequest) -> Self::Future {
 95          let this = self.clone();
 96  
 97          async move {
 98              let content_id = req.id.inner().to_bytes();
 99              let payload_id = req.content.payload.map(|pid| pid.inner().to_bytes());
100              let mimetype = req.content.mime.map(|m| m.to_string());
101  
102              let new_content = crate::models::content::NewContent {
103                  content_id: &content_id,
104                  version: req.content.version as i32,
105                  timestamp: req.content.timestamp,
106                  mimetype: mimetype.as_deref(),
107                  payload_id: payload_id.as_deref(),
108              };
109  
110              tracing::debug!("Starting transaction to insert content");
111              this.conn
112                  .get()
113                  .map_err(BaseDatabaseError::GettingConnection)
114                  .map_err(ContentPersistenceError::from)?
115                  .transaction(|conn| {
116                      {
117                          tracing::debug!(content_id = ?content_id, "Inserting Content");
118                          if let Err(error) = diesel::insert_into(crate::schema::contents::table)
119                              .values(new_content)
120                              .on_conflict(crate::schema::contents::content_id)
121                              .do_nothing()
122                              .execute(conn)
123                          {
124                              tracing::warn!(?error, "Failed to insert, rolling back transaction");
125                              return Err(diesel::result::Error::RollbackTransaction);
126                          }
127                      }
128                      Ok(distrox_api::content::ContentAddResponse(()))
129                  })
130                  .map_err(BaseDatabaseError::Transaction)
131                  .map_err(ContentPersistenceError::from)
132                  .map_err(distrox_api::content::ContentError::from)
133          }
134          .instrument(tracing::error_span!(parent: &self.span, "add-content"))
135          .boxed()
136      }
137  }
138  
/// Errors raised while reading or writing content records in the database.
///
/// Values of this type are converted into `distrox_api::content::ContentError` before they
/// leave the service implementations in this file.
#[derive(Debug, thiserror::Error)]
pub enum ContentPersistenceError {
    /// A failure from the shared database layer. In this file it is built from
    /// connection-acquisition, query, and transaction errors.
    #[error(transparent)]
    Base(#[from] super::error::BaseDatabaseError),

    /// The MIME string stored for a content row could not be parsed.
    #[error("Error while parsing MIME from database")]
    MimeFromStr(#[source] mime::FromStrError),

    /// Wraps a `ContentDeserError` from the content model module.
    /// Not constructed in the visible part of this file.
    #[error("Failed to deserialize JSON from database")]
    ContentDeserError(#[source] crate::models::content::ContentDeserError),

    /// Wraps a `ContentSerError` from the content model module.
    /// Not constructed in the visible part of this file.
    #[error("Failed to serialize Content type to JSON for putting into database")]
    ContentSerError(#[source] crate::models::content::ContentSerError),

    /// The payload id bytes stored for a content row could not be turned back into a
    /// `PayloadId`.
    #[error(transparent)]
    PayloadIdError(#[from] distrox_model::payload::PayloadIdError),
}
156  
157  impl From<ContentPersistenceError> for distrox_api::content::ContentError {
158      fn from(value: ContentPersistenceError) -> Self {
159          distrox_api::content::ContentError(Box::new(value))
160      }
161  }