Skip to content

Commit

Permalink
Enhance client to push blobs, mount blobs, and push raw manifests
Browse files Browse the repository at this point in the history
Signed-off-by: Adolfo Ochagavía <adolfo@ochagavia.nl>
  • Loading branch information
aochagavia authored and flavio committed Sep 26, 2023
1 parent 1441622 commit 1c511f2
Showing 1 changed file with 107 additions and 35 deletions.
142 changes: 107 additions & 35 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,49 +354,23 @@ impl Client {
// Upload layers
stream::iter(layers)
.map(|layer| {
// This avoids moving `self` which is &mut Self
// into the async block. We only want to capture
// as &Self
let this = &self;
async move {
let digest = layer.sha256_digest();
match this
.push_blob_chunked(image_ref, &layer.data, &digest)
.await
{
Err(OciDistributionError::SpecViolationError(violation)) => {
warn!(
?violation,
"Registry is not respecting the OCI Distribution \
Specification when doing chunked push operations"
);
warn!("Attempting monolithic push");
this.push_blob_monolithically(image_ref, &layer.data, &digest)
.await?;
}
Err(e) => return Err(e),
_ => {}
};

Ok(())
this.push_blob(image_ref, &layer.data, &digest).await?;
Result::Ok(())
}
})
.boxed() // Workaround to rustc issue https://github.com/rust-lang/rust/issues/104382
.buffer_unordered(self.config.max_concurrent_upload)
.try_for_each(future::ok)
.await?;

let config_url = match self
.push_blob_chunked(image_ref, &config.data, &manifest.config.digest)
.await
{
Ok(url) => url,
Err(OciDistributionError::SpecViolationError(violation)) => {
warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
warn!("Attempting monolithic push");
self.push_blob_monolithically(image_ref, &config.data, &manifest.config.digest)
.await?
}
Err(e) => return Err(e),
};

let config_url = self
.push_blob(image_ref, &config.data, &manifest.config.digest)
.await?;
let manifest_url = self.push_manifest(image_ref, &manifest.into()).await?;

Ok(PushResponse {
Expand All @@ -405,6 +379,24 @@ impl Client {
})
}

/// Pushes a blob to the registry
pub async fn push_blob(
&self,
image_ref: &Reference,
data: &[u8],
digest: &str,
) -> Result<String> {
match self.push_blob_chunked(image_ref, data, digest).await {
Ok(url) => Ok(url),
Err(OciDistributionError::SpecViolationError(violation)) => {
warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations");
warn!("Attempting monolithic push");
self.push_blob_monolithically(image_ref, data, digest).await
}
Err(e) => Err(e),
}
}

/// Pushes a blob to the registry as a monolith
///
/// Returns the pullable location of the blob
Expand Down Expand Up @@ -1002,6 +994,28 @@ impl Client {
))
}

/// Mounts a blob to the provided reference, from the given source
pub async fn mount_blob(
&self,
image: &Reference,
source: &Reference,
digest: &str,
) -> Result<()> {
let base_url = self.to_v2_blob_upload_url(image);
let url = format!("{}?mount={}&from={}", base_url, digest, source.repository());

let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone()))
.apply_auth(image, RegistryOperation::Push)?
.into_request_builder()
.send()
.await?;

self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)
.await?;

Ok(())
}

/// Pushes the manifest for a specified image
///
/// Returns pullable manifest URL
Expand All @@ -1018,12 +1032,30 @@ impl Client {
let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new());
manifest.serialize(&mut ser).unwrap();

self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap())
.await
}

/// Pushes the manifest, provided as raw bytes, for a specified image
///
/// Returns pullable manifest url
pub async fn push_manifest_raw(
&self,
image: &Reference,
body: Vec<u8>,
content_type: HeaderValue,
) -> Result<String> {
let url = self.to_v2_manifest_url(image);
debug!(?url, ?content_type, "push manifest");

let mut headers = HeaderMap::new();
headers.insert("Content-Type", content_type);

// Calculate the digest of the manifest, this is useful
// if the remote registry is violating the OCI Distribution Specification.
// See below for more details.
let manifest_hash = sha256_digest(&body);

debug!(?url, ?content_type, "push manifest");
let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone()))
.apply_auth(image, RegistryOperation::Push)?
.into_request_builder()
Expand Down Expand Up @@ -2383,6 +2415,46 @@ mod test {
assert_eq!(manifest.config.digest, pulled_manifest.config.digest);
}

#[tokio::test]
#[cfg(feature = "test-registry")]
async fn test_mount() {
// initialize the registry
let docker = clients::Cli::default();
let test_container = docker.run(registry_image());
let port = test_container.get_host_port_ipv4(5000);

let mut c = Client::new(ClientConfig {
protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]),
..Default::default()
});

// Create a dummy layer and push it to `layer-repository`
let layer_reference: Reference = format!("localhost:{}/layer-repository", port)
.parse()
.unwrap();
let layer_data = vec![1u8, 2, 3, 4];
let layer_digest = sha256_digest(&layer_data);
c.push_blob(&layer_reference, &[1, 2, 3, 4], &layer_digest)
.await
.expect("Failed to push");

// Mount the layer at `image-repository`
let image_reference: Reference = format!("localhost:{}/image-repository", port)
.parse()
.unwrap();
c.mount_blob(&image_reference, &layer_reference, &layer_digest)
.await
.expect("Failed to mount");

// Pull the layer from `image-repository`
let mut buf = Vec::new();
c.pull_blob(&image_reference, &layer_digest, &mut buf)
.await
.expect("Failed to pull");

assert_eq!(layer_data, buf);
}

#[tokio::test]
async fn test_platform_resolution() {
// test that we get an error when we pull a manifest list
Expand Down

0 comments on commit 1c511f2

Please sign in to comment.