summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-02-08 11:06:17 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-08 11:06:17 +0100
commitb90df19e52acf44482dfebbc6cf62c44a25d649a (patch)
tree5512516abe4bd2b8489a8ead17b026277e656208 /src/orchestrator/orchestrator.rs
parent09ca31a64761610cd2fc98a5668d1676046b2d89 (diff)
parentf69289d1a076ac85fb50ee722b7581514e13dcf2 (diff)
Merge branch 'fix-progressbar-cleanup'
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs51
1 files changed, 35 insertions, 16 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index ccf5117..710b53f 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -375,6 +375,35 @@ struct JobTask<'a> {
sender: Vec<Sender<JobResult>>,
}
+
+/// Implement Drop to close the progress bar
+///
+/// This implementation is a bit of a hack.
+/// Because all `JobTask`s are `JobTask::run()` in parallel, but there is no IPC _between_ the
+/// tasks (there is IPC between childs and parents, but not between all JobTask objects), we never
+/// know whether any other task errored when the JobTask object is destructed.
+///
+/// One way to implement this would be to add multi-cast IPC between all `JobTask` objects, with some
+/// BUS like structure where all `JobTask`s can send messages to and listen to.
+/// But that's non-trivial and a lot of overhead, of course.
+///
+/// The trick here is, that the progressbar is either finished when `drop()` is called, which means
+/// that the `JobTask` is dropped because it finished,
+/// or the progressbar is not finished yet, which means that the `JobTask` is dropped because the
+/// runtime stops running it because some other `JobTask` errored.
+///
+/// In the latter case, we cleanup by telling the progressbar to finish.
+impl<'a> Drop for JobTask<'a> {
+ fn drop(&mut self) {
+ if !self.bar.is_finished() {
+ self.bar.finish_with_message(&format!("[{} {} {}] Stopped, error on other task",
+ self.jobdef.job.uuid(),
+ self.jobdef.job.package().name(),
+ self.jobdef.job.package().version()));
+ }
+ }
+}
+
impl<'a> JobTask<'a> {
fn new(receiver: Receiver<JobResult>, prep: TaskPreparation<'a>, sender: Vec<Sender<JobResult>>) -> Self {
let bar = prep.bar.clone();
@@ -453,24 +482,14 @@ impl<'a> JobTask<'a> {
self.sender[0].send(Err(received_errors)).await;
// ... and stop operation, because the whole tree will fail anyways.
+ self.bar.finish_with_message(&format!("[{} {} {}] Stopping, errors from child received",
+ self.jobdef.job.uuid(),
+ self.jobdef.job.package().name(),
+ self.jobdef.job.package().version()));
return Ok(())
}
}
- // receive items until the channel is empty.
- //
- // In the above loop, it could happen that we have all dependencies to run, but there is
- // another job that reports artifacts.
- // We need to collect them, too.
- //
- // This is techically not possible, because in a tree, we need all results from all childs.
- // It just feels better having this in place as well.
- //
- // Sorry, not sorry.
- while self.perform_receive(&mut received_dependencies, &mut received_errors).await? {
- ;
- }
-
// Map the list of received dependencies from
// Vec<(Uuid, Vec<ArtifactPath>)>
// to
@@ -503,7 +522,7 @@ impl<'a> JobTask<'a> {
let job_uuid = *self.jobdef.job.uuid();
// Schedule the job on the scheduler
- match self.scheduler.schedule_job(runnable, self.bar).await?.run().await {
+ match self.scheduler.schedule_job(runnable, self.bar.clone()).await?.run().await {
// if the scheduler run reports an error,
// that is an error from the actual execution of the job ...
Err(e) => {
@@ -522,7 +541,7 @@ impl<'a> JobTask<'a> {
Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
- for s in self.sender {
+ for s in self.sender.iter() {
s.send(Ok(received_dependencies.clone())).await?;
}
},