/ backend / src / services / upload_manifest_service.rs
upload_manifest_service.rs
  1  use rocket::http::ContentType;
  2  use sqlx::{Pool, Transaction};
  3  use std::{
  4      fs,
  5      io::Write,
  6      path::{Path, PathBuf},
  7  };
  8  use uuid::Uuid;
  9  
 10  use crate::{
 11      config::Config,
 12      db::{self, blob_repository, manifest_layer_repository, manifest_repository, DB},
 13      models::manifest::Manifest,
 14      registry_error::{RegistryError, RegistryResult},
 15      types::manifest::{DockerImageManifestV2, APPLICATION_CONTENT_TYPE_TOP},
 16  };
 17  
 18  pub async fn upload_manifest(
 19      db_pool: &Pool<DB>,
 20      config: &Config,
 21      namespace: &str,
 22      reference: &str,
 23      manifest_type: &ContentType,
 24      data: Vec<u8>,
 25  ) -> RegistryResult<(Uuid, String, Option<String>)> {
 26      let calculated_digest = format!("sha256:{}", sha256::digest(data.as_slice()));
 27  
 28      let image_manifest = DockerImageManifestV2::parse(manifest_type, data.clone())?;
 29  
 30      let mut transaction = db::new_transaction(db_pool).await?;
 31  
 32      let image_blob = blob_repository::find_by_repository_and_digest(
 33          &mut transaction,
 34          namespace,
 35          &image_manifest.config.digest,
 36      )
 37      .await?
 38      .ok_or(RegistryError::InvalidDigest)?;
 39  
 40      let manifest = if reference.starts_with("sha256:") {
 41          info!("Reference assumed to be digest: {reference}");
 42          save_manifest_by_digest(
 43              &mut transaction,
 44              namespace,
 45              manifest_type,
 46              image_blob.id,
 47              &calculated_digest,
 48          )
 49          .await?
 50      } else {
 51          info!("Reference assumed to be tag: {reference}");
 52          save_manifest_by_tag(
 53              &mut transaction,
 54              namespace,
 55              reference,
 56              manifest_type,
 57              image_blob.id,
 58              &calculated_digest,
 59          )
 60          .await?
 61      };
 62  
 63      for layer in image_manifest.layers.iter() {
 64          let blob = blob_repository::find_by_repository_and_digest(
 65              &mut transaction,
 66              namespace,
 67              &layer.digest,
 68          )
 69          .await?
 70          .ok_or(RegistryError::InvalidDigest)?; // TODO: This should probably return BLOB_UNKNOWN
 71  
 72          match manifest_layer_repository::find_by_manifest_and_blob(
 73              &mut transaction,
 74              manifest.id.clone(),
 75              blob.id.clone(),
 76          )
 77          .await?
 78          {
 79              Some(_) => {
 80                  info!("Manifest layer already exists, skipping DB insertion");
 81              }
 82              None => {
 83                  manifest_layer_repository::insert(
 84                      &mut transaction,
 85                      manifest.id,
 86                      blob.id,
 87                      layer.media_type.clone(),
 88                      layer.size,
 89                  )
 90                  .await?;
 91              }
 92          }
 93      }
 94  
 95      save_file(manifest.id, config, data)?;
 96  
 97      transaction.commit().await?;
 98  
 99      Ok((
100          manifest.id,
101          calculated_digest,
102          image_manifest.subject.map(|s| s.digest),
103      ))
104  }
105  
106  async fn save_manifest_by_tag(
107      transaction: &mut Transaction<'_, DB>,
108      namespace: &str,
109      tag: &str,
110      manifest_type: &ContentType,
111      image_blob_id: Uuid,
112      calculated_digest: &str,
113  ) -> RegistryResult<Manifest> {
114      let manifest =
115          match manifest_repository::find_by_repository_and_tag(transaction, namespace, Some(tag))
116              .await?
117          {
118              Some(m) => {
119                  warn!("Manifest already exists, overwriting");
120                  m
121              }
122              None => {
123                  let content_type = manifest_type.to_string();
124                  let Some(content_type_sub) = content_type.strip_prefix("application/") else {
125                      error!("Media type does not start with `application/`! (Got {manifest_type})");
126                      return Err(RegistryError::InvalidManifestSchema(
127                          "Expected application/".to_string(),
128                      ));
129                  };
130                  manifest_repository::insert(
131                      transaction,
132                      namespace,
133                      image_blob_id,
134                      Some(tag),
135                      &calculated_digest,
136                      APPLICATION_CONTENT_TYPE_TOP,
137                      content_type_sub,
138                  )
139                  .await?
140              }
141          };
142  
143      Ok(manifest)
144  }
145  
146  async fn save_manifest_by_digest(
147      transaction: &mut Transaction<'_, DB>,
148      namespace: &str,
149      manifest_type: &ContentType,
150      image_blob_id: Uuid,
151      calculated_digest: &str,
152  ) -> RegistryResult<Manifest> {
153      let manifest = match manifest_repository::find_first_by_repository_and_digest(
154          transaction,
155          namespace,
156          calculated_digest,
157      )
158      .await?
159      {
160          Some(m) => {
161              warn!("Manifest already exists, overwriting");
162              m
163          }
164          None => {
165              let content_type = manifest_type.to_string();
166              let Some(content_type_sub) = content_type.strip_prefix("application/") else {
167                  error!("Media type does not start with `application/`! (Got {manifest_type})");
168                  return Err(RegistryError::InvalidManifestSchema(
169                      "Expected application/".to_string(),
170                  ));
171              };
172              manifest_repository::insert(
173                  transaction,
174                  namespace,
175                  image_blob_id,
176                  None,
177                  &calculated_digest,
178                  APPLICATION_CONTENT_TYPE_TOP,
179                  content_type_sub,
180              )
181              .await?
182          }
183      };
184  
185      Ok(manifest)
186  }
187  
188  fn get_manifests_dir(config: &Config) -> PathBuf {
189      Path::new(&config.storage_directory).join("manifests")
190  }
191  
192  fn to_file_path(dir_path: PathBuf, manifest_id: Uuid) -> PathBuf {
193      let mut file_path = dir_path.join(manifest_id.to_string());
194      file_path.set_extension("json");
195      file_path
196  }
197  
198  pub fn get_manifest_file_path(config: &Config, manifest_id: Uuid) -> PathBuf {
199      let dir = get_manifests_dir(config);
200      to_file_path(dir, manifest_id)
201  }
202  
203  fn save_file(manifest_id: Uuid, config: &Config, data: Vec<u8>) -> RegistryResult<()> {
204      let manifests_dir = get_manifests_dir(config);
205  
206      info!("Creating directories {manifests_dir:?}");
207      fs::create_dir_all(manifests_dir.clone())?;
208  
209      let file_path = to_file_path(manifests_dir, manifest_id);
210  
211      if file_path.exists() {
212          info!("Manifest already exists at path {file_path:?}");
213          return Ok(());
214      }
215  
216      info!("Creating file manifest file {file_path:?}");
217      let mut file = fs::File::create(file_path)?;
218  
219      file.write_all(&data)?;
220  
221      Ok(())
222  }