diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-02-08 09:27:20 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-08 09:37:18 +0100 |
commit | 223f6dffee378648ecb3c5c8502fb9726bb454b9 (patch) | |
tree | 792babf442ca8c656c3f445b49dfd8184dd976c3 /src/orchestrator | |
parent | 2d82860bbd867a328fa1ab21e77705439ba4636b (diff) |
Fix: Cleanup progressbar in Drop impl for JobTask
The comment in the code describes the change well enough.
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index f5dd8cb..11233fe 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -386,6 +386,35 @@ struct JobTask<'a> { sender: Vec<Sender<JobResult>>, } + +/// Implement Drop to close the progress bar +/// +/// This implementation is a bit of a hack. +/// Because all `JobTask`s are `JobTask::run()` in parallel, but there is no IPC _between_ the +/// tasks (there is IPC between childs and parents, but not between all JobTask objects), we never +/// know whether any other task errored when the JobTask object is destructed. +/// +/// One way to implement this would be to add multi-cast IPC between all `JobTask` objects, with some +/// BUS like structure where all `JobTask`s can send messages to and listen to. +/// But that's non-trivial and a lot of overhead, of course. +/// +/// The trick here is, that the progressbar is either finished when `drop()` is called, which means +/// that the `JobTask` is dropped because it finished, +/// or the progressbar is not finished yet, which means that the `JobTask` is dropped because the +/// runtime stops running it because some other `JobTask` errored. +/// +/// In the latter case, we cleanup by telling the progressbar to finish. +impl<'a> Drop for JobTask<'a> { + fn drop(&mut self) { + if !self.bar.is_finished() { + self.bar.finish_with_message(&format!("[{} {} {}] Stopped, error on other task", + self.jobdef.job.uuid(), + self.jobdef.job.package().name(), + self.jobdef.job.package().version())); + } + } +} + impl<'a> JobTask<'a> { /// Run the job @@ -492,7 +521,7 @@ impl<'a> JobTask<'a> { let job_uuid = *self.jobdef.job.uuid(); // Schedule the job on the scheduler - match self.scheduler.schedule_job(runnable, self.bar).await?.run().await { + match self.scheduler.schedule_job(runnable, self.bar.clone()).await?.run().await { // if the scheduler run reports an error, // that is an error from the actual execution of the job ... Err(e) => { @@ -511,7 +540,7 @@ impl<'a> JobTask<'a> { Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); - for s in self.sender { + for s in self.sender.iter() { s.send(Ok(received_dependencies.clone())).await?; } }, |