summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-13 09:46:14 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-13 09:49:17 +0100
commit3c0a0f37ccf838b59118407b18a11c071e31ff0b (patch)
treebb49e16237a93b0b247b74485c96295712a41f4c /src
parent332fb52189f1febf4f076305b55af43f37f23b6b (diff)
Make JobHandle create database entry for Job
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r--src/endpoint/scheduler.rs36
1 files changed, 26 insertions, 10 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 4d198d4..6bbb438 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -123,11 +123,16 @@ impl std::fmt::Debug for JobHandle {
impl JobHandle {
pub async fn run(self) -> Result<Vec<PathBuf>> {
+ use crate::db::models as dbmodels;
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let ep = self.endpoint
.read()
.map_err(|_| anyhow!("Lock poisoned"))?;
+ let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
+ let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?;
+ let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?;
+
let job_id = self.job.uuid().clone();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
let res = ep
@@ -137,16 +142,16 @@ impl JobHandle {
job_id,
log_receiver,
bar: self.bar,
- db: self.db,
+ db: self.db.clone(),
}.join();
let (res, logres) = tokio::join!(res, logres);
trace!("Found result for job {}: {:?}", job_id, res);
- logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?;
- let (paths, container_hash) = res.with_context(|| anyhow!("Running job on '{}'", ep.name()))?;
-
+ let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?;
+ let (paths, container_hash, script) = res.with_context(|| anyhow!("Running job on '{}'", ep.name()))?;
+ dbmodels::Job::create(&self.db, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?;
Ok(paths)
}
@@ -160,28 +165,33 @@ struct LogReceiver {
}
impl LogReceiver {
- async fn join(mut self) -> Result<()> {
+ async fn join(mut self) -> Result<String> {
+ use resiter::Map;
+
let mut success = None;
+ let mut accu = vec![];
+
while let Some(logitem) = self.log_receiver.recv().await {
match logitem {
- LogItem::Line(_) => {
+ LogItem::Line(ref l) => {
// ignore
},
LogItem::Progress(u) => {
self.bar.set_position(u as u64);
},
- LogItem::CurrentPhase(phasename) => {
+ LogItem::CurrentPhase(ref phasename) => {
self.bar.set_message(&format!("{} Phase: {}", self.job_id, phasename));
},
- LogItem::State(Ok(s)) => {
+ LogItem::State(Ok(ref s)) => {
self.bar.set_message(&format!("{} State Ok: {}", self.job_id, s));
success = Some(true);
},
- LogItem::State(Err(e)) => {
+ LogItem::State(Err(ref e)) => {
self.bar.set_message(&format!("{} State Err: {}", self.job_id, e));
success = Some(false);
},
}
+ accu.push(logitem);
}
match success {
@@ -190,7 +200,13 @@ impl LogReceiver {
None => self.bar.finish_with_message(&format!("{} finished", self.job_id)),
}
- Ok(())
+ Ok({
+ accu.into_iter()
+ .map(|ll| ll.display())
+ .map_ok(|d| d.to_string())
+ .collect::<Result<Vec<String>>>()?
+ .join("\n")
+ })
}
}