diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-13 09:46:14 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-13 09:49:17 +0100 |
commit | 3c0a0f37ccf838b59118407b18a11c071e31ff0b (patch) | |
tree | bb49e16237a93b0b247b74485c96295712a41f4c /src | |
parent | 332fb52189f1febf4f076305b55af43f37f23b6b (diff) |
Make JobHandle create database entry for Job
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r-- | src/endpoint/scheduler.rs | 36 |
1 files changed, 26 insertions, 10 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 4d198d4..6bbb438 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -123,11 +123,16 @@ impl std::fmt::Debug for JobHandle { impl JobHandle { pub async fn run(self) -> Result<Vec<PathBuf>> { + use crate::db::models as dbmodels; let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); let ep = self.endpoint .read() .map_err(|_| anyhow!("Lock poisoned"))?; + let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; + let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?; + let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?; + let job_id = self.job.uuid().clone(); trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); let res = ep @@ -137,16 +142,16 @@ impl JobHandle { job_id, log_receiver, bar: self.bar, - db: self.db, + db: self.db.clone(), }.join(); let (res, logres) = tokio::join!(res, logres); trace!("Found result for job {}: {:?}", job_id, res); - logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?; - let (paths, container_hash) = res.with_context(|| anyhow!("Running job on '{}'", ep.name()))?; - + let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?; + let (paths, container_hash, script) = res.with_context(|| anyhow!("Running job on '{}'", ep.name()))?; + dbmodels::Job::create(&self.db, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?; Ok(paths) } @@ -160,28 +165,33 @@ struct LogReceiver { } impl LogReceiver { - async fn join(mut self) -> Result<()> { + async fn join(mut self) -> Result<String> { + use resiter::Map; + let mut success = None; + let mut accu = vec![]; + while let Some(logitem) = self.log_receiver.recv().await { match logitem { - LogItem::Line(_) => { + LogItem::Line(ref l) => { // ignore }, LogItem::Progress(u) => { self.bar.set_position(u as u64); }, - LogItem::CurrentPhase(phasename) => { + LogItem::CurrentPhase(ref phasename) => { self.bar.set_message(&format!("{} Phase: {}", self.job_id, phasename)); }, - LogItem::State(Ok(s)) => { + LogItem::State(Ok(ref s)) => { self.bar.set_message(&format!("{} State Ok: {}", self.job_id, s)); success = Some(true); }, - LogItem::State(Err(e)) => { + LogItem::State(Err(ref e)) => { self.bar.set_message(&format!("{} State Err: {}", self.job_id, e)); success = Some(false); }, } + accu.push(logitem); } match success { @@ -190,7 +200,13 @@ impl LogReceiver { None => self.bar.finish_with_message(&format!("{} finished", self.job_id)), } - Ok(()) + Ok({ + accu.into_iter() + .map(|ll| ll.display()) + .map_ok(|d| d.to_string()) + .collect::<Result<Vec<String>>>()? + .join("\n") + }) } } |