upload_blob_section.rs
1 use rocket::{http::Header, State}; 2 use sqlx::Pool; 3 4 use crate::api::container_spec::{Auth, DOCKER_UPLOAD_UUID_HEADER_NAME}; 5 use crate::range; 6 use crate::registry_error::RegistryError; 7 use crate::{ 8 config::Config, db::DB, header, location, models::upload_session::UploadSession, 9 registry_error::RegistryResult, services::upload_blob_service, types::session_id::SessionId, 10 }; 11 12 use super::utils::{ 13 content_length::ContentLength, content_range::ContentRange, octet_stream::OctetStream, 14 }; 15 16 #[derive(Responder, Debug)] 17 pub struct UploadBlobResponseData<'a> { 18 data: (), 19 location: Header<'a>, 20 range: Header<'a>, 21 docker_upload_uuid: Header<'a>, 22 } 23 24 #[derive(Responder)] 25 pub enum UploadBlobResponse<'a> { 26 #[response(status = 202)] 27 Success(UploadBlobResponseData<'a>), 28 #[response(status = 416)] 29 OutOfOrder(()), 30 #[response(status = 416)] 31 AlreadyUploaded(()), 32 #[response(status = 500)] 33 Failure(&'a str), 34 } 35 36 #[patch("/v2/<name>/blobs/uploads/<session_id>", data = "<blob>")] 37 pub async fn patch_upload_blob<'a>( 38 db_pool: &State<Pool<DB>>, 39 config: &State<Config>, 40 _auth: Auth, 41 content_length: ContentLength, 42 content_range: Option<ContentRange>, 43 name: &str, 44 session_id: &str, 45 blob: OctetStream, 46 ) -> UploadBlobResponse<'a> { 47 let next_session = match handle_chunked_upload( 48 db_pool, 49 config, 50 session_id, 51 name, 52 blob, 53 content_length, 54 content_range, 55 ) 56 .await 57 { 58 Ok(next_session) => next_session, 59 Err(RegistryError::BlobPartAlreadyUploaded) => { 60 warn!("The request blob part has already been uploaeded!"); 61 return UploadBlobResponse::AlreadyUploaded(()); 62 } 63 Err(RegistryError::InvalidStartIndex) => { 64 warn!("Received invalid start index of content range"); 65 return UploadBlobResponse::OutOfOrder(()); 66 } 67 Err(err) => { 68 warn!("Failed to upload blob due to err {err:?}"); 69 return UploadBlobResponse::Failure("Failed to upload blob"); 70 } 71 }; 72 73 UploadBlobResponse::Success(UploadBlobResponseData { 74 data: (), 75 location: location!(name, next_session.id), 76 range: range!(next_session), 77 docker_upload_uuid: header!(DOCKER_UPLOAD_UUID_HEADER_NAME, next_session.id.to_string()), 78 }) 79 } 80 81 async fn handle_chunked_upload( 82 db_pool: &Pool<DB>, 83 config: &Config, 84 session_id: &str, 85 name: &str, 86 blob: OctetStream, 87 content_length: ContentLength, 88 content_range: Option<ContentRange>, 89 ) -> RegistryResult<UploadSession> { 90 let session_id = SessionId::parse(session_id)?; 91 92 if let Some(content_range) = content_range.as_ref() { 93 content_range.validate(&content_length)?; 94 } 95 96 content_length.validate_data_length(blob.data.len())?; 97 98 let new_session = upload_blob_service::upload_blob( 99 db_pool, 100 name, 101 session_id, 102 config, 103 blob.data, 104 content_range.map(|o| o.range_start), 105 ) 106 .await 107 .map_err(|err| { 108 error!("Failed to upload blob, err: {err:?}"); 109 err 110 })?; 111 112 Ok(new_session) 113 }