summaryrefslogtreecommitdiffstats
path: root/src/endpoint/configured.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-15 10:46:35 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-15 10:46:35 +0100
commit65a736f0676772d0c6f13c52de8e83332b4d4afd (patch)
tree71c296ee4add9b107195581318da56c54d27057f /src/endpoint/configured.rs
parent55fe4d7ebb9183b452693965ee3f1162681a589c (diff)
Add error context messages for better error reporting
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r--src/endpoint/configured.rs61
1 files changed, 41 insertions, 20 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 11db626..7949437 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -47,7 +47,8 @@ impl Debug for Endpoint {
impl Endpoint {
pub(in super) async fn setup(epc: EndpointConfiguration) -> Result<Self> {
- let ep = Endpoint::setup_endpoint(epc.endpoint())?;
+ let ep = Endpoint::setup_endpoint(epc.endpoint())
+ .with_context(|| anyhow!("Setting up endpoint: {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?;
let versions_compat = Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep);
let api_versions_compat = Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep);
@@ -56,9 +57,12 @@ impl Endpoint {
let (versions_compat, api_versions_compat, imgs_avail) =
tokio::join!(versions_compat, api_versions_compat, imgs_avail);
- let _ = versions_compat?;
- let _ = api_versions_compat?;
- let _ = imgs_avail?;
+ let _ = versions_compat
+ .with_context(|| anyhow!("Checking version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?;
+ let _ = api_versions_compat
+ .with_context(|| anyhow!("Checking API version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?;
+ let _ = imgs_avail
+ .with_context(|| anyhow!("Checking for available images on {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?;
Ok(ep)
}
@@ -68,6 +72,7 @@ impl Endpoint {
crate::config::EndpointType::Http => {
shiplift::Uri::from_str(ep.uri())
.map(|uri| shiplift::Docker::host(uri))
+ .with_context(|| anyhow!("Connecting to {}", ep.uri()))
.map_err(Error::from)
.map(|docker| {
Endpoint::builder()
@@ -102,8 +107,8 @@ impl Endpoint {
.with_context(|| anyhow!("Getting version of endpoint: {}", ep.name))?;
if !v.contains(&avail.version) {
- Err(anyhow!("Incompatible docker version on endpoint {}: {}",
- ep.name(), avail.version))
+ Err(anyhow!("Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]",
+ ep.name(), avail.version, v.join(", ")))
} else {
Ok(())
}
@@ -121,8 +126,8 @@ impl Endpoint {
.with_context(|| anyhow!("Getting API version of endpoint: {}", ep.name))?;
if !v.contains(&avail.api_version) {
- Err(anyhow!("Incompatible docker API version on endpoint {}: {}",
- ep.name(), avail.api_version))
+ Err(anyhow!("Incompatible docker API version on endpoint {}: Exepected: {}, Available: [{}]",
+ ep.name(), avail.api_version, v.join(", ")))
} else {
Ok(())
}
@@ -216,7 +221,9 @@ impl Endpoint {
let pkgsource = job.package_source();
let source_path = pkgsource.path();
let destination = PathBuf::from("/inputs").join({
- source_path.file_name().ok_or_else(|| anyhow!("Not a file: {}", source_path.display()))?
+ source_path.file_name()
+ .ok_or_else(|| anyhow!("Not a file: {}", source_path.display()))
+ .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), self.name))?
});
trace!("Package source = {:?}", pkgsource);
trace!("Source path = {:?}", source_path);
@@ -232,10 +239,12 @@ impl Endpoint {
.await
.with_context(|| anyhow!("Getting source file: {}", source_path.display()))?
.read_to_end(&mut buf)
- .await?;
-
+ .await
+ .with_context(|| anyhow!("Reading file {}", source_path.display()))?;
- let _ = container.copy_file_into(destination, &buf).await?;
+ let _ = container.copy_file_into(destination, &buf)
+ .await
+ .with_context(|| anyhow!("Copying {} to container {}", source_path.display(), container_id))?;
}
{ // Copy all Path artifacts to the container
job.resources()
@@ -244,22 +253,27 @@ impl Endpoint {
.cloned()
.map(|art| async {
let artifact_file_name = art.path().file_name()
- .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))?;
+ .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))
+ .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container_id))?;
let destination = PathBuf::from("/inputs/").join(artifact_file_name);
trace!("Copying {} to container: {}:{}", art.path().display(), container_id, destination.display());
let buf = tokio::fs::read(art.path())
.await
.map(Vec::from)
+ .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))
.map_err(Error::from)?;
- drop(art); // ensure `art` is moved into closure
- container.copy_file_into(destination, &buf)
+ let r = container.copy_file_into(&destination, &buf)
.await
- .map_err(Error::from)
+ .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container_id, destination.display()))
+ .map_err(Error::from);
+ drop(art); // ensure `art` is moved into closure
+ r
})
.collect::<futures::stream::FuturesUnordered<_>>()
.collect::<Result<Vec<_>>>()
- .await?;
+ .await
+ .with_context(|| anyhow!("Copying artifacts to container {}", container_id))?;
}
container
@@ -293,12 +307,16 @@ impl Endpoint {
})
.inspect(|r| { trace!("Fetching log from container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name)))
- .await?;
+ .await
+ .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?;
trace!("Fetching /outputs from container {}", container_id);
let tar_stream = container
.copy_from(&PathBuf::from("/outputs/"))
- .map(|item| item.map_err(Error::from));
+ .map(|item| {
+ item.with_context(|| anyhow!("Copying item from container {} to host", container_id))
+ .map_err(Error::from)
+ });
let r = {
let mut writelock = staging.write().await;
@@ -309,7 +327,9 @@ impl Endpoint {
.with_context(|| anyhow!("Copying the TAR stream to the staging store"))?
};
- container.stop(Some(std::time::Duration::new(1, 0))).await?;
+ container.stop(Some(std::time::Duration::new(1, 0)))
+ .await
+ .with_context(|| anyhow!("Stopping container {}", container_id))?;
trace!("Returning job {} result = {:?}, container hash = {}", job.uuid(), r, container_id);
let script: Script = job.script().clone();
@@ -321,6 +341,7 @@ impl Endpoint {
.containers()
.list(&Default::default())
.await
+ .with_context(|| anyhow!("Getting number of running containers on {}", self.name))
.map_err(Error::from)
.map(|list| list.len())
}