Skip to content

Commit

Permalink
Merge pull request #827 from Mossaka/ctx-propogation-from-containerd
Browse files Browse the repository at this point in the history
feat(otel): add context propogation from containerd to the shim
  • Loading branch information
jprendes authored Jan 30, 2025
2 parents 5c7825d + 9ccf673 commit 56ea23f
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 60 deletions.
26 changes: 13 additions & 13 deletions crates/containerd-shim-wasm/src/sandbox/containerd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl WriteContent {
// sync wrapper implementation from https://tokio.rs/tokio/topics/bridging
impl Client {
// wrapper around connection that will establish a connection and create a client
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub async fn connect(
address: impl AsRef<Path>,
namespace: impl Into<String>,
Expand All @@ -73,7 +73,7 @@ impl Client {
}

// wrapper around read that will read the entire content file
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn read_content(&self, digest: impl ToString) -> Result<Vec<u8>> {
let req = ReadContentRequest {
digest: digest.to_string(),
Expand All @@ -93,7 +93,7 @@ impl Client {

// used in tests to clean up content
#[allow(dead_code)]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn delete_content(&self, digest: impl ToString) -> Result<()> {
let req = DeleteContentRequest {
digest: digest.to_string(),
Expand All @@ -107,7 +107,7 @@ impl Client {
}

// wrapper around lease that will create a lease and return a guard that will delete the lease when dropped
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn lease(&self, reference: String) -> Result<LeaseGuard> {
let mut lease_labels = HashMap::new();
// Unwrap is safe here since 24 hours is a valid time
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Client {
))
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn save_content(
&self,
data: Vec<u8>,
Expand Down Expand Up @@ -258,7 +258,7 @@ impl Client {
Ok(WriteContent { lease, digest })
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn get_info(&self, content_digest: &Digest) -> Result<Info> {
let req = InfoRequest {
digest: content_digest.to_string(),
Expand All @@ -276,7 +276,7 @@ impl Client {
Ok(info)
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn update_info(&self, info: Info) -> Result<Info> {
let mut req = UpdateRequest {
info: Some(info.clone()),
Expand All @@ -299,7 +299,7 @@ impl Client {
Ok(info)
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn get_image(&self, image_name: impl ToString) -> Result<Image> {
let name = image_name.to_string();
let req = GetImageRequest { name };
Expand All @@ -319,7 +319,7 @@ impl Client {
Ok(image)
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
fn extract_image_content_sha(&self, image: &Image) -> Result<String> {
let digest = image
.target
Expand All @@ -335,7 +335,7 @@ impl Client {
Ok(digest)
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn get_container(&self, container_name: impl ToString) -> Result<Container> {
let id = container_name.to_string();
let req = GetContainerRequest { id };
Expand All @@ -355,7 +355,7 @@ impl Client {
Ok(container)
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn get_image_manifest_and_digest(
&self,
image_name: &str,
Expand All @@ -370,7 +370,7 @@ impl Client {
// load module will query the containerd store to find an image that has an OS of type 'wasm'
// If found it continues to parse the manifest and return the layers that contains the WASM modules
// and possibly other configuration layers.
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub async fn load_modules<T: Engine>(
&self,
containerd_id: impl ToString,
Expand Down Expand Up @@ -508,7 +508,7 @@ impl Client {
Ok((layers, platform))
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
async fn read_wasm_layer(
&self,
original_config: &oci_spec::image::Descriptor,
Expand Down
2 changes: 1 addition & 1 deletion crates/containerd-shim-wasm/src/sandbox/instance_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct Options {
root: Option<PathBuf>,
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn determine_rootdir(
bundle: impl AsRef<Path>,
namespace: &str,
Expand Down
16 changes: 8 additions & 8 deletions crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub(super) struct InstanceData<T: Instance> {
}

impl<T: Instance> InstanceData<T> {
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn new(id: impl AsRef<str>, cfg: InstanceConfig) -> Result<Self> {
let id = id.as_ref().to_string();
let instance = T::new(id, &cfg)?;
Expand All @@ -26,17 +26,17 @@ impl<T: Instance> InstanceData<T> {
})
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn pid(&self) -> Option<u32> {
self.pid.get().copied()
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn config(&self) -> &InstanceConfig {
&self.cfg
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn start(&self) -> Result<u32> {
let mut s = self.state.write().unwrap();
s.start()?;
Expand All @@ -56,15 +56,15 @@ impl<T: Instance> InstanceData<T> {
res
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn kill(&self, signal: u32) -> Result<()> {
let mut s = self.state.write().unwrap();
s.kill()?;

self.instance.kill(signal)
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn delete(&self) -> Result<()> {
let mut s = self.state.write().unwrap();
s.delete()?;
Expand All @@ -79,15 +79,15 @@ impl<T: Instance> InstanceData<T> {
res
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn wait(&self) -> (u32, DateTime<Utc>) {
let res = self.instance.wait();
let mut s = self.state.write().unwrap();
*s = TaskState::Exited;
res
}

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Debug"))]
pub fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
let res = self.instance.wait_timeout(t);
if res.is_some() {
Expand Down
Loading

0 comments on commit 56ea23f

Please sign in to comment.