summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-02-08 09:27:20 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-08 09:37:18 +0100
commit223f6dffee378648ecb3c5c8502fb9726bb454b9 (patch)
tree792babf442ca8c656c3f445b49dfd8184dd976c3 /src/orchestrator/orchestrator.rs
parent2d82860bbd867a328fa1ab21e77705439ba4636b (diff)
Fix: Cleanup progressbar in Drop impl for JobTask
The comment in the code describes the change well enough. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs33
1 files changed, 31 insertions, 2 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index f5dd8cb..11233fe 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -386,6 +386,35 @@ struct JobTask<'a> {
sender: Vec<Sender<JobResult>>,
}
+
+/// Implement Drop to close the progress bar
+///
+/// This implementation is a bit of a hack.
+/// Because all `JobTask`s are `JobTask::run()` in parallel, but there is no IPC _between_ the
+/// tasks (there is IPC between childs and parents, but not between all JobTask objects), we never
+/// know whether any other task errored when the JobTask object is destructed.
+///
+/// One way to implement this would be to add multi-cast IPC between all `JobTask` objects, with some
+/// BUS like structure where all `JobTask`s can send messages to and listen to.
+/// But that's non-trivial and a lot of overhead, of course.
+///
+/// The trick here is, that the progressbar is either finished when `drop()` is called, which means
+/// that the `JobTask` is dropped because it finished,
+/// or the progressbar is not finished yet, which means that the `JobTask` is dropped because the
+/// runtime stops running it because some other `JobTask` errored.
+///
+/// In the latter case, we cleanup by telling the progressbar to finish.
+impl<'a> Drop for JobTask<'a> {
+ fn drop(&mut self) {
+ if !self.bar.is_finished() {
+ self.bar.finish_with_message(&format!("[{} {} {}] Stopped, error on other task",
+ self.jobdef.job.uuid(),
+ self.jobdef.job.package().name(),
+ self.jobdef.job.package().version()));
+ }
+ }
+}
+
impl<'a> JobTask<'a> {
/// Run the job
@@ -492,7 +521,7 @@ impl<'a> JobTask<'a> {
let job_uuid = *self.jobdef.job.uuid();
// Schedule the job on the scheduler
- match self.scheduler.schedule_job(runnable, self.bar).await?.run().await {
+ match self.scheduler.schedule_job(runnable, self.bar.clone()).await?.run().await {
// if the scheduler run reports an error,
// that is an error from the actual execution of the job ...
Err(e) => {
@@ -511,7 +540,7 @@ impl<'a> JobTask<'a> {
Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
- for s in self.sender {
+ for s in self.sender.iter() {
s.send(Ok(received_dependencies.clone())).await?;
}
},