summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-09 15:54:22 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-09 15:54:22 +0100
commite225befa36513c47cdb218e22f88892a83bff8e2 (patch)
tree21ba0a5a1099460454dce1a8bb4111f893dc3f00
parent7ed8527bb5ad350e03fd80ae4c7412435379456c (diff)
Add more trace output
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--Cargo.toml1
-rw-r--r--src/endpoint/configured.rs6
-rw-r--r--src/endpoint/scheduler.rs3
-rw-r--r--src/filestore/staging.rs6
-rw-r--r--src/orchestrator/orchestrator.rs3
5 files changed, 18 insertions, 1 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 2abe366..270b1de 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,6 +39,7 @@ atty = "0.2"
csv = "1.1"
chrono = "0.4"
git2 = "0.13"
+result-inspect = "0.1"
url = { version = "2", features = ["serde"] }
tokio = { version = "0.2", features = ["full"] }
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 08fcac1..c8108d9 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -231,10 +231,13 @@ impl Endpoint {
container
.copy_file_into(script_path, script)
+ .inspect(|r| { trace!("Copying script to container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name)))
.then(|_| container.start())
+ .inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name)))
.then(|_| {
+ trace!("Moving logs to log sink for container {}", container_id);
buffer_stream_to_line_stream(container.exec(&exec_opts))
.map(|line| {
trace!("['{}':{}] Found log line: {:?}", self.name, container_id, line);
@@ -254,9 +257,11 @@ impl Endpoint {
})
.collect::<Result<Vec<_>>>()
})
+ .inspect(|r| { trace!("Fetching log from container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name)))
.await?;
+ trace!("Fetching /outputs from container {}", container_id);
let tar_stream = container
.copy_from(&PathBuf::from("/outputs/"))
.map(|item| item.map_err(Error::from));
@@ -270,6 +275,7 @@ impl Endpoint {
container.stop(Some(std::time::Duration::new(1, 0))).await?;
+ trace!("Returning job {} result = {:?}", job.uuid(), r);
Ok(r)
}
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 82fac8c..cbb53de 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -102,11 +102,14 @@ impl JobHandle {
.read()
.map_err(|_| anyhow!("Lock poisoned"))?;
+ let job_id = self.job.uuid().clone();
+ trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
let res = ep
.run_job(self.job, self.sender, self.staging_store)
.await
.with_context(|| anyhow!("Running job on '{}'", ep.name()))?;
+ trace!("Found result for job {}: {:?}", job_id, res);
Ok(res)
}
diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs
index d471d44..880b274 100644
--- a/src/filestore/staging.rs
+++ b/src/filestore/staging.rs
@@ -10,6 +10,7 @@ use futures::stream::Stream;
use indicatif::ProgressBar;
use resiter::Filter;
use resiter::Map;
+use result_inspect::ResultInspect;
use tar;
use crate::filestore::Artifact;
@@ -52,10 +53,13 @@ impl StagingStore {
Ok(p)
})
.map_ok(|path| dest.join(path))
+ .inspect(|p| trace!("Path in tar archive: {:?}", p))
.filter_ok(|p| p.is_file())
+ .inspect(|p| trace!("Taking from archive: {:?}", p))
.collect::<Result<Vec<_>>>()
.context("Collecting outputs of TAR archive")?;
+ trace!("Unpacking archive to {}", dest.display());
tar::Archive::new(&bytes[..])
.unpack(dest)
.context("Unpacking TAR")
@@ -64,8 +68,10 @@ impl StagingStore {
})
.context("Concatenating the output bytestream")?
.into_iter()
+ .inspect(|p| trace!("Trying to load into staging store: {}", p.display()))
.map(|path| {
self.0.load_from_path(&path)
+ .inspect(|r| trace!("Loading from path = {:?}", r))
.with_context(|| anyhow!("Loading from path: {}", path.display()))
.map_err(Error::from)
.map(|art| art.path().clone())
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e511669..5fee202 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -96,6 +96,7 @@ impl Orchestrator {
unordered_results.push(async move {
let r = jobhandle.get_result().await;
+ trace!("Found result in job {}: {:?}", job_id, r);
bar.tick();
r
});
@@ -162,7 +163,7 @@ impl Orchestrator {
.read()
.map_err(|_| anyhow!("Lock Poisoned"))?;
- trace!("Checking results...");
+ trace!("Checking {} results...", results.len());
for path in results.iter() {
trace!("Checking path: {}", path.display());
if !staging_store_lock.path_exists_in_store_root(&path) {