diff options
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 25 |
1 files changed, 11 insertions, 14 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index cedc549..bed2195 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -37,7 +37,7 @@ use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; use crate::job::JobDefinition; use crate::job::RunnableJob; -use crate::job::Tree as JobTree; +use crate::job::Dag; use crate::source::SourceCache; use crate::util::progress::ProgressBars; @@ -45,7 +45,7 @@ use crate::util::progress::ProgressBars; /// The Orchestrator /// /// The Orchestrator is used to orchestrate the work on one submit. -/// On a very high level: It uses a [JobTree](crate::job::Tree) to build a number (list) of +/// On a very high level: It uses a [Dag](crate::job::Dag) to build a number (list) of /// [JobTasks](crate::orchestrator::JobTask) that is then run concurrently. /// /// Because of the implementation of [JobTask], the work happens in @@ -153,7 +153,7 @@ pub struct Orchestrator<'a> { progress_generator: ProgressBars, merged_stores: MergedStores, source_cache: SourceCache, - jobtree: JobTree, + jobdag: Dag, config: &'a Configuration, database: Arc<PgConnection>, } @@ -165,7 +165,7 @@ pub struct OrchestratorSetup<'a> { staging_store: Arc<RwLock<StagingStore>>, release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, - jobtree: JobTree, + jobdag: Dag, database: Arc<PgConnection>, submit: dbmodels::Submit, log_dir: Option<PathBuf>, @@ -188,7 +188,7 @@ impl<'a> OrchestratorSetup<'a> { progress_generator: self.progress_generator, merged_stores: MergedStores::new(self.release_store, self.staging_store), source_cache: self.source_cache, - jobtree: self.jobtree, + jobdag: self.jobdag, config: self.config, database: self.database, }) @@ -214,7 +214,7 @@ impl<'a> Orchestrator<'a> { async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> { let multibar = Arc::new(indicatif::MultiProgress::new()); - // For each job in the jobtree, built a tuple with + // For each job in the jobdag, built a tuple with // // 1. The receiver that is used by the task to receive results from dependency tasks from // 2. The task itself (as a TaskPreparation object) @@ -223,16 +223,15 @@ impl<'a> Orchestrator<'a> { // This is an Option<> because we need to set it later and the root of the tree needs a // special handling, as this very function will wait on a receiver that gets the results // of the root task - let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree - .inner() + let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobdag .iter() - .map(|(uuid, jobdef)| { + .map(|jobdef| { // We initialize the channel with 100 elements here, as there is unlikely a task // that depends on 100 other tasks. // Either way, this might be increased in future. let (sender, receiver) = tokio::sync::mpsc::channel(100); - trace!("Creating TaskPreparation object for job {}", uuid); + trace!("Creating TaskPreparation object for job {}", jobdef.job.uuid()); let bar = self.progress_generator.bar(); let bar = multibar.add(bar); bar.set_length(100); @@ -337,8 +336,7 @@ impl<'a> Orchestrator<'a> { /// /// This simply holds data and does not contain any more functionality struct TaskPreparation<'a> { - /// The UUID of this job - jobdef: &'a JobDefinition, + jobdef: JobDefinition<'a>, bar: ProgressBar, @@ -353,8 +351,7 @@ struct TaskPreparation<'a> { /// /// This type represents a task for a job that can immediately be executed (see `JobTask::run()`). struct JobTask<'a> { - /// The UUID of this job - jobdef: &'a JobDefinition, + jobdef: JobDefinition<'a>, bar: ProgressBar, |