diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-06 13:25:27 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-06 13:28:35 +0100 |
commit | 16fc9a21748af695e2ea44e823bef1dc34996d3f (patch) | |
tree | 5e09dd7da73522ab40f8c2d387946f21d0a81301 | |
parent | 6605f88bd7b87dfb91dedf6d4f12eadd9a19e6c9 (diff) |
Add logging output and error context messages
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | src/db/models/submit.rs | 6 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 18 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 10 | ||||
-rw-r--r-- | src/main.rs | 22 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 9 | ||||
-rw-r--r-- | src/package/tree.rs | 6 |
6 files changed, 55 insertions(+), 16 deletions(-)
diff --git a/src/db/models/submit.rs b/src/db/models/submit.rs index b8aed64..37f5189 100644 --- a/src/db/models/submit.rs +++ b/src/db/models/submit.rs @@ -1,6 +1,8 @@ use std::ops::Deref; +use anyhow::anyhow; use anyhow::Error; +use anyhow::Context; use anyhow::Result; use diesel::PgConnection; use diesel::prelude::*; @@ -63,11 +65,13 @@ impl Submit { diesel::insert_into(submits::table) .values(&new_submit) .on_conflict_do_nothing() - .execute(database_connection)?; + .execute(database_connection) + .context("Inserting new submit into submits table")?; dsl::submits .filter(uuid.eq(uuid)) .first::<Submit>(database_connection) + .context("Loading submit") .map_err(Error::from) } } diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 296d369..6b0659a 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -96,7 +96,10 @@ impl Endpoint { match req { None => Ok(()), Some(v) => { - let avail = ep.docker().version().await?; + let avail = ep.docker() + .version() + .await + .with_context(|| anyhow!("Getting version of endpoint: {}", ep.name))?; if !v.contains(&avail.version) { Err(anyhow!("Incompatible docker version on endpoint {}: {}", @@ -112,7 +115,10 @@ impl Endpoint { match req { None => Ok(()), Some(v) => { - let avail = ep.docker().version().await?; + let avail = ep.docker() + .version() + .await + .with_context(|| anyhow!("Getting API version of endpoint: {}", ep.name))?; if !v.contains(&avail.api_version) { Err(anyhow!("Incompatible docker API version on endpoint {}: {}", @@ -177,7 +183,8 @@ impl Endpoint { let create_info = self.docker .containers() .create(&builder_opts) - .await?; + .await + .with_context(|| anyhow!("Creating container on '{}'", self.name))?; if let Some(warnings) = create_info.warnings.as_ref() { for warning in warnings { @@ -219,7 +226,8 @@ impl Endpoint { .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name))) .await?; - let tar_stream = 
container.copy_from(&PathBuf::from("/outputs/")) + let tar_stream = container + .copy_from(&PathBuf::from("/outputs/")) .map(|item| item.map_err(Error::from)); staging @@ -227,6 +235,8 @@ impl Endpoint { .map_err(|_| anyhow!("Lock poisoned"))? .write_files_from_tar_stream(tar_stream) .await + .with_context(|| anyhow!("Copying the TAR stream to the staging store")) + .map_err(Error::from) } pub async fn number_of_running_containers(&self) -> Result<usize> { diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 43092eb..01033e2 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -4,6 +4,7 @@ use std::sync::RwLock; use std::path::PathBuf; use anyhow::anyhow; +use anyhow::Context; use anyhow::Result; use tokio::stream::StreamExt; use tokio::sync::mpsc::UnboundedSender; @@ -102,11 +103,14 @@ pub struct JobHandle { impl JobHandle { pub async fn get_result(self) -> Result<Vec<PathBuf>> { - let res = self.endpoint + let ep = self.endpoint .read() - .map_err(|_| anyhow!("Lock poisoned"))? 
+ .map_err(|_| anyhow!("Lock poisoned"))?; + + let res = ep .run_job(self.job, self.sender, self.staging_store) - .await?; + .await + .with_context(|| anyhow!("Running job on '{}'", ep.name()))?; Ok(res) } diff --git a/src/main.rs b/src/main.rs index 91b58e3..3966601 100644 --- a/src/main.rs +++ b/src/main.rs @@ -139,7 +139,9 @@ async fn build<'a>(matches: &ArgMatches, info!("Submit {}, started {}", submit_id, now); let image_name = matches.value_of("image").map(String::from).map(ImageName::from).unwrap(); // safe by clap + debug!("Getting repository HEAD"); let hash_str = crate::util::git::get_repo_head_commit_hash(repo_path)?; + trace!("Repository HEAD = {}", hash_str); let phases = config.available_phases(); let endpoint_configurations = config.docker().endpoints() @@ -154,6 +156,7 @@ async fn build<'a>(matches: &ArgMatches, .build() }) .collect(); + info!("Endpoint config build"); let pname = matches.value_of("package_name") .map(String::from) @@ -163,6 +166,7 @@ async fn build<'a>(matches: &ArgMatches, let pvers = matches.value_of("package_version") .map(String::from) .map(PackageVersion::from); + info!("We want {} ({:?})", pname, pvers); let packages = if let Some(pvers) = pvers { repo.find(&pname, &pvers) @@ -211,11 +215,12 @@ async fn build<'a>(matches: &ArgMatches, Ok(tree) as Result<Tree> }; + trace!("Setting up database jobs for Package, GitHash, Image"); let db_package = async { Package::create_or_fetch(&database_connection, &package) }; let db_githash = async { GitHash::create_or_fetch(&database_connection, &hash_str) }; let db_image = async { Image::create_or_fetch(&database_connection, &image_name) }; - + trace!("Running database jobs for Package, GitHash, Image"); let (tree, db_package, db_githash, db_image) = tokio::join!( tree, db_package, @@ -226,6 +231,8 @@ async fn build<'a>(matches: &ArgMatches, let (tree, db_package, db_githash, db_image) = (tree?, db_package?, db_githash?, db_image?); + trace!("Database jobs for Package, GitHash, Image 
finished successfully"); + trace!("Creating Submit in database"); let submit = Submit::create(&database_connection, &tree, &now, @@ -233,10 +240,14 @@ async fn build<'a>(matches: &ArgMatches, &db_image, &db_package, &db_githash)?; + trace!("Creating Submit in database finished successfully"); + trace!("Setting up job sets"); let jobsets = JobSet::sets_from_tree(tree, image_name, phases.clone())?; + trace!("Setting up job sets finished successfully"); - OrchestratorSetup::builder() + trace!("Setting up Orchestrator"); + let orch = OrchestratorSetup::builder() .endpoint_config(endpoint_configurations) .staging_store(staging_dir.await?) .release_store(release_dir.await?) @@ -246,9 +257,10 @@ async fn build<'a>(matches: &ArgMatches, .jobsets(jobsets) .build() .setup() - .await? - .run() - .await + .await?; + + info!("Running orchestrator..."); + orch.run().await } async fn what_depends(matches: &ArgMatches, repo: Repository, progress: ProgressBar) -> Result<()> { diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index c53a7db..a2eb7e5 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -74,9 +74,12 @@ impl Orchestrator { let results = { // run the jobs in the set let unordered = futures::stream::FuturesUnordered::new(); for runnable in jobset.into_runables(&merged_store) { + let runnable = runnable?; + trace!("Runnable {} for package {}", runnable.uuid(), runnable.package().name()); let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); - let jobhandle = self.scheduler.schedule_job(runnable?, sender).await?; + let jobhandle = self.scheduler.schedule_job(runnable, sender).await?; + trace!("Jobhandle -> {:?}", jobhandle); unordered.push(async move { jobhandle.get_result().await }); @@ -93,7 +96,9 @@ impl Orchestrator { .read() .map_err(|_| anyhow!("Lock Poisoned"))?; + trace!("Checking results..."); for path in results.iter() { + trace!("Checking path: {}", path.display()); if 
!staging_store_lock.path_exists_in_store_root(&path) { return Err(anyhow!("Result path {} is missing from staging store", path.display())) .with_context(|| anyhow!("Should be: {}/{}", staging_store_lock.root_path().display(), path.display())) @@ -107,7 +112,9 @@ impl Orchestrator { .write() .map_err(|_| anyhow!("Lock Poisoned"))?; + trace!("Loading results into staging store"); for path in results.iter() { + trace!("Loading path: {}", path.display()); staging_store_lock.load_from_path(&path) .context("Loading artifacts into staging store")?; } diff --git a/src/package/tree.rs b/src/package/tree.rs index 94d726b..1831a65 100644 --- a/src/package/tree.rs +++ b/src/package/tree.rs @@ -49,7 +49,7 @@ impl Tree { }) .collect::<Result<Vec<()>>>()?; - trace!("Inserting subtree: {:?}", subtree); + trace!("Inserting subtree: {:?} -> {:?}", ($pack), subtree); ($this).root.insert(($pack), subtree); Ok(()) }} @@ -60,7 +60,9 @@ impl Tree { } trace!("Making package Tree for {:?}", p); - mk_add_package_tree!(self, p, repo, self, progress) + let r = mk_add_package_tree!(self, p, repo, self, progress); + trace!("Finished makeing package Tree"); + r } /// Get packages of the tree |