summaryrefslogtreecommitdiffstats
path: root/src/orchestrator
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-12-08 15:16:44 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-12-08 15:16:56 +0100
commita2f369eb3e18b062df9ab62868c4a63d3b0e6f94 (patch)
treed7c41d67aeb8cfbcc1bccd6b9e9a8e6f35d9ebc7 /src/orchestrator
parenta1ce2deaeb2c4ed1002bc4c767fd630773e7de16 (diff)
Move progress bar instantiation out of JobHandle implementation
... and into orchestrator implementation. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r--src/orchestrator/orchestrator.rs17
1 files changed, 9 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index f569120..eb03c0a 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -28,6 +28,7 @@ use crate::util::progress::ProgressBars;
pub struct Orchestrator<'a> {
scheduler: EndpointScheduler,
+ progress_generator: ProgressBars,
merged_stores: MergedStores,
source_cache: SourceCache,
jobsets: Vec<JobSet>,
@@ -51,10 +52,11 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
let db = Arc::new(self.database);
- let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir).await?;
+ let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.submit.clone(), self.log_dir).await?;
Ok(Orchestrator {
scheduler: scheduler,
+ progress_generator: self.progress_generator,
merged_stores: MergedStores::new(self.release_store, self.staging_store),
source_cache: self.source_cache,
jobsets: self.jobsets,
@@ -72,6 +74,7 @@ impl<'a> Orchestrator<'a> {
&self.merged_stores,
&self.source_cache,
&self.config,
+ &self.progress_generator,
jobset)
.await?;
@@ -86,6 +89,7 @@ impl<'a> Orchestrator<'a> {
merged_store: &MergedStores,
source_cache: &SourceCache,
config: &Configuration,
+ progress_generator: &ProgressBars,
jobset: JobSet)
-> Result<Vec<Artifact>>
{
@@ -97,11 +101,8 @@ impl<'a> Orchestrator<'a> {
.await?
.into_iter()
.map(|runnable| {
- let multibar = multibar.clone();
-
- async {
- Self::run_runnable(multibar, runnable, scheduler).await
- }
+ let bar = multibar.add(progress_generator.job_bar(runnable.uuid()));
+ Self::run_runnable(runnable, scheduler, bar)
})
.collect::<futures::stream::FuturesUnordered<_>>()
.collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>();
@@ -147,13 +148,13 @@ impl<'a> Orchestrator<'a> {
Ok(results)
}
- async fn run_runnable(multibar: Arc<indicatif::MultiProgress>, runnable: RunnableJob, scheduler: &EndpointScheduler)
+ async fn run_runnable(runnable: RunnableJob, scheduler: &EndpointScheduler, bar: indicatif::ProgressBar)
-> RResult<Vec<Artifact>, ContainerError>
{
let job_id = runnable.uuid().clone();
trace!("Runnable {} for package {}", job_id, runnable.package().name());
- let jobhandle = scheduler.schedule_job(runnable, multibar).await?;
+ let jobhandle = scheduler.schedule_job(runnable, bar).await?;
trace!("Jobhandle -> {:?}", jobhandle);
let r = jobhandle.run().await;