From 223f6dffee378648ecb3c5c8502fb9726bb454b9 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Mon, 8 Feb 2021 09:27:20 +0100 Subject: Fix: Cleanup progressbar in Drop impl for JobTask The comment in the code describes the change well enough. Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) (limited to 'src/orchestrator/orchestrator.rs') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index f5dd8cb..11233fe 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -386,6 +386,35 @@ struct JobTask<'a> { sender: Vec>, } + +/// Implement Drop to close the progress bar +/// +/// This implementation is a bit of a hack. +/// Because all `JobTask`s are `JobTask::run()` in parallel, but there is no IPC _between_ the +/// tasks (there is IPC between childs and parents, but not between all JobTask objects), we never +/// know whether any other task errored when the JobTask object is destructed. +/// +/// One way to implement this would be to add multi-cast IPC between all `JobTask` objects, with some +/// BUS like structure where all `JobTask`s can send messages to and listen to. +/// But that's non-trivial and a lot of overhead, of course. +/// +/// The trick here is, that the progressbar is either finished when `drop()` is called, which means +/// that the `JobTask` is dropped because it finished, +/// or the progressbar is not finished yet, which means that the `JobTask` is dropped because the +/// runtime stops running it because some other `JobTask` errored. +/// +/// In the latter case, we cleanup by telling the progressbar to finish. +impl<'a> Drop for JobTask<'a> { + fn drop(&mut self) { + if !self.bar.is_finished() { + self.bar.finish_with_message(&format!("[{} {} {}] Stopped, error on other task", + self.jobdef.job.uuid(), + self.jobdef.job.package().name(), + self.jobdef.job.package().version())); + } + } +} + impl<'a> JobTask<'a> { /// Run the job @@ -492,7 +521,7 @@ impl<'a> JobTask<'a> { let job_uuid = *self.jobdef.job.uuid(); // Schedule the job on the scheduler - match self.scheduler.schedule_job(runnable, self.bar).await?.run().await { + match self.scheduler.schedule_job(runnable, self.bar.clone()).await?.run().await { // if the scheduler run reports an error, // that is an error from the actual execution of the job ... Err(e) => { @@ -511,7 +540,7 @@ impl<'a> JobTask<'a> { Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); - for s in self.sender { + for s in self.sender.iter() { s.send(Ok(received_dependencies.clone())).await?; } }, -- cgit v1.2.3 From 32a2a808188ab06032932befc17d4711492ff8d1 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Mon, 8 Feb 2021 09:28:16 +0100 Subject: Fix: Make sure to finish the progress bar before returning Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/orchestrator/orchestrator.rs') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 11233fe..4d47ddb 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -471,6 +471,10 @@ impl<'a> JobTask<'a> { self.sender[0].send(Err(received_errors)).await; // ... and stop operation, because the whole tree will fail anyways. + self.bar.finish_with_message(&format!("[{} {} {}] Stopping, errors from child received", + self.jobdef.job.uuid(), + self.jobdef.job.package().name(), + self.jobdef.job.package().version())); return Ok(()) } } -- cgit v1.2.3 From f69289d1a076ac85fb50ee722b7581514e13dcf2 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Mon, 8 Feb 2021 09:28:32 +0100 Subject: Remove code that didnt do anything either way Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 14 -------------- 1 file changed, 14 deletions(-) (limited to 'src/orchestrator/orchestrator.rs') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 4d47ddb..b5d1609 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -479,20 +479,6 @@ impl<'a> JobTask<'a> { } } - // receive items until the channel is empty. - // - // In the above loop, it could happen that we have all dependencies to run, but there is - // another job that reports artifacts. - // We need to collect them, too. - // - // This is techically not possible, because in a tree, we need all results from all childs. - // It just feels better having this in place as well. - // - // Sorry, not sorry. - while self.perform_receive(&mut received_dependencies, &mut received_errors).await? { - ; - } - // Map the list of received dependencies from // Vec<(Uuid, Vec)> // to -- cgit v1.2.3