summaryrefslogtreecommitdiffstats
path: root/src/orchestrator
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-07 17:21:18 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-07 17:38:27 +0100
commitb5d081eece72841728d4ccc4a325f390a325e699 (patch)
treec6232e2db810e614daff33da0c48a039d1e07d6d /src/orchestrator
parentbe71c74216d6b49a5646587243640f687a2bdf0c (diff)
Implement progress bars in orchestrator
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r--src/orchestrator/orchestrator.rs76
1 files changed, 70 insertions, 6 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 04acd4b..0132e08 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -26,8 +26,10 @@ use crate::db::models::Submit;
use crate::db::models::EnvVar;
use crate::job::JobResource;
use crate::filestore::MergedStores;
+use crate::util::progress::ProgressBars;
pub struct Orchestrator {
+ progress_generator: ProgressBars,
scheduler: EndpointScheduler,
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
@@ -38,6 +40,7 @@ pub struct Orchestrator {
#[derive(TypedBuilder)]
pub struct OrchestratorSetup {
+ progress_generator: ProgressBars,
endpoint_config: Vec<EndpointConfiguration>,
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
@@ -52,6 +55,7 @@ impl OrchestratorSetup {
let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone()).await?;
Ok(Orchestrator {
+ progress_generator: self.progress_generator,
scheduler: scheduler,
staging_store: self.staging_store,
release_store: self.release_store,
@@ -67,8 +71,19 @@ impl Orchestrator {
pub async fn run(self) -> Result<()> {
use tokio::stream::StreamExt;
+ let number_of_jobsets = self.jobsets.len();
let _database = self.database;
- for jobset in self.jobsets.into_iter() {
+
+ for (i, jobset) in self.jobsets.into_iter().enumerate() {
+ // create a multi-bar for showing the overall jobset status as well as one bar per
+ // running job.
+ let jobset_bar = indicatif::MultiProgress::default();
+
+ // Create a "overview bar", which shows the progress of all jobs of the jobset combined
+ let jobset_overview_bar = jobset_bar.add({
+ self.progress_generator.jobset_bar(i + 1, number_of_jobsets, jobset.len())
+ });
+
let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
let (results, logs) = { // run the jobs in the set
@@ -76,31 +91,80 @@ impl Orchestrator {
let unordered_receivers = futures::stream::FuturesUnordered::new();
for runnable in jobset.into_runables(&merged_store) {
let runnable = runnable?;
- trace!("Runnable {} for package {}", runnable.uuid(), runnable.package().name());
+ let job_id = runnable.uuid().clone();
+ trace!("Runnable {} for package {}", job_id, runnable.package().name());
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let jobhandle = self.scheduler.schedule_job(runnable, sender).await?;
trace!("Jobhandle -> {:?}", jobhandle);
+
+ // clone the bar here, so we can give a handle to the async result fetcher closure
+ // where we tick() it as soon as the job returns the result (= is finished)
+ let bar = jobset_overview_bar.clone();
+
unordered_results.push(async move {
- jobhandle.get_result()
- .await
+ let r = jobhandle.get_result().await;
+ bar.tick();
+ r
});
unordered_receivers.push(async move {
- receiver
+ (job_id, receiver)
});
}
(unordered_results.collect::<Result<Vec<_>>>(), unordered_receivers.collect::<Vec<_>>())
};
- let (results, _logs) = tokio::join!(results, logs);
+ let (results, logs) = tokio::join!(results, logs);
// TODO: Use logs.
+ {
+ let log_processing_results = futures::stream::FuturesUnordered::new();
+ for (job_id, mut log) in logs {
+ let bar = jobset_bar.add(self.progress_generator.job_bar(&job_id));
+ log_processing_results.push(async move {
+ let mut success = None;
+ while let Some(logitem) = log.recv().await {
+ match logitem {
+ LogItem::Line(_) => {
+ // ignore
+ },
+ LogItem::Progress(u) => {
+ bar.set_position(u as u64);
+ },
+ LogItem::CurrentPhase(phasename) => {
+ bar.set_message(&format!("{} Phase: {}", job_id, phasename));
+ },
+ LogItem::State(Ok(s)) => {
+ bar.set_message(&format!("{} State Ok: {}", job_id, s));
+ success = Some(true);
+ },
+ LogItem::State(Err(e)) => {
+ bar.set_message(&format!("{} State Err: {}", job_id, e));
+ success = Some(false);
+ },
+ }
+ }
+
+ match success {
+ Some(true) => bar.finish_with_message(&format!("{} finished successfully", job_id)),
+ Some(false) => bar.finish_with_message(&format!("{} finished with error", job_id)),
+ None => bar.finish_with_message(&format!("{} finished", job_id)),
+ }
+ });
+ }
+
+ let _ = log_processing_results.collect::<Vec<_>>().await;
+ }
+
let results = results?
.into_iter()
.flatten()
.collect::<Vec<PathBuf>>();
+ let _ = jobset_overview_bar.finish();
+ let _ = jobset_bar.join()?;
+
{ // check if all paths that were written are actually there in the staging store
let staging_store_lock = self.staging_store
.read()