diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-01-18 14:48:59 +0100 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-01-18 14:48:59 +0100 |
commit | 0295809436d8e178a7d0528b47b9d4313b292eef (patch) | |
tree | 55671566fb700328c81a34b322cfa55309e098f8 /src/orchestrator/orchestrator.rs | |
parent | 5bee5329b823431fd3c971f75281084617766edd (diff) |
Run `cargo fmt`
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 72 |
1 files changed, 49 insertions, 23 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index dd0c02b..7fec555 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -11,10 +11,10 @@ use std::path::PathBuf; use std::sync::Arc; +use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; -use anyhow::anyhow; use diesel::PgConnection; use log::trace; use tokio::sync::RwLock; @@ -59,34 +59,42 @@ pub struct OrchestratorSetup<'a> { impl<'a> OrchestratorSetup<'a> { pub async fn setup(self) -> Result<Orchestrator<'a>> { - let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), self.database, self.submit.clone(), self.log_dir).await?; + let scheduler = EndpointScheduler::setup( + self.endpoint_config, + self.staging_store.clone(), + self.database, + self.submit.clone(), + self.log_dir, + ) + .await?; Ok(Orchestrator { scheduler, progress_generator: self.progress_generator, merged_stores: MergedStores::new(self.release_store, self.staging_store), - source_cache: self.source_cache, - jobsets: self.jobsets, - config: self.config, + source_cache: self.source_cache, + jobsets: self.jobsets, + config: self.config, }) } } impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> { for jobset in self.jobsets.into_iter() { - let errs = Self::run_jobset(&self.scheduler, + let errs = Self::run_jobset( + &self.scheduler, &self.merged_stores, &self.source_cache, &self.config, &self.progress_generator, jobset, - output) - .await?; + output, + ) + .await?; if !errs.is_empty() { - return Ok(errs) + return Ok(errs); } } @@ -100,9 +108,8 @@ impl<'a> Orchestrator<'a> { config: &Configuration, progress_generator: &ProgressBars, jobset: JobSet, - output: &mut Vec<Artifact>) - -> Result<Vec<(Uuid, anyhow::Error)>> - { + output: &mut Vec<Artifact>, + ) -> Result<Vec<(Uuid, anyhow::Error)>> { use tokio::stream::StreamExt; let multibar = Arc::new(indicatif::MultiProgress::new()); @@ -132,9 +139,14 @@ impl<'a> Orchestrator<'a> { .inspect(|e| trace!("Processing result from jobset run: {:?}", e)) .partition(|e| e.is_ok()); - let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<Artifact>>(); + let results = okays + .into_iter() + .filter_map(Result::ok) + .flatten() + .collect::<Vec<Artifact>>(); - { // check if all paths that were written are actually there in the staging store + { + // check if all paths that were written are actually there in the staging store let staging_store_lock = merged_store.staging().read().await; trace!("Checking {} results...", results.len()); @@ -142,12 +154,20 @@ impl<'a> Orchestrator<'a> { let a_path = artifact.path_buf(); trace!("Checking path: {}", a_path.display()); if !staging_store_lock.path_exists_in_store_root(&a_path) { - return Err(anyhow!("Result path {} is missing from staging store", a_path.display())) - .with_context(|| anyhow!("Should be: {}/{}", staging_store_lock.root_path().display(), a_path.display())) - .map_err(Error::from) + return Err(anyhow!( + "Result path {} is missing from staging store", + a_path.display() + )) + .with_context(|| { + anyhow!( + "Should be: {}/{}", + staging_store_lock.root_path().display(), + a_path.display() + ) + }) + .map_err(Error::from); } } - } let mut results = results; // rebind @@ -155,9 +175,17 @@ impl<'a> Orchestrator<'a> { Ok(errors.into_iter().filter_map(Result::err).collect()) } - async fn run_runnable(runnable: RunnableJob, scheduler: &EndpointScheduler, bar: indicatif::ProgressBar) -> Result<Vec<Artifact>> { + async fn run_runnable( + runnable: RunnableJob, + scheduler: &EndpointScheduler, + bar: indicatif::ProgressBar, + ) -> Result<Vec<Artifact>> { let job_id = *runnable.uuid(); - trace!("Runnable {} for package {}", job_id, runnable.package().name()); + trace!( + "Runnable {} for package {}", + job_id, + runnable.package().name() + ); let jobhandle = scheduler.schedule_job(runnable, bar).await?; trace!("Jobhandle -> {:?}", jobhandle); @@ -166,6 +194,4 @@ impl<'a> Orchestrator<'a> { trace!("Found result in job {}: {:?}", job_id, r); r } - } - |