/ backend / src / api / container_spec / blobs / upload_blob_section.rs
upload_blob_section.rs
  1  use rocket::{http::Header, State};
  2  use sqlx::Pool;
  3  
  4  use crate::api::container_spec::{Auth, DOCKER_UPLOAD_UUID_HEADER_NAME};
  5  use crate::range;
  6  use crate::registry_error::RegistryError;
  7  use crate::{
  8      config::Config, db::DB, header, location, models::upload_session::UploadSession,
  9      registry_error::RegistryResult, services::upload_blob_service, types::session_id::SessionId,
 10  };
 11  
 12  use super::utils::{
 13      content_length::ContentLength, content_range::ContentRange, octet_stream::OctetStream,
 14  };
 15  
 16  #[derive(Responder, Debug)]
 17  pub struct UploadBlobResponseData<'a> {
 18      data: (),
 19      location: Header<'a>,
 20      range: Header<'a>,
 21      docker_upload_uuid: Header<'a>,
 22  }
 23  
 24  #[derive(Responder)]
 25  pub enum UploadBlobResponse<'a> {
 26      #[response(status = 202)]
 27      Success(UploadBlobResponseData<'a>),
 28      #[response(status = 416)]
 29      OutOfOrder(()),
 30      #[response(status = 416)]
 31      AlreadyUploaded(()),
 32      #[response(status = 500)]
 33      Failure(&'a str),
 34  }
 35  
 36  #[patch("/v2/<name>/blobs/uploads/<session_id>", data = "<blob>")]
 37  pub async fn patch_upload_blob<'a>(
 38      db_pool: &State<Pool<DB>>,
 39      config: &State<Config>,
 40      _auth: Auth,
 41      content_length: ContentLength,
 42      content_range: Option<ContentRange>,
 43      name: &str,
 44      session_id: &str,
 45      blob: OctetStream,
 46  ) -> UploadBlobResponse<'a> {
 47      let next_session = match handle_chunked_upload(
 48          db_pool,
 49          config,
 50          session_id,
 51          name,
 52          blob,
 53          content_length,
 54          content_range,
 55      )
 56      .await
 57      {
 58          Ok(next_session) => next_session,
 59          Err(RegistryError::BlobPartAlreadyUploaded) => {
 60              warn!("The request blob part has already been uploaeded!");
 61              return UploadBlobResponse::AlreadyUploaded(());
 62          }
 63          Err(RegistryError::InvalidStartIndex) => {
 64              warn!("Received invalid start index of content range");
 65              return UploadBlobResponse::OutOfOrder(());
 66          }
 67          Err(err) => {
 68              warn!("Failed to upload blob due to err {err:?}");
 69              return UploadBlobResponse::Failure("Failed to upload blob");
 70          }
 71      };
 72  
 73      UploadBlobResponse::Success(UploadBlobResponseData {
 74          data: (),
 75          location: location!(name, next_session.id),
 76          range: range!(next_session),
 77          docker_upload_uuid: header!(DOCKER_UPLOAD_UUID_HEADER_NAME, next_session.id.to_string()),
 78      })
 79  }
 80  
 81  async fn handle_chunked_upload(
 82      db_pool: &Pool<DB>,
 83      config: &Config,
 84      session_id: &str,
 85      name: &str,
 86      blob: OctetStream,
 87      content_length: ContentLength,
 88      content_range: Option<ContentRange>,
 89  ) -> RegistryResult<UploadSession> {
 90      let session_id = SessionId::parse(session_id)?;
 91  
 92      if let Some(content_range) = content_range.as_ref() {
 93          content_range.validate(&content_length)?;
 94      }
 95  
 96      content_length.validate_data_length(blob.data.len())?;
 97  
 98      let new_session = upload_blob_service::upload_blob(
 99          db_pool,
100          name,
101          session_id,
102          config,
103          blob.data,
104          content_range.map(|o| o.range_start),
105      )
106      .await
107      .map_err(|err| {
108          error!("Failed to upload blob, err: {err:?}");
109          err
110      })?;
111  
112      Ok(new_session)
113  }