diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-02-08 11:06:17 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-08 11:06:17 +0100 |
commit | b90df19e52acf44482dfebbc6cf62c44a25d649a (patch) | |
tree | 5512516abe4bd2b8489a8ead17b026277e656208 /src/orchestrator/orchestrator.rs | |
parent | 09ca31a64761610cd2fc98a5668d1676046b2d89 (diff) | |
parent | f69289d1a076ac85fb50ee722b7581514e13dcf2 (diff) |
Merge branch 'fix-progressbar-cleanup'
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 51 |
1 files changed, 35 insertions, 16 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index ccf5117..710b53f 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -375,6 +375,35 @@ struct JobTask<'a> { sender: Vec<Sender<JobResult>>, } + +/// Implement Drop to close the progress bar +/// +/// This implementation is a bit of a hack. +/// Because all `JobTask`s are `JobTask::run()` in parallel, but there is no IPC _between_ the +/// tasks (there is IPC between childs and parents, but not between all JobTask objects), we never +/// know whether any other task errored when the JobTask object is destructed. +/// +/// One way to implement this would be to add multi-cast IPC between all `JobTask` objects, with some +/// BUS like structure where all `JobTask`s can send messages to and listen to. +/// But that's non-trivial and a lot of overhead, of course. +/// +/// The trick here is, that the progressbar is either finished when `drop()` is called, which means +/// that the `JobTask` is dropped because it finished, +/// or the progressbar is not finished yet, which means that the `JobTask` is dropped because the +/// runtime stops running it because some other `JobTask` errored. +/// +/// In the latter case, we cleanup by telling the progressbar to finish. +impl<'a> Drop for JobTask<'a> { + fn drop(&mut self) { + if !self.bar.is_finished() { + self.bar.finish_with_message(&format!("[{} {} {}] Stopped, error on other task", + self.jobdef.job.uuid(), + self.jobdef.job.package().name(), + self.jobdef.job.package().version())); + } + } +} + impl<'a> JobTask<'a> { fn new(receiver: Receiver<JobResult>, prep: TaskPreparation<'a>, sender: Vec<Sender<JobResult>>) -> Self { let bar = prep.bar.clone(); @@ -453,24 +482,14 @@ impl<'a> JobTask<'a> { self.sender[0].send(Err(received_errors)).await; // ... and stop operation, because the whole tree will fail anyways. + self.bar.finish_with_message(&format!("[{} {} {}] Stopping, errors from child received", + self.jobdef.job.uuid(), + self.jobdef.job.package().name(), + self.jobdef.job.package().version())); return Ok(()) } } - // receive items until the channel is empty. - // - // In the above loop, it could happen that we have all dependencies to run, but there is - // another job that reports artifacts. - // We need to collect them, too. - // - // This is techically not possible, because in a tree, we need all results from all childs. - // It just feels better having this in place as well. - // - // Sorry, not sorry. - while self.perform_receive(&mut received_dependencies, &mut received_errors).await? { - ; - } - // Map the list of received dependencies from // Vec<(Uuid, Vec<ArtifactPath>)> // to @@ -503,7 +522,7 @@ impl<'a> JobTask<'a> { let job_uuid = *self.jobdef.job.uuid(); // Schedule the job on the scheduler - match self.scheduler.schedule_job(runnable, self.bar).await?.run().await { + match self.scheduler.schedule_job(runnable, self.bar.clone()).await?.run().await { // if the scheduler run reports an error, // that is an error from the actual execution of the job ... Err(e) => { @@ -522,7 +541,7 @@ impl<'a> JobTask<'a> { Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); - for s in self.sender { + for s in self.sender.iter() { s.send(Ok(received_dependencies.clone())).await?; } }, |