Skip to content

Commit

Permalink
Extend precompile to support a DAG
Browse files Browse the repository at this point in the history
Signed-off-by: Brian H <[email protected]>
  • Loading branch information
fibonacci1729 committed Jan 15, 2025
1 parent bc781fc commit 76b018f
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 97 deletions.
21 changes: 17 additions & 4 deletions crates/containerd-shim-wasm/src/container/engine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::BTreeSet;
use std::fs::File;
use std::future::Future;
use std::io::Read;

use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -60,10 +62,11 @@ pub trait Engine: Clone + Send + Sync + 'static {
/// This is used to precompile the layers before they are run and will be called if `can_precompile` returns `true`.
/// It is called only the first time a module is run and the resulting bytes will be cached in the containerd content store.
/// The cached, precompiled layers will be reloaded on subsequent runs.
/// The runtime is expected to return the same number of layers passed in, if the layer cannot be precompiled it should return `None` for that layer.
/// In some edge cases it is possible that the layers may already be precompiled and None should be returned in this case.
fn precompile(&self, _layers: &[WasmLayer]) -> Result<Vec<Option<Vec<u8>>>> {
bail!("precompile not supported");
fn precompile(
&self,
_layers: &[WasmLayer],
) -> impl Future<Output = Result<Vec<PrecompiledLayer>>> + Send {
async move { bail!("precompile not supported") }
}

/// Can_precompile lets the shim know if the runtime supports precompilation.
Expand All @@ -81,3 +84,13 @@ pub trait Engine: Clone + Send + Sync + 'static {
None
}
}

/// A `PrecompiledLayer` represents the precompiled bytes of a layer and the digests of parent layers (if any) used to process it.
pub struct PrecompiledLayer {
/// The media type this layer represents.
pub media_type: String,
/// The bytes of the precompiled layer.
pub bytes: Vec<u8>,
/// Digests of this layers' parents.
pub parents: BTreeSet<String>,
}
2 changes: 1 addition & 1 deletion crates/containerd-shim-wasm/src/container/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod wasm;

pub(crate) use context::WasiContext;
pub use context::{Entrypoint, RuntimeContext, Source};
pub use engine::Engine;
pub use engine::{Engine, PrecompiledLayer};
pub use instance::Instance;
pub use path::PathResolve;
pub use wasm::WasmBinaryType;
Expand Down
234 changes: 148 additions & 86 deletions crates/containerd-shim-wasm/src/sandbox/containerd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;

use containerd_client;
use containerd_client::services::v1::containers_client::ContainersClient;
Expand All @@ -17,14 +18,14 @@ use containerd_client::tonic::transport::Channel;
use containerd_client::tonic::Streaming;
use containerd_client::{tonic, with_namespace};
use futures::TryStreamExt;
use oci_spec::image::{Arch, Digest, ImageManifest, MediaType, Platform};
use oci_spec::image::{Arch, DescriptorBuilder, Digest, ImageManifest, MediaType, Platform};
use sha256::digest;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Code, Request};

