diff options
Diffstat (limited to 'src/endpoint/scheduler.rs')
-rw-r--r-- | src/endpoint/scheduler.rs | 42 |
1 files changed, 26 insertions, 16 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 93b7509..c2f35fc 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -29,6 +29,7 @@ use uuid::Uuid; use crate::db::models as dbmodels; use crate::endpoint::Endpoint; use crate::endpoint::EndpointConfiguration; +use crate::filestore::Artifact; use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; @@ -145,7 +146,7 @@ impl std::fmt::Debug for JobHandle { } impl JobHandle { - pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> { + pub async fn run(self) -> Result<Vec<Artifact>> { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); let ep = self.endpoint.read().await; let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; @@ -241,9 +242,18 @@ impl JobHandle { // Have to do it the ugly way here because of borrowing semantics let mut r = vec![]; + let staging_store_lock = self.staging_store.read().await; for p in paths.iter() { + use std::ops::Deref; trace!("DB: Creating artifact entry for path: {}", p.display()); - r.push(dbmodels::Artifact::create(&self.db, p, &job)?); + let _ = dbmodels::Artifact::create(&self.db, p, &job)?; + r.push({ + staging_store_lock + .deref() + .get(p) + .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))? + .clone() + }); } Ok(r) } @@ -336,30 +346,30 @@ impl<'a> LogReceiver<'a> { trace!("Setting bar to {}", u as u64); self.bar.set_position(u as u64); self.bar.set_message(&format!( - "Job ({} {}): {} running...", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: running...", + self.job_id, self.package_name, self.package_version )); } LogItem::CurrentPhase(ref phasename) => { trace!("Setting bar phase to {}", phasename); self.bar.set_message(&format!( - "Job ({} {}): {} Phase: {}", - self.package_name, self.package_version, self.job_id, phasename + "[{} {} {}]: Phase: {}", + self.job_id, self.package_name, self.package_version, phasename )); } LogItem::State(Ok(())) => { trace!("Setting bar state to Ok"); self.bar.set_message(&format!( - "Job ({} {}): {} State Ok", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: State Ok", + self.job_id, self.package_name, self.package_version )); success = Some(true); } LogItem::State(Err(ref e)) => { trace!("Setting bar state to Err: {}", e); self.bar.set_message(&format!( - "Job ({} {}): {} State Err: {}", - self.package_name, self.package_version, self.job_id, e + "[{} {} {}]: State Err: {}", + self.job_id, self.package_name, self.package_version, e )); success = Some(false); } @@ -370,16 +380,16 @@ impl<'a> LogReceiver<'a> { trace!("Finishing bar = {:?}", success); let finish_msg = match success { Some(true) => format!( - "Job ({} {}): {} finished successfully", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished successfully", + self.job_id, self.package_name, self.package_version ), Some(false) => format!( - "Job ({} {}): {} finished with error", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished with error", + self.job_id, self.package_name, self.package_version ), None => format!( - "Job ({} {}): {} finished", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished", + self.job_id, self.package_name, self.package_version ), }; self.bar.finish_with_message(&finish_msg); |