diff options
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 262 |
1 files changed, 151 insertions, 111 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 7fec555..03a4cd2 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -12,25 +12,27 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::anyhow; -use anyhow::Context; use anyhow::Error; use anyhow::Result; use diesel::PgConnection; +use indicatif::ProgressBar; use log::trace; use tokio::sync::RwLock; +use tokio::stream::StreamExt; use typed_builder::TypedBuilder; use uuid::Uuid; use crate::config::Configuration; -use crate::db::models::Artifact; -use crate::db::models::Submit; +use crate::db::models as dbmodels; use crate::endpoint::EndpointConfiguration; use crate::endpoint::EndpointScheduler; +use crate::filestore::Artifact; use crate::filestore::MergedStores; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; -use crate::job::JobSet; +use crate::job::JobDefinition; use crate::job::RunnableJob; +use crate::job::Tree as JobTree; use crate::source::SourceCache; use crate::util::progress::ProgressBars; @@ -39,8 +41,9 @@ pub struct Orchestrator<'a> { progress_generator: ProgressBars, merged_stores: MergedStores, source_cache: SourceCache, - jobsets: Vec<JobSet>, + jobtree: JobTree, config: &'a Configuration, + database: Arc<PgConnection>, } #[derive(TypedBuilder)] @@ -50,9 +53,9 @@ pub struct OrchestratorSetup<'a> { staging_store: Arc<RwLock<StagingStore>>, release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, - jobsets: Vec<JobSet>, + jobtree: JobTree, database: Arc<PgConnection>, - submit: Submit, + submit: dbmodels::Submit, log_dir: Option<PathBuf>, config: &'a Configuration, } @@ -62,7 +65,7 @@ impl<'a> OrchestratorSetup<'a> { let scheduler = EndpointScheduler::setup( self.endpoint_config, self.staging_store.clone(), - self.database, + self.database.clone(), self.submit.clone(), self.log_dir, ) @@ -73,125 +76,162 @@ impl<'a> OrchestratorSetup<'a> { progress_generator: self.progress_generator, merged_stores: MergedStores::new(self.release_store, self.staging_store), source_cache: self.source_cache, - jobsets: self.jobsets, + jobtree: self.jobtree, config: self.config, + database: self.database, }) } } +/// Helper type +/// +/// Represents a result that came from the run of a job inside a container +/// +/// It is either a list of artifacts (with their respective database artifact objects) +/// or a UUID and an Error object, where the UUID is the job UUID and the error is the +/// anyhow::Error that was issued. +type JobResult = std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, (Uuid, Error)>; + impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> { - for jobset in self.jobsets.into_iter() { - let errs = Self::run_jobset( - &self.scheduler, - &self.merged_stores, - &self.source_cache, - &self.config, - &self.progress_generator, - jobset, - output, - ) - .await?; + pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> { + let (results, errors) = self.run_tree().await?; + output.extend(results.into_iter().map(|(_, dba)| dba)); + Ok(errors) + } + + async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> { + use futures::FutureExt; + + let mut already_built = vec![]; + let mut artifacts = vec![]; + let mut errors = vec![]; + + loop { + // loop{} + // until for all elements of self.jobtree, the uuid exists in already_built + // + // for each element in jobtree + // where dependencies(element) all in already_built + // run_job_for(element) + // + // for results from run_job_for calls + // remember UUID in already_built + // put built artifacts in artifacts + // if error, abort everything + // + // + let multibar = Arc::new(indicatif::MultiProgress::new()); + let build_results = self.jobtree + .inner() + .iter() + .filter(|(uuid, jobdef)| { // select all jobs where all dependencies are in `already_built` + trace!("Filtering job definition: {:?}", jobdef); + jobdef.dependencies.iter().all(|d| already_built.contains(d)) && !already_built.contains(uuid) + }) + .map(|(uuid, jobdef)| { + trace!("Running job {}", uuid); + let bar = multibar.add(self.progress_generator.bar()); + let uuid = uuid.clone(); + self.run_job(jobdef, bar).map(move |r| (uuid, r)) + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<(_, Result<JobResult>)>>(); + + let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); + let (_, build_results) = tokio::join!(multibar_block, build_results); + + for (uuid, artifact_result) in build_results.into_iter() { + already_built.push(uuid); + + match artifact_result { + Ok(Ok(mut arts)) => artifacts.append(&mut arts), + Ok(Err((uuid, e))) => { // error during job running + log::error!("Error for job {} = {}", uuid, e); + errors.push((uuid, e)); + }, + + Err(e) => return Err(e), // error during container execution + } + } + + if !errors.is_empty() { + break + } - if !errs.is_empty() { - return Ok(errs); + // already_built.sort(); // TODO: optimization for binary search in + // above and below contains() clause + + if self.jobtree.inner().iter().all(|(uuid, _)| already_built.contains(uuid)) { + break } } - Ok(vec![]) + Ok((artifacts, errors)) } - async fn run_jobset( - scheduler: &EndpointScheduler, - merged_store: &MergedStores, - source_cache: &SourceCache, - config: &Configuration, - progress_generator: &ProgressBars, - jobset: JobSet, - output: &mut Vec<Artifact>, - ) -> Result<Vec<(Uuid, anyhow::Error)>> { - use tokio::stream::StreamExt; - - let multibar = Arc::new(indicatif::MultiProgress::new()); - let results = jobset // run the jobs in the set - .into_runables(&merged_store, source_cache, config) - .await? - .into_iter() - .map(|runnable| { - let bar = multibar.add(progress_generator.bar()); - - async { - let uuid = *runnable.uuid(); - Self::run_runnable(runnable, scheduler, bar) - .await - .map_err(|e| (uuid, e)) - } - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Vec<std::result::Result<Vec<Artifact>, (Uuid, Error)>>>(); - - let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); - - let (results, barres) = tokio::join!(results, multibar_block); - let _ = barres?; - let (okays, errors): (Vec<_>, Vec<_>) = results - .into_iter() - .inspect(|e| trace!("Processing result from jobset run: {:?}", e)) - .partition(|e| e.is_ok()); - - let results = okays - .into_iter() - .filter_map(Result::ok) - .flatten() - .collect::<Vec<Artifact>>(); - - { - // check if all paths that were written are actually there in the staging store - let staging_store_lock = merged_store.staging().read().await; - - trace!("Checking {} results...", results.len()); - for artifact in results.iter() { - let a_path = artifact.path_buf(); - trace!("Checking path: {}", a_path.display()); - if !staging_store_lock.path_exists_in_store_root(&a_path) { - return Err(anyhow!( - "Result path {} is missing from staging store", - a_path.display() - )) - .with_context(|| { - anyhow!( - "Should be: {}/{}", - staging_store_lock.root_path().display(), - a_path.display() - ) + async fn run_job(&self, jobdef: &JobDefinition, bar: ProgressBar) -> Result<JobResult> { + let dependency_artifacts = self.get_dependency_artifacts_for_jobs(&jobdef.dependencies).await?; + bar.set_message("Preparing..."); + + let runnable = RunnableJob::build_from_job( + &jobdef.job, + &self.source_cache, + &self.config, + dependency_artifacts) + .await?; + + bar.set_message("Scheduling..."); + let job_uuid = jobdef.job.uuid().clone(); + match self.scheduler.schedule_job(runnable, bar).await?.run().await { + Err(e) => return Ok(Err((job_uuid, e))), + Ok(db_artifacts) => { + db_artifacts.into_iter() + .map(|db_artifact| async { + trace!("Getting store Artifact for db Artifact: {:?}", db_artifact); + let art = self.get_store_artifact_for(&db_artifact).await?; + trace!("Store Artifact: {:?}", art); + Ok(Ok((art, db_artifact))) }) - .map_err(Error::from); - } - } + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<JobResult>>() + .await + }, } + } - let mut results = results; // rebind - output.append(&mut results); - Ok(errors.into_iter().filter_map(Result::err).collect()) + /// Get all dependency artifacts for the job from the database + /// + /// Use the JobDefinition object and find all dependency outputs in the database + async fn get_dependency_artifacts_for_jobs(&self, uuids: &[Uuid]) -> Result<Vec<Artifact>> { + use crate::schema; + use crate::diesel::ExpressionMethods; + use crate::diesel::QueryDsl; + use crate::diesel::RunQueryDsl; + + // Pseudo code: + // + // * return for uuid in uuids: + // self.database.get(job).get_artifacts() + + schema::artifacts::table + .left_outer_join(schema::jobs::table) + .filter(schema::jobs::uuid.eq_any(uuids)) + .select(schema::artifacts::all_columns) + .load::<dbmodels::Artifact>(&*self.database)? + .iter() + .map(|dbart| self.get_store_artifact_for(dbart)) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect() + .await } - async fn run_runnable( - runnable: RunnableJob, - scheduler: &EndpointScheduler, - bar: indicatif::ProgressBar, - ) -> Result<Vec<Artifact>> { - let job_id = *runnable.uuid(); - trace!( - "Runnable {} for package {}", - job_id, - runnable.package().name() - ); - - let jobhandle = scheduler.schedule_job(runnable, bar).await?; - trace!("Jobhandle -> {:?}", jobhandle); - - let r = jobhandle.run().await; - trace!("Found result in job {}: {:?}", job_id, r); - r + async fn get_store_artifact_for(&self, db_artifact: &dbmodels::Artifact) -> Result<Artifact> { + let p = PathBuf::from(&db_artifact.path); + self.merged_stores + .get_artifact_by_path(&p) + .await? + .ok_or_else(|| { + anyhow!("Artifact not found in {}", p.display()) + }) } } |