summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/db/models/submit.rs6
-rw-r--r--src/endpoint/configured.rs18
-rw-r--r--src/endpoint/scheduler.rs10
-rw-r--r--src/main.rs22
-rw-r--r--src/orchestrator/orchestrator.rs9
-rw-r--r--src/package/tree.rs6
6 files changed, 55 insertions, 16 deletions
diff --git a/src/db/models/submit.rs b/src/db/models/submit.rs
index b8aed64..37f5189 100644
--- a/src/db/models/submit.rs
+++ b/src/db/models/submit.rs
@@ -1,6 +1,8 @@
use std::ops::Deref;
+use anyhow::anyhow;
use anyhow::Error;
+use anyhow::Context;
use anyhow::Result;
use diesel::PgConnection;
use diesel::prelude::*;
@@ -63,11 +65,13 @@ impl Submit {
diesel::insert_into(submits::table)
.values(&new_submit)
.on_conflict_do_nothing()
- .execute(database_connection)?;
+ .execute(database_connection)
+ .context("Inserting new submit into submits table")?;
dsl::submits
.filter(uuid.eq(uuid))
.first::<Submit>(database_connection)
+ .context("Loading submit")
.map_err(Error::from)
}
}
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 296d369..6b0659a 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -96,7 +96,10 @@ impl Endpoint {
match req {
None => Ok(()),
Some(v) => {
- let avail = ep.docker().version().await?;
+ let avail = ep.docker()
+ .version()
+ .await
+ .with_context(|| anyhow!("Getting version of endpoint: {}", ep.name))?;
if !v.contains(&avail.version) {
Err(anyhow!("Incompatible docker version on endpoint {}: {}",
@@ -112,7 +115,10 @@ impl Endpoint {
match req {
None => Ok(()),
Some(v) => {
- let avail = ep.docker().version().await?;
+ let avail = ep.docker()
+ .version()
+ .await
+ .with_context(|| anyhow!("Getting API version of endpoint: {}", ep.name))?;
if !v.contains(&avail.api_version) {
Err(anyhow!("Incompatible docker API version on endpoint {}: {}",
@@ -177,7 +183,8 @@ impl Endpoint {
let create_info = self.docker
.containers()
.create(&builder_opts)
- .await?;
+ .await
+ .with_context(|| anyhow!("Creating container on '{}'", self.name))?;
if let Some(warnings) = create_info.warnings.as_ref() {
for warning in warnings {
@@ -219,7 +226,8 @@ impl Endpoint {
.map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name)))
.await?;
- let tar_stream = container.copy_from(&PathBuf::from("/outputs/"))
+ let tar_stream = container
+ .copy_from(&PathBuf::from("/outputs/"))
.map(|item| item.map_err(Error::from));
staging
@@ -227,6 +235,8 @@ impl Endpoint {
.map_err(|_| anyhow!("Lock poisoned"))?
.write_files_from_tar_stream(tar_stream)
.await
+ .with_context(|| anyhow!("Copying the TAR stream to the staging store"))
+ .map_err(Error::from)
}
pub async fn number_of_running_containers(&self) -> Result<usize> {
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 43092eb..01033e2 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -4,6 +4,7 @@ use std::sync::RwLock;
use std::path::PathBuf;
use anyhow::anyhow;
+use anyhow::Context;
use anyhow::Result;
use tokio::stream::StreamExt;
use tokio::sync::mpsc::UnboundedSender;
@@ -102,11 +103,14 @@ pub struct JobHandle {
impl JobHandle {
pub async fn get_result(self) -> Result<Vec<PathBuf>> {
- let res = self.endpoint
+ let ep = self.endpoint
.read()
- .map_err(|_| anyhow!("Lock poisoned"))?
+ .map_err(|_| anyhow!("Lock poisoned"))?;
+
+ let res = ep
.run_job(self.job, self.sender, self.staging_store)
- .await?;
+ .await
+ .with_context(|| anyhow!("Running job on '{}'", ep.name()))?;
Ok(res)
}
diff --git a/src/main.rs b/src/main.rs
index 91b58e3..3966601 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -139,7 +139,9 @@ async fn build<'a>(matches: &ArgMatches,
info!("Submit {}, started {}", submit_id, now);
let image_name = matches.value_of("image").map(String::from).map(ImageName::from).unwrap(); // safe by clap
+ debug!("Getting repository HEAD");
let hash_str = crate::util::git::get_repo_head_commit_hash(repo_path)?;
+ trace!("Repository HEAD = {}", hash_str);
let phases = config.available_phases();
let endpoint_configurations = config.docker().endpoints()
@@ -154,6 +156,7 @@ async fn build<'a>(matches: &ArgMatches,
.build()
})
.collect();
+ info!("Endpoint config build");
let pname = matches.value_of("package_name")
.map(String::from)
@@ -163,6 +166,7 @@ async fn build<'a>(matches: &ArgMatches,
let pvers = matches.value_of("package_version")
.map(String::from)
.map(PackageVersion::from);
+ info!("We want {} ({:?})", pname, pvers);
let packages = if let Some(pvers) = pvers {
repo.find(&pname, &pvers)
@@ -211,11 +215,12 @@ async fn build<'a>(matches: &ArgMatches,
Ok(tree) as Result<Tree>
};
+ trace!("Setting up database jobs for Package, GitHash, Image");
let db_package = async { Package::create_or_fetch(&database_connection, &package) };
let db_githash = async { GitHash::create_or_fetch(&database_connection, &hash_str) };
let db_image = async { Image::create_or_fetch(&database_connection, &image_name) };
-
+ trace!("Running database jobs for Package, GitHash, Image");
let (tree, db_package, db_githash, db_image) = tokio::join!(
tree,
db_package,
@@ -226,6 +231,8 @@ async fn build<'a>(matches: &ArgMatches,
let (tree, db_package, db_githash, db_image) =
(tree?, db_package?, db_githash?, db_image?);
+ trace!("Database jobs for Package, GitHash, Image finished successfully");
+ trace!("Creating Submit in database");
let submit = Submit::create(&database_connection,
&tree,
&now,
@@ -233,10 +240,14 @@ async fn build<'a>(matches: &ArgMatches,
&db_image,
&db_package,
&db_githash)?;
+ trace!("Creating Submit in database finished successfully");
+ trace!("Setting up job sets");
let jobsets = JobSet::sets_from_tree(tree, image_name, phases.clone())?;
+ trace!("Setting up job sets finished successfully");
- OrchestratorSetup::builder()
+ trace!("Setting up Orchestrator");
+ let orch = OrchestratorSetup::builder()
.endpoint_config(endpoint_configurations)
.staging_store(staging_dir.await?)
.release_store(release_dir.await?)
@@ -246,9 +257,10 @@ async fn build<'a>(matches: &ArgMatches,
.jobsets(jobsets)
.build()
.setup()
- .await?
- .run()
- .await
+ .await?;
+
+ info!("Running orchestrator...");
+ orch.run().await
}
async fn what_depends(matches: &ArgMatches, repo: Repository, progress: ProgressBar) -> Result<()> {
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index c53a7db..a2eb7e5 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -74,9 +74,12 @@ impl Orchestrator {
let results = { // run the jobs in the set
let unordered = futures::stream::FuturesUnordered::new();
for runnable in jobset.into_runables(&merged_store) {
+ let runnable = runnable?;
+ trace!("Runnable {} for package {}", runnable.uuid(), runnable.package().name());
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
- let jobhandle = self.scheduler.schedule_job(runnable?, sender).await?;
+ let jobhandle = self.scheduler.schedule_job(runnable, sender).await?;
+ trace!("Jobhandle -> {:?}", jobhandle);
unordered.push(async move {
jobhandle.get_result().await
});
@@ -93,7 +96,9 @@ impl Orchestrator {
.read()
.map_err(|_| anyhow!("Lock Poisoned"))?;
+ trace!("Checking results...");
for path in results.iter() {
+ trace!("Checking path: {}", path.display());
if !staging_store_lock.path_exists_in_store_root(&path) {
return Err(anyhow!("Result path {} is missing from staging store", path.display()))
.with_context(|| anyhow!("Should be: {}/{}", staging_store_lock.root_path().display(), path.display()))
@@ -107,7 +112,9 @@ impl Orchestrator {
.write()
.map_err(|_| anyhow!("Lock Poisoned"))?;
+ trace!("Loading results into staging store");
for path in results.iter() {
+ trace!("Loading path: {}", path.display());
staging_store_lock.load_from_path(&path)
.context("Loading artifacts into staging store")?;
}
diff --git a/src/package/tree.rs b/src/package/tree.rs
index 94d726b..1831a65 100644
--- a/src/package/tree.rs
+++ b/src/package/tree.rs
@@ -49,7 +49,7 @@ impl Tree {
})
.collect::<Result<Vec<()>>>()?;
- trace!("Inserting subtree: {:?}", subtree);
+ trace!("Inserting subtree: {:?} -> {:?}", ($pack), subtree);
($this).root.insert(($pack), subtree);
Ok(())
}}
@@ -60,7 +60,9 @@ impl Tree {
}
trace!("Making package Tree for {:?}", p);
- mk_add_package_tree!(self, p, repo, self, progress)
+ let r = mk_add_package_tree!(self, p, repo, self, progress);
+ trace!("Finished makeing package Tree");
+ r
}
/// Get packages of the tree