/ backend / src / api / container_spec / blobs / finalize_blob_upload.rs
finalize_blob_upload.rs
  1  use rocket::{http::Header, State};
  2  use sqlx::Pool;
  3  
  4  use crate::api::container_spec::Auth;
  5  use crate::header;
  6  use crate::{
  7      api::container_spec::{blobs::utils::octet_stream::OctetStream, LOCATION_HEADER_NAME},
  8      config::Config,
  9      db::DB,
 10      registry_error::RegistryResult,
 11      services::upload_blob_service,
 12      types::session_id::SessionId,
 13  };
 14  
 15  use super::utils::content_length::ContentLength;
 16  
 17  #[derive(Responder, Debug)]
 18  pub struct FinishBlobUploadResponseData<'a> {
 19      response: &'a str,
 20      location: Header<'a>,
 21  }
 22  
 23  #[derive(Responder, Debug)]
 24  pub enum FinishBlobUploadResponse<'a> {
 25      #[response(status = 201)]
 26      Success(FinishBlobUploadResponseData<'a>),
 27      #[response(status = 500)]
 28      Failure(&'a str),
 29      #[response(status = 400)]
 30      InvalidSessionId(&'a str),
 31  }
 32  
 33  #[put("/v2/<name>/blobs/uploads/<session_id>?<digest>", data = "<blob>")]
 34  pub async fn put_upload_blob<'a>(
 35      _auth: Auth,
 36      name: &str,
 37      session_id: &'a str,
 38      digest: &'a str,
 39      content_length: ContentLength,
 40      blob: Option<OctetStream>,
 41      config: &State<Config>,
 42      db_pool: &State<Pool<DB>>,
 43  ) -> FinishBlobUploadResponse<'a> {
 44      if let Err(err) = finalize_blob_upload(
 45          db_pool,
 46          config,
 47          content_length,
 48          session_id,
 49          name,
 50          blob,
 51          digest,
 52      )
 53      .await
 54      {
 55          warn!("Failed to finalize blob upload due to error: {err:?}");
 56          return FinishBlobUploadResponse::Failure("Failed to finalize blob upload");
 57      };
 58  
 59      FinishBlobUploadResponse::Success(FinishBlobUploadResponseData {
 60          response: "Blob upload finished",
 61          location: header!(LOCATION_HEADER_NAME, format!("/v2/{name}/blobs/{digest}",)),
 62      })
 63  }
 64  
 65  async fn finalize_blob_upload(
 66      db_pool: &Pool<DB>,
 67      config: &Config,
 68      content_length: ContentLength,
 69      session_id: &str,
 70      name: &str,
 71      blob: Option<OctetStream>,
 72      digest: &str,
 73  ) -> RegistryResult<()> {
 74      let session_id = SessionId::parse(session_id)?;
 75  
 76      let final_session_id = if let Some(blob) = blob {
 77          let blob = blob.data;
 78  
 79          content_length.validate_data_length(blob.len())?;
 80  
 81          upload_blob_service::upload_blob(db_pool, name, session_id, config, blob, None)
 82              .await
 83              .map_err(|err| {
 84                  error!("Failed to upload final blob section, err {err:?}");
 85                  err
 86              })?
 87              .id
 88              .into()
 89      } else {
 90          session_id
 91      };
 92  
 93      let _blob_id =
 94          upload_blob_service::finish_blob_upload(db_pool, config, name, final_session_id, digest)
 95              .await
 96              .map_err(|err| {
 97                  error!("Failed to convert blob parts to finalized blob, err {err:?}");
 98                  err
 99              })?;
100  
101      Ok(())
102  }