summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-01-18 14:48:59 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-01-18 14:48:59 +0100
commit0295809436d8e178a7d0528b47b9d4313b292eef (patch)
tree55671566fb700328c81a34b322cfa55309e098f8 /src/orchestrator/orchestrator.rs
parent5bee5329b823431fd3c971f75281084617766edd (diff)
Run `cargo fmt`
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs72
1 files changed, 49 insertions, 23 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index dd0c02b..7fec555 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -11,10 +11,10 @@
use std::path::PathBuf;
use std::sync::Arc;
+use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
-use anyhow::anyhow;
use diesel::PgConnection;
use log::trace;
use tokio::sync::RwLock;
@@ -59,34 +59,42 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
- let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), self.database, self.submit.clone(), self.log_dir).await?;
+ let scheduler = EndpointScheduler::setup(
+ self.endpoint_config,
+ self.staging_store.clone(),
+ self.database,
+ self.submit.clone(),
+ self.log_dir,
+ )
+ .await?;
Ok(Orchestrator {
scheduler,
progress_generator: self.progress_generator,
merged_stores: MergedStores::new(self.release_store, self.staging_store),
- source_cache: self.source_cache,
- jobsets: self.jobsets,
- config: self.config,
+ source_cache: self.source_cache,
+ jobsets: self.jobsets,
+ config: self.config,
})
}
}
impl<'a> Orchestrator<'a> {
-
pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> {
for jobset in self.jobsets.into_iter() {
- let errs = Self::run_jobset(&self.scheduler,
+ let errs = Self::run_jobset(
+ &self.scheduler,
&self.merged_stores,
&self.source_cache,
&self.config,
&self.progress_generator,
jobset,
- output)
- .await?;
+ output,
+ )
+ .await?;
if !errs.is_empty() {
- return Ok(errs)
+ return Ok(errs);
}
}
@@ -100,9 +108,8 @@ impl<'a> Orchestrator<'a> {
config: &Configuration,
progress_generator: &ProgressBars,
jobset: JobSet,
- output: &mut Vec<Artifact>)
- -> Result<Vec<(Uuid, anyhow::Error)>>
- {
+ output: &mut Vec<Artifact>,
+ ) -> Result<Vec<(Uuid, anyhow::Error)>> {
use tokio::stream::StreamExt;
let multibar = Arc::new(indicatif::MultiProgress::new());
@@ -132,9 +139,14 @@ impl<'a> Orchestrator<'a> {
.inspect(|e| trace!("Processing result from jobset run: {:?}", e))
.partition(|e| e.is_ok());
- let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<Artifact>>();
+ let results = okays
+ .into_iter()
+ .filter_map(Result::ok)
+ .flatten()
+ .collect::<Vec<Artifact>>();
- { // check if all paths that were written are actually there in the staging store
+ {
+ // check if all paths that were written are actually there in the staging store
let staging_store_lock = merged_store.staging().read().await;
trace!("Checking {} results...", results.len());
@@ -142,12 +154,20 @@ impl<'a> Orchestrator<'a> {
let a_path = artifact.path_buf();
trace!("Checking path: {}", a_path.display());
if !staging_store_lock.path_exists_in_store_root(&a_path) {
- return Err(anyhow!("Result path {} is missing from staging store", a_path.display()))
- .with_context(|| anyhow!("Should be: {}/{}", staging_store_lock.root_path().display(), a_path.display()))
- .map_err(Error::from)
+ return Err(anyhow!(
+ "Result path {} is missing from staging store",
+ a_path.display()
+ ))
+ .with_context(|| {
+ anyhow!(
+ "Should be: {}/{}",
+ staging_store_lock.root_path().display(),
+ a_path.display()
+ )
+ })
+ .map_err(Error::from);
}
}
-
}
let mut results = results; // rebind
@@ -155,9 +175,17 @@ impl<'a> Orchestrator<'a> {
Ok(errors.into_iter().filter_map(Result::err).collect())
}
- async fn run_runnable(runnable: RunnableJob, scheduler: &EndpointScheduler, bar: indicatif::ProgressBar) -> Result<Vec<Artifact>> {
+ async fn run_runnable(
+ runnable: RunnableJob,
+ scheduler: &EndpointScheduler,
+ bar: indicatif::ProgressBar,
+ ) -> Result<Vec<Artifact>> {
let job_id = *runnable.uuid();
- trace!("Runnable {} for package {}", job_id, runnable.package().name());
+ trace!(
+ "Runnable {} for package {}",
+ job_id,
+ runnable.package().name()
+ );
let jobhandle = scheduler.schedule_job(runnable, bar).await?;
trace!("Jobhandle -> {:?}", jobhandle);
@@ -166,6 +194,4 @@ impl<'a> Orchestrator<'a> {
trace!("Found result in job {}: {:?}", job_id, r);
r
}
-
}
-