summaryrefslogtreecommitdiffstats
path: root/src/endpoint/scheduler.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-01-25 15:37:28 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-01-25 15:43:00 +0100
commitfffac362dd4e866a92e788ef7a2945f78adb8520 (patch)
treecd43c39168c6202b4950a23d3e356165b471b084 /src/endpoint/scheduler.rs
parentb7d8aaa8d96e7059f6cf3dd23f5b894752a30cc5 (diff)
parent53360d499e2233b30b1aaf745800cd7199009153 (diff)
Merge branch 'more-parallelism' into master
Diffstat (limited to 'src/endpoint/scheduler.rs')
-rw-r--r--src/endpoint/scheduler.rs42
1 files changed, 26 insertions, 16 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 93b7509..c2f35fc 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -29,6 +29,7 @@ use uuid::Uuid;
use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
+use crate::filestore::Artifact;
use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::RunnableJob;
@@ -145,7 +146,7 @@ impl std::fmt::Debug for JobHandle {
}
impl JobHandle {
- pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> {
+ pub async fn run(self) -> Result<Vec<Artifact>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let ep = self.endpoint.read().await;
let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
@@ -241,9 +242,18 @@ impl JobHandle {
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];
+ let staging_store_lock = self.staging_store.read().await;
for p in paths.iter() {
+ use std::ops::Deref;
trace!("DB: Creating artifact entry for path: {}", p.display());
- r.push(dbmodels::Artifact::create(&self.db, p, &job)?);
+ let _ = dbmodels::Artifact::create(&self.db, p, &job)?;
+ r.push({
+ staging_store_lock
+ .deref()
+ .get(p)
+ .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))?
+ .clone()
+ });
}
Ok(r)
}
@@ -336,30 +346,30 @@ impl<'a> LogReceiver<'a> {
trace!("Setting bar to {}", u as u64);
self.bar.set_position(u as u64);
self.bar.set_message(&format!(
- "Job ({} {}): {} running...",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: running...",
+ self.job_id, self.package_name, self.package_version
));
}
LogItem::CurrentPhase(ref phasename) => {
trace!("Setting bar phase to {}", phasename);
self.bar.set_message(&format!(
- "Job ({} {}): {} Phase: {}",
- self.package_name, self.package_version, self.job_id, phasename
+ "[{} {} {}]: Phase: {}",
+ self.job_id, self.package_name, self.package_version, phasename
));
}
LogItem::State(Ok(())) => {
trace!("Setting bar state to Ok");
self.bar.set_message(&format!(
- "Job ({} {}): {} State Ok",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: State Ok",
+ self.job_id, self.package_name, self.package_version
));
success = Some(true);
}
LogItem::State(Err(ref e)) => {
trace!("Setting bar state to Err: {}", e);
self.bar.set_message(&format!(
- "Job ({} {}): {} State Err: {}",
- self.package_name, self.package_version, self.job_id, e
+ "[{} {} {}]: State Err: {}",
+ self.job_id, self.package_name, self.package_version, e
));
success = Some(false);
}
@@ -370,16 +380,16 @@ impl<'a> LogReceiver<'a> {
trace!("Finishing bar = {:?}", success);
let finish_msg = match success {
Some(true) => format!(
- "Job ({} {}): {} finished successfully",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: finished successfully",
+ self.job_id, self.package_name, self.package_version
),
Some(false) => format!(
- "Job ({} {}): {} finished with error",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: finished with error",
+ self.job_id, self.package_name, self.package_version
),
None => format!(
- "Job ({} {}): {} finished",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: finished",
+ self.job_id, self.package_name, self.package_version
),
};
self.bar.finish_with_message(&finish_msg);