diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-01-25 15:37:28 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-01-25 15:43:00 +0100 |
commit | fffac362dd4e866a92e788ef7a2945f78adb8520 (patch) | |
tree | cd43c39168c6202b4950a23d3e356165b471b084 /src/endpoint/scheduler.rs | |
parent | b7d8aaa8d96e7059f6cf3dd23f5b894752a30cc5 (diff) | |
parent | 53360d499e2233b30b1aaf745800cd7199009153 (diff) |
Merge branch 'more-parallelism' into master
Diffstat (limited to 'src/endpoint/scheduler.rs')
-rw-r--r-- | src/endpoint/scheduler.rs | 42 |
1 files changed, 26 insertions, 16 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 93b7509..c2f35fc 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -29,6 +29,7 @@ use uuid::Uuid; use crate::db::models as dbmodels; use crate::endpoint::Endpoint; use crate::endpoint::EndpointConfiguration; +use crate::filestore::Artifact; use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; @@ -145,7 +146,7 @@ impl std::fmt::Debug for JobHandle { } impl JobHandle { - pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> { + pub async fn run(self) -> Result<Vec<Artifact>> { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); let ep = self.endpoint.read().await; let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; @@ -241,9 +242,18 @@ impl JobHandle { // Have to do it the ugly way here because of borrowing semantics let mut r = vec![]; + let staging_store_lock = self.staging_store.read().await; for p in paths.iter() { + use std::ops::Deref; trace!("DB: Creating artifact entry for path: {}", p.display()); - r.push(dbmodels::Artifact::create(&self.db, p, &job)?); + let _ = dbmodels::Artifact::create(&self.db, p, &job)?; + r.push({ + staging_store_lock + .deref() + .get(p) + .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))? + .clone() + }); } Ok(r) } @@ -336,30 +346,30 @@ impl<'a> LogReceiver<'a> { trace!("Setting bar to {}", u as u64); self.bar.set_position(u as u64); self.bar.set_message(&format!( - "Job ({} {}): {} running...", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: running...", + self.job_id, self.package_name, self.package_version )); } LogItem::CurrentPhase(ref phasename) => { trace!("Setting bar phase to {}", phasename); self.bar.set_message(&format!( - "Job ({} {}): {} Phase: {}", - self.package_name, self.package_version, self.job_id, phasename + "[{} {} {}]: Phase: {}", + self.job_id, self.package_name, self.package_version, phasename )); } LogItem::State(Ok(())) => { trace!("Setting bar state to Ok"); self.bar.set_message(&format!( - "Job ({} {}): {} State Ok", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: State Ok", + self.job_id, self.package_name, self.package_version )); success = Some(true); } LogItem::State(Err(ref e)) => { trace!("Setting bar state to Err: {}", e); self.bar.set_message(&format!( - "Job ({} {}): {} State Err: {}", - self.package_name, self.package_version, self.job_id, e + "[{} {} {}]: State Err: {}", + self.job_id, self.package_name, self.package_version, e )); success = Some(false); } @@ -370,16 +380,16 @@ impl<'a> LogReceiver<'a> { trace!("Finishing bar = {:?}", success); let finish_msg = match success { Some(true) => format!( - "Job ({} {}): {} finished successfully", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished successfully", + self.job_id, self.package_name, self.package_version ), Some(false) => format!( - "Job ({} {}): {} finished with error", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished with error", + self.job_id, self.package_name, self.package_version ), None => format!( - "Job ({} {}): {} finished", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished", + self.job_id, self.package_name, self.package_version ), }; self.bar.finish_with_message(&finish_msg); |