From 65a736f0676772d0c6f13c52de8e83332b4d4afd Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sun, 15 Nov 2020 10:46:35 +0100 Subject: Add error context messages for better error reporting Signed-off-by: Matthias Beyer --- src/endpoint/configured.rs | 61 +++++++++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 20 deletions(-) (limited to 'src/endpoint/configured.rs') diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 11db626..7949437 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -47,7 +47,8 @@ impl Debug for Endpoint { impl Endpoint { pub(in super) async fn setup(epc: EndpointConfiguration) -> Result { - let ep = Endpoint::setup_endpoint(epc.endpoint())?; + let ep = Endpoint::setup_endpoint(epc.endpoint()) + .with_context(|| anyhow!("Setting up endpoint: {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; let versions_compat = Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep); let api_versions_compat = Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); @@ -56,9 +57,12 @@ impl Endpoint { let (versions_compat, api_versions_compat, imgs_avail) = tokio::join!(versions_compat, api_versions_compat, imgs_avail); - let _ = versions_compat?; - let _ = api_versions_compat?; - let _ = imgs_avail?; + let _ = versions_compat + .with_context(|| anyhow!("Checking version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; + let _ = api_versions_compat + .with_context(|| anyhow!("Checking API version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; + let _ = imgs_avail + .with_context(|| anyhow!("Checking for available images on {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; Ok(ep) } @@ -68,6 +72,7 @@ impl Endpoint { crate::config::EndpointType::Http => { shiplift::Uri::from_str(ep.uri()) .map(|uri| shiplift::Docker::host(uri)) + .with_context(|| anyhow!("Connecting to {}", ep.uri())) .map_err(Error::from) .map(|docker| { Endpoint::builder() @@ -102,8 +107,8 @@ impl Endpoint { .with_context(|| anyhow!("Getting version of endpoint: {}", ep.name))?; if !v.contains(&avail.version) { - Err(anyhow!("Incompatible docker version on endpoint {}: {}", - ep.name(), avail.version)) + Err(anyhow!("Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]", + ep.name(), avail.version, v.join(", "))) } else { Ok(()) } @@ -121,8 +126,8 @@ impl Endpoint { .with_context(|| anyhow!("Getting API version of endpoint: {}", ep.name))?; if !v.contains(&avail.api_version) { - Err(anyhow!("Incompatible docker API version on endpoint {}: {}", - ep.name(), avail.api_version)) + Err(anyhow!("Incompatible docker API version on endpoint {}: Exepected: {}, Available: [{}]", + ep.name(), avail.api_version, v.join(", "))) } else { Ok(()) } @@ -216,7 +221,9 @@ impl Endpoint { let pkgsource = job.package_source(); let source_path = pkgsource.path(); let destination = PathBuf::from("/inputs").join({ - source_path.file_name().ok_or_else(|| anyhow!("Not a file: {}", source_path.display()))? + source_path.file_name() + .ok_or_else(|| anyhow!("Not a file: {}", source_path.display())) + .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), self.name))? }); trace!("Package source = {:?}", pkgsource); trace!("Source path = {:?}", source_path); @@ -232,10 +239,12 @@ impl Endpoint { .await .with_context(|| anyhow!("Getting source file: {}", source_path.display()))? .read_to_end(&mut buf) - .await?; - + .await + .with_context(|| anyhow!("Reading file {}", source_path.display()))?; - let _ = container.copy_file_into(destination, &buf).await?; + let _ = container.copy_file_into(destination, &buf) + .await + .with_context(|| anyhow!("Copying {} to container {}", source_path.display(), container_id))?; } { // Copy all Path artifacts to the container job.resources() @@ -244,22 +253,27 @@ impl Endpoint { .cloned() .map(|art| async { let artifact_file_name = art.path().file_name() - .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))?; + .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display())) + .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container_id))?; let destination = PathBuf::from("/inputs/").join(artifact_file_name); trace!("Copying {} to container: {}:{}", art.path().display(), container_id, destination.display()); let buf = tokio::fs::read(art.path()) .await .map(Vec::from) + .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display())) .map_err(Error::from)?; - drop(art); // ensure `art` is moved into closure - container.copy_file_into(destination, &buf) + let r = container.copy_file_into(&destination, &buf) .await - .map_err(Error::from) + .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container_id, destination.display())) + .map_err(Error::from); + drop(art); // ensure `art` is moved into closure + r }) .collect::>() .collect::>>() - .await?; + .await + .with_context(|| anyhow!("Copying artifacts to container {}", container_id))?; } container @@ -293,12 +307,16 @@ impl Endpoint { }) .inspect(|r| { trace!("Fetching log from container {} -> {:?}", container_id, r); }) .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name))) - .await?; + .await + .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?; trace!("Fetching /outputs from container {}", container_id); let tar_stream = container .copy_from(&PathBuf::from("/outputs/")) - .map(|item| item.map_err(Error::from)); + .map(|item| { + item.with_context(|| anyhow!("Copying item from container {} to host", container_id)) + .map_err(Error::from) + }); let r = { let mut writelock = staging.write().await; @@ -309,7 +327,9 @@ impl Endpoint { .with_context(|| anyhow!("Copying the TAR stream to the staging store"))? }; - container.stop(Some(std::time::Duration::new(1, 0))).await?; + container.stop(Some(std::time::Duration::new(1, 0))) + .await + .with_context(|| anyhow!("Stopping container {}", container_id))?; trace!("Returning job {} result = {:?}, container hash = {}", job.uuid(), r, container_id); let script: Script = job.script().clone(); @@ -321,6 +341,7 @@ impl Endpoint { .containers() .list(&Default::default()) .await + .with_context(|| anyhow!("Getting number of running containers on {}", self.name)) .map_err(Error::from) .map(|list| list.len()) } -- cgit v1.2.3