Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--  src/orchestrator/orchestrator.rs | 262
1 file changed, 151 insertions(+), 111 deletions(-)
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 7fec555..03a4cd2 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -12,25 +12,27 @@ use std::path::PathBuf;
use std::sync::Arc;
use anyhow::anyhow;
-use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use diesel::PgConnection;
+use indicatif::ProgressBar;
use log::trace;
use tokio::sync::RwLock;
+use tokio::stream::StreamExt;
use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::config::Configuration;
-use crate::db::models::Artifact;
-use crate::db::models::Submit;
+use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
+use crate::filestore::Artifact;
use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
-use crate::job::JobSet;
+use crate::job::JobDefinition;
use crate::job::RunnableJob;
+use crate::job::Tree as JobTree;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;
@@ -39,8 +41,9 @@ pub struct Orchestrator<'a> {
progress_generator: ProgressBars,
merged_stores: MergedStores,
source_cache: SourceCache,
- jobsets: Vec<JobSet>,
+ jobtree: JobTree,
config: &'a Configuration,
+ database: Arc<PgConnection>,
}
#[derive(TypedBuilder)]
@@ -50,9 +53,9 @@ pub struct OrchestratorSetup<'a> {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
- jobsets: Vec<JobSet>,
+ jobtree: JobTree,
database: Arc<PgConnection>,
- submit: Submit,
+ submit: dbmodels::Submit,
log_dir: Option<PathBuf>,
config: &'a Configuration,
}
@@ -62,7 +65,7 @@ impl<'a> OrchestratorSetup<'a> {
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
self.staging_store.clone(),
- self.database,
+ self.database.clone(),
self.submit.clone(),
self.log_dir,
)
@@ -73,125 +76,162 @@ impl<'a> OrchestratorSetup<'a> {
progress_generator: self.progress_generator,
merged_stores: MergedStores::new(self.release_store, self.staging_store),
source_cache: self.source_cache,
- jobsets: self.jobsets,
+ jobtree: self.jobtree,
config: self.config,
+ database: self.database,
})
}
}
+/// Helper type
+///
+/// Represents the result of running a job inside a container.
+///
+/// It is either a list of filestore artifacts (each paired with its respective
+/// database artifact object), or a tuple of the job UUID and the anyhow::Error
+/// the job failed with.
+type JobResult = std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, (Uuid, Error)>;
+
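For illustration, the two nested Result layers of this alias are consumed by matching both levels, as run_tree below does. A minimal self-contained sketch, where FsArtifact and DbArtifact are hypothetical stand-ins for crate::filestore::Artifact and dbmodels::Artifact:

    // Sketch only: FsArtifact and DbArtifact stand in for the real types.
    struct FsArtifact;
    struct DbArtifact;
    type JobResult = Result<Vec<(FsArtifact, DbArtifact)>, (uuid::Uuid, anyhow::Error)>;

    fn consume(outer: anyhow::Result<JobResult>) {
        match outer {
            // the job ran and produced artifacts
            Ok(Ok(artifacts)) => println!("job produced {} artifact(s)", artifacts.len()),
            // the job ran but failed; the UUID identifies the failing job
            Ok(Err((job_uuid, err))) => eprintln!("job {} failed: {}", job_uuid, err),
            // the container execution itself failed
            Err(err) => eprintln!("scheduler error: {}", err),
        }
    }
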
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> {
- for jobset in self.jobsets.into_iter() {
- let errs = Self::run_jobset(
- &self.scheduler,
- &self.merged_stores,
- &self.source_cache,
- &self.config,
- &self.progress_generator,
- jobset,
- output,
- )
- .await?;
+ pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> {
+ let (results, errors) = self.run_tree().await?;
+ output.extend(results.into_iter().map(|(_, dba)| dba));
+ Ok(errors)
+ }
+
+ async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> {
+ use futures::FutureExt;
+
+ let mut already_built = vec![];
+ let mut artifacts = vec![];
+ let mut errors = vec![];
+
+ loop {
+ // Loop until, for every element of self.jobtree, the UUID exists in
+ // `already_built`:
+ //
+ // 1. Select every job in the tree whose dependencies are all in
+ //    `already_built` and which has not been built itself yet.
+ // 2. Run the selected jobs concurrently.
+ // 3. For each result, remember the job UUID in `already_built` and
+ //    collect the built artifacts; on error, abort everything.
+ //
+ // (A standalone sketch of this loop follows after this function.)
+ let multibar = Arc::new(indicatif::MultiProgress::new());
+ let build_results = self.jobtree
+ .inner()
+ .iter()
+ .filter(|(uuid, jobdef)| { // select all jobs whose dependencies are built and which are not yet built themselves
+ trace!("Filtering job definition: {:?}", jobdef);
+ jobdef.dependencies.iter().all(|d| already_built.contains(d)) && !already_built.contains(uuid)
+ })
+ .map(|(uuid, jobdef)| {
+ trace!("Running job {}", uuid);
+ let bar = multibar.add(self.progress_generator.bar());
+ let uuid = uuid.clone();
+ self.run_job(jobdef, bar).map(move |r| (uuid, r))
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<(_, Result<JobResult>)>>();
+
+ let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
+ let (_, build_results) = tokio::join!(multibar_block, build_results);
+
+ for (uuid, artifact_result) in build_results.into_iter() {
+ already_built.push(uuid);
+
+ match artifact_result {
+ Ok(Ok(mut arts)) => artifacts.append(&mut arts),
+ Ok(Err((uuid, e))) => { // error during job running
+ log::error!("Error for job {} = {}", uuid, e);
+ errors.push((uuid, e));
+ },
+
+ Err(e) => return Err(e), // error during container execution
+ }
+ }
+
+ if !errors.is_empty() {
+ break
+ }
- if !errs.is_empty() {
- return Ok(errs);
+ // already_built.sort(); // TODO: keep `already_built` sorted so the
+ // contains() checks above and below can use binary search
+
+ if self.jobtree.inner().iter().all(|(uuid, _)| already_built.contains(uuid)) {
+ break
}
}
- Ok(vec![])
+ Ok((artifacts, errors))
}
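The scheduling loop above is easier to see in isolation. Below is a minimal, sequential sketch of the same idea (the real code runs the ready jobs concurrently through FuturesUnordered and collects errors instead of returning early); the jobs slice, the run callback, and the error type are illustrative assumptions, not part of this patch:

    use uuid::Uuid;

    // Sketch: `jobs` maps each job UUID to the UUIDs it depends on, and `run`
    // executes a single job. Assumes the dependency graph is acyclic (a cycle
    // would make this loop spin forever).
    fn schedule(
        jobs: &[(Uuid, Vec<Uuid>)],
        mut run: impl FnMut(Uuid) -> anyhow::Result<()>,
    ) -> anyhow::Result<()> {
        let mut already_built: Vec<Uuid> = Vec::new();
        loop {
            // select all jobs whose dependencies are built and which are not built yet
            let ready: Vec<Uuid> = jobs
                .iter()
                .filter(|(uuid, deps)| {
                    deps.iter().all(|d| already_built.contains(d))
                        && !already_built.contains(uuid)
                })
                .map(|(uuid, _)| *uuid)
                .collect();

            for uuid in ready {
                run(uuid)?; // the real loop collects errors and breaks instead
                already_built.push(uuid);
            }

            if jobs.iter().all(|(uuid, _)| already_built.contains(uuid)) {
                break; // everything in the tree has been built
            }
        }
        Ok(())
    }
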
- async fn run_jobset(
- scheduler: &EndpointScheduler,
- merged_store: &MergedStores,
- source_cache: &SourceCache,
- config: &Configuration,
- progress_generator: &ProgressBars,
- jobset: JobSet,
- output: &mut Vec<Artifact>,
- ) -> Result<Vec<(Uuid, anyhow::Error)>> {
- use tokio::stream::StreamExt;
-
- let multibar = Arc::new(indicatif::MultiProgress::new());
- let results = jobset // run the jobs in the set
- .into_runables(&merged_store, source_cache, config)
- .await?
- .into_iter()
- .map(|runnable| {
- let bar = multibar.add(progress_generator.bar());
-
- async {
- let uuid = *runnable.uuid();
- Self::run_runnable(runnable, scheduler, bar)
- .await
- .map_err(|e| (uuid, e))
- }
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Vec<std::result::Result<Vec<Artifact>, (Uuid, Error)>>>();
-
- let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
-
- let (results, barres) = tokio::join!(results, multibar_block);
- let _ = barres?;
- let (okays, errors): (Vec<_>, Vec<_>) = results
- .into_iter()
- .inspect(|e| trace!("Processing result from jobset run: {:?}", e))
- .partition(|e| e.is_ok());
-
- let results = okays
- .into_iter()
- .filter_map(Result::ok)
- .flatten()
- .collect::<Vec<Artifact>>();
-
- {
- // check if all paths that were written are actually there in the staging store
- let staging_store_lock = merged_store.staging().read().await;
-
- trace!("Checking {} results...", results.len());
- for artifact in results.iter() {
- let a_path = artifact.path_buf();
- trace!("Checking path: {}", a_path.display());
- if !staging_store_lock.path_exists_in_store_root(&a_path) {
- return Err(anyhow!(
- "Result path {} is missing from staging store",
- a_path.display()
- ))
- .with_context(|| {
- anyhow!(
- "Should be: {}/{}",
- staging_store_lock.root_path().display(),
- a_path.display()
- )
+ async fn run_job(&self, jobdef: &JobDefinition, bar: ProgressBar) -> Result<JobResult> {
+ let dependency_artifacts = self.get_dependency_artifacts_for_jobs(&jobdef.dependencies).await?;
+ bar.set_message("Preparing...");
+
+ let runnable = RunnableJob::build_from_job(
+ &jobdef.job,
+ &self.source_cache,
+ &self.config,
+ dependency_artifacts)
+ .await?;
+
+ bar.set_message("Scheduling...");
+ let job_uuid = jobdef.job.uuid().clone();
+ match self.scheduler.schedule_job(runnable, bar).await?.run().await {
+ Err(e) => return Ok(Err((job_uuid, e))),
+ Ok(db_artifacts) => {
+ db_artifacts.into_iter()
+ .map(|db_artifact| async {
+ trace!("Getting store Artifact for db Artifact: {:?}", db_artifact);
+ let art = self.get_store_artifact_for(&db_artifact).await?;
+ trace!("Store Artifact: {:?}", art);
+ Ok(Ok((art, db_artifact)))
})
- .map_err(Error::from);
- }
- }
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<JobResult>>()
+ .await
+ },
}
+ }
- let mut results = results; // rebind
- output.append(&mut results);
- Ok(errors.into_iter().filter_map(Result::err).collect())
+ /// Get all dependency artifacts for a job from the database
+ ///
+ /// Takes the dependency UUIDs from a JobDefinition and loads the artifacts
+ /// those jobs produced from the database.
+ async fn get_dependency_artifacts_for_jobs(&self, uuids: &[Uuid]) -> Result<Vec<Artifact>> {
+ use crate::schema;
+ use crate::diesel::ExpressionMethods;
+ use crate::diesel::QueryDsl;
+ use crate::diesel::RunQueryDsl;
+
+ // Pseudo code: for each uuid in `uuids`, look up the job in the
+ // database and return all artifacts it produced.
+
+ schema::artifacts::table
+ .left_outer_join(schema::jobs::table)
+ .filter(schema::jobs::uuid.eq_any(uuids))
+ .select(schema::artifacts::all_columns)
+ .load::<dbmodels::Artifact>(&*self.database)?
+ .iter()
+ .map(|dbart| self.get_store_artifact_for(dbart))
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect()
+ .await
}
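The fan-out pattern used here, and again in run_job above, takes a synchronously produced list, resolves each element through an async call, and gathers the results via FuturesUnordered. A minimal sketch, assuming the tokio 0.2/0.3-era tokio::stream::StreamExt that the patch imports (whose collect() can gather a stream of Results into one Result); the function and its types are illustrative:

    use futures::stream::FuturesUnordered;
    use tokio::stream::StreamExt; // as imported at the top of this patch

    // Sketch: resolve every row through an async lookup, concurrently.
    // Collecting into Result<Vec<_>> fails as a whole if any lookup fails.
    async fn resolve_all(rows: Vec<String>) -> anyhow::Result<Vec<usize>> {
        rows.iter()
            .map(|row| async move {
                // hypothetical async lookup; the real code calls
                // self.get_store_artifact_for(db_artifact) here
                Ok::<_, anyhow::Error>(row.len())
            })
            .collect::<FuturesUnordered<_>>()
            .collect::<anyhow::Result<Vec<usize>>>()
            .await
    }
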
- async fn run_runnable(
- runnable: RunnableJob,
- scheduler: &EndpointScheduler,
- bar: indicatif::ProgressBar,
- ) -> Result<Vec<Artifact>> {
- let job_id = *runnable.uuid();
- trace!(
- "Runnable {} for package {}",
- job_id,
- runnable.package().name()
- );
-
- let jobhandle = scheduler.schedule_job(runnable, bar).await?;
- trace!("Jobhandle -> {:?}", jobhandle);
-
- let r = jobhandle.run().await;
- trace!("Found result in job {}: {:?}", job_id, r);
- r
+ async fn get_store_artifact_for(&self, db_artifact: &dbmodels::Artifact) -> Result<Artifact> {
+ let p = PathBuf::from(&db_artifact.path);
+ self.merged_stores
+ .get_artifact_by_path(&p)
+ .await?
+ .ok_or_else(|| {
+ anyhow!("Artifact not found in {}", p.display())
+ })
}
}