use super::lease::LeaseGuard;
use crate::container::Engine;
use crate::container::{Engine, PrecompiledLayer};
use crate::sandbox::error::{Error as ShimError, Result};
use crate::sandbox::oci::{self, WasmLayer};
use crate::with_lease;
Expand Down Expand Up @@ -406,32 +407,34 @@ impl Client {
.iter()
.filter(|x| is_wasm_layer(x.media_type(), T::supported_layers_types()));

let mut layers = vec![];
let mut all_layers = HashMap::new();
let media_type_label = precompile_label(T::name(), "media-type");
for original_config in configs {
let layer = self
.read_wasm_layer(
original_config,
can_precompile,
&precompile_id,
&mut needs_precompile,
)
.await?;
layers.push(layer);
self.read_wasm_layer(
original_config,
can_precompile,
&precompile_id,
&mut needs_precompile,
&media_type_label,
&mut all_layers,
)
.await?;
}

let layers = all_layers.into_values().collect::<Vec<_>>();

if layers.is_empty() {
log::info!("no WASM layers found in OCI image");
return Ok((vec![], platform));
}

if needs_precompile {
log::info!("precompiling layers for image: {}", container.image);
let compiled_layers = match engine.precompile(&layers) {
let compiled_layers = match engine.precompile(&layers).await {
Ok(compiled_layers) => {
if compiled_layers.len() != layers.len() {
return Err(ShimError::FailedPrecondition(
"precompile returned wrong number of layers".to_string(),
));
if compiled_layers.is_empty() {
log::info!("no precompiled layers returned");
return Ok((layers, platform));
}
compiled_layers
}
Expand All @@ -442,40 +445,48 @@ impl Client {
};

let mut layers_for_runtime = Vec::with_capacity(compiled_layers.len());
for (i, compiled_layer) in compiled_layers.iter().enumerate() {
if compiled_layer.is_none() {
log::debug!("no compiled layer using original");
layers_for_runtime.push(layers[i].clone());
continue;
}
for compiled_layer in compiled_layers.iter() {
let PrecompiledLayer {
media_type,
bytes,
parents,
} = compiled_layer;

let mut labels = HashMap::new();
let media_type_label = precompile_label(T::name(), "media-type");
labels.insert(media_type_label, media_type.clone());

let compiled_layer = compiled_layer.as_ref().unwrap();
let original_config = &layers[i].config;
let labels = HashMap::from([(
format!("{precompile_id}/original"),
original_config.digest().to_string(),
)]);
let precompiled_content = self
.save_content(compiled_layer.clone(), &precompile_id, labels)
.save_content(bytes.clone(), &precompile_id, labels)
.await?;

log::debug!(
"updating original layer {} with compiled layer {}",
original_config.digest(),
precompiled_content.digest
);
// We add two labels here:
// - one with cache key per engine instance
// - one with a gc ref flag so it doesn't get cleaned up as long as the original layer exists
let mut original_layer = self.get_info(original_config.digest()).await?;
original_layer
.labels
.insert(precompile_id.clone(), precompiled_content.digest.clone());
original_layer.labels.insert(
format!("containerd.io/gc.ref.content.precompile.{}", i),
precompiled_content.digest.clone(),
);
self.update_info(original_layer).await?;
// Update the original layers with a gc label which associates the original digests that
// were used to process and produce the new layer with the digest of the precompiled content.
// TODO: parallelize this
for parent_digest_str in parents {
let parent_digest = Digest::from_str(parent_digest_str)?;

let mut parent_layer = self.get_info(&parent_digest).await?;

let child_digest = precompiled_content.digest.clone();

log::debug!(
"updating original layer {} with compiled layer {}",
parent_digest,
child_digest,
);

let parent_label = format!("{precompile_id}/child.{child_digest}");
parent_layer
.labels
.insert(parent_label, child_digest.clone());

let gc_label =
format!("containerd.io/gc.ref.content.precompile.{child_digest}");
parent_layer.labels.insert(gc_label, child_digest.clone());

self.update_info(parent_layer).await?;
}

// The original image is considered a root object, by adding a ref to the new compiled content
// We tell containerd to not garbage collect the new content until this image is removed from the system
Expand All @@ -485,22 +496,35 @@ impl Client {
"updating image content with precompile digest to avoid garbage collection"
);
let mut image_content = self.get_info(&image_digest).await?;

image_content.labels.insert(
format!("containerd.io/gc.ref.content.precompile.{}", i),
precompiled_content.digest,
format!(
"containerd.io/gc.ref.content.precompile.{}",
precompiled_content.digest
),
precompiled_content.digest.clone(),
);
image_content
.labels
.insert(precompile_id.clone(), "true".to_string());
self.update_info(image_content).await?;

let precompiled_image_digest = Digest::from_str(&precompiled_content.digest)?;

let wasm_layer_descriptor = DescriptorBuilder::default()
.media_type(&**media_type)
.size(bytes.len() as u64)
.digest(precompiled_image_digest)
.build()?;

layers_for_runtime.push(WasmLayer {
config: original_config.clone(),
layer: compiled_layer.clone(),
config: wasm_layer_descriptor,
layer: bytes.clone(),
});

let _ = precompiled_content.lease.release().await;
}

return Ok((layers_for_runtime, platform));
};

Expand All @@ -515,44 +539,82 @@ impl Client {
can_precompile: bool,
precompile_id: &String,
needs_precompile: &mut bool,
) -> std::prelude::v1::Result<WasmLayer, ShimError> {
let mut digest_to_load = original_config.digest().clone();
if can_precompile {
let info = self.get_info(&digest_to_load).await?;
if let Some(label) = info.labels.get(precompile_id) {
// Safe to unwrap here since we already checked for the label's existence
digest_to_load = label.parse()?;
log::info!(
"layer {} has pre-compiled content: {} ",
info.digest,
&digest_to_load
);
media_type_label: &String,
all_layers: &mut HashMap<Digest, WasmLayer>,
) -> std::prelude::v1::Result<(), ShimError> {
let parent_digest = original_config.digest().clone();
let digests_to_load = if can_precompile {
let info = self.get_info(&parent_digest).await?;
let child_digests = info
.labels
.into_iter()
.filter_map(|(key, child_digest)| {
if key.starts_with(&format!("{precompile_id}/child")) {
log::debug!("layer {parent_digest} has child layer: {child_digest} ");
Some(child_digest)
} else {
None
}
})
.collect::<Vec<_>>();

if child_digests.is_empty() {
vec![parent_digest.clone()]
} else {
child_digests
.into_iter()
.map(|d| d.parse().map_err(ShimError::Oci))
.collect::<Result<Vec<Digest>>>()?
}
}
log::debug!("loading digest: {} ", &digest_to_load);
let res = self
.read_content(&digest_to_load)
.await
.map(|module| WasmLayer {
config: original_config.clone(),
layer: module,
});

match res {
Ok(res) => Ok(res),
Err(err) if digest_to_load == *original_config.digest() => Err(err),
Err(err) => {
log::error!("failed to load precompiled layer: {err}");
log::error!("falling back to original layer and marking for recompile");
*needs_precompile = can_precompile; // only mark for recompile if engine is capable
self.read_content(original_config.digest())
.await
.map(|module| WasmLayer {
config: original_config.clone(),
layer: module,
})
} else {
vec![parent_digest.clone()]
};

for digest_to_load in digests_to_load {
if all_layers.contains_key(&digest_to_load) {
log::debug!("layer {digest_to_load} already loaded");
continue;
}
log::debug!("loading digest: {digest_to_load}");

let info = self.get_info(&digest_to_load).await?;
let config_descriptor = match info.labels.get(media_type_label) {
Some(media_type) => DescriptorBuilder::default()
.media_type(&**media_type)
.size(info.size as u64)
.digest(digest_to_load.clone())
.build()?,
None => original_config.clone(),
};

let res = self
.read_content(&digest_to_load)
.await
.map(|module| WasmLayer {
config: config_descriptor,
layer: module,
});

let wasm_layer = match res {
Ok(res) => res,
Err(err) if digest_to_load == *original_config.digest() => return Err(err),
Err(err) => {
log::error!("failed to load precompiled layer: {err}");
log::error!("falling back to original layer and marking for recompile");
*needs_precompile = can_precompile; // only mark for recompile if engine is capable
self.read_content(original_config.digest())
.await
.map(|module| WasmLayer {
config: original_config.clone(),
layer: module,
})?
}
};

all_layers.insert(digest_to_load, wasm_layer);
}

Ok(())
}
}

Expand Down
Loading

0 comments on commit 76b018f

Please sign in to comment.