upload_manifest_service.rs
1 use rocket::http::ContentType; 2 use sqlx::{Pool, Transaction}; 3 use std::{ 4 fs, 5 io::Write, 6 path::{Path, PathBuf}, 7 }; 8 use uuid::Uuid; 9 10 use crate::{ 11 config::Config, 12 db::{self, blob_repository, manifest_layer_repository, manifest_repository, DB}, 13 models::manifest::Manifest, 14 registry_error::{RegistryError, RegistryResult}, 15 types::manifest::{DockerImageManifestV2, APPLICATION_CONTENT_TYPE_TOP}, 16 }; 17 18 pub async fn upload_manifest( 19 db_pool: &Pool<DB>, 20 config: &Config, 21 namespace: &str, 22 reference: &str, 23 manifest_type: &ContentType, 24 data: Vec<u8>, 25 ) -> RegistryResult<(Uuid, String, Option<String>)> { 26 let calculated_digest = format!("sha256:{}", sha256::digest(data.as_slice())); 27 28 let image_manifest = DockerImageManifestV2::parse(manifest_type, data.clone())?; 29 30 let mut transaction = db::new_transaction(db_pool).await?; 31 32 let image_blob = blob_repository::find_by_repository_and_digest( 33 &mut transaction, 34 namespace, 35 &image_manifest.config.digest, 36 ) 37 .await? 38 .ok_or(RegistryError::InvalidDigest)?; 39 40 let manifest = if reference.starts_with("sha256:") { 41 info!("Reference assumed to be digest: {reference}"); 42 save_manifest_by_digest( 43 &mut transaction, 44 namespace, 45 manifest_type, 46 image_blob.id, 47 &calculated_digest, 48 ) 49 .await? 50 } else { 51 info!("Reference assumed to be tag: {reference}"); 52 save_manifest_by_tag( 53 &mut transaction, 54 namespace, 55 reference, 56 manifest_type, 57 image_blob.id, 58 &calculated_digest, 59 ) 60 .await? 61 }; 62 63 for layer in image_manifest.layers.iter() { 64 let blob = blob_repository::find_by_repository_and_digest( 65 &mut transaction, 66 namespace, 67 &layer.digest, 68 ) 69 .await? 70 .ok_or(RegistryError::InvalidDigest)?; // TODO: This should probably return BLOB_UNKNOWN 71 72 match manifest_layer_repository::find_by_manifest_and_blob( 73 &mut transaction, 74 manifest.id.clone(), 75 blob.id.clone(), 76 ) 77 .await? 78 { 79 Some(_) => { 80 info!("Manifest layer already exists, skipping DB insertion"); 81 } 82 None => { 83 manifest_layer_repository::insert( 84 &mut transaction, 85 manifest.id, 86 blob.id, 87 layer.media_type.clone(), 88 layer.size, 89 ) 90 .await?; 91 } 92 } 93 } 94 95 save_file(manifest.id, config, data)?; 96 97 transaction.commit().await?; 98 99 Ok(( 100 manifest.id, 101 calculated_digest, 102 image_manifest.subject.map(|s| s.digest), 103 )) 104 } 105 106 async fn save_manifest_by_tag( 107 transaction: &mut Transaction<'_, DB>, 108 namespace: &str, 109 tag: &str, 110 manifest_type: &ContentType, 111 image_blob_id: Uuid, 112 calculated_digest: &str, 113 ) -> RegistryResult<Manifest> { 114 let manifest = 115 match manifest_repository::find_by_repository_and_tag(transaction, namespace, Some(tag)) 116 .await? 117 { 118 Some(m) => { 119 warn!("Manifest already exists, overwriting"); 120 m 121 } 122 None => { 123 let content_type = manifest_type.to_string(); 124 let Some(content_type_sub) = content_type.strip_prefix("application/") else { 125 error!("Media type does not start with `application/`! (Got {manifest_type})"); 126 return Err(RegistryError::InvalidManifestSchema( 127 "Expected application/".to_string(), 128 )); 129 }; 130 manifest_repository::insert( 131 transaction, 132 namespace, 133 image_blob_id, 134 Some(tag), 135 &calculated_digest, 136 APPLICATION_CONTENT_TYPE_TOP, 137 content_type_sub, 138 ) 139 .await? 140 } 141 }; 142 143 Ok(manifest) 144 } 145 146 async fn save_manifest_by_digest( 147 transaction: &mut Transaction<'_, DB>, 148 namespace: &str, 149 manifest_type: &ContentType, 150 image_blob_id: Uuid, 151 calculated_digest: &str, 152 ) -> RegistryResult<Manifest> { 153 let manifest = match manifest_repository::find_first_by_repository_and_digest( 154 transaction, 155 namespace, 156 calculated_digest, 157 ) 158 .await? 159 { 160 Some(m) => { 161 warn!("Manifest already exists, overwriting"); 162 m 163 } 164 None => { 165 let content_type = manifest_type.to_string(); 166 let Some(content_type_sub) = content_type.strip_prefix("application/") else { 167 error!("Media type does not start with `application/`! (Got {manifest_type})"); 168 return Err(RegistryError::InvalidManifestSchema( 169 "Expected application/".to_string(), 170 )); 171 }; 172 manifest_repository::insert( 173 transaction, 174 namespace, 175 image_blob_id, 176 None, 177 &calculated_digest, 178 APPLICATION_CONTENT_TYPE_TOP, 179 content_type_sub, 180 ) 181 .await? 182 } 183 }; 184 185 Ok(manifest) 186 } 187 188 fn get_manifests_dir(config: &Config) -> PathBuf { 189 Path::new(&config.storage_directory).join("manifests") 190 } 191 192 fn to_file_path(dir_path: PathBuf, manifest_id: Uuid) -> PathBuf { 193 let mut file_path = dir_path.join(manifest_id.to_string()); 194 file_path.set_extension("json"); 195 file_path 196 } 197 198 pub fn get_manifest_file_path(config: &Config, manifest_id: Uuid) -> PathBuf { 199 let dir = get_manifests_dir(config); 200 to_file_path(dir, manifest_id) 201 } 202 203 fn save_file(manifest_id: Uuid, config: &Config, data: Vec<u8>) -> RegistryResult<()> { 204 let manifests_dir = get_manifests_dir(config); 205 206 info!("Creating directories {manifests_dir:?}"); 207 fs::create_dir_all(manifests_dir.clone())?; 208 209 let file_path = to_file_path(manifests_dir, manifest_id); 210 211 if file_path.exists() { 212 info!("Manifest already exists at path {file_path:?}"); 213 return Ok(()); 214 } 215 216 info!("Creating file manifest file {file_path:?}"); 217 let mut file = fs::File::create(file_path)?; 218 219 file.write_all(&data)?; 220 221 Ok(()) 222 }