summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs25
1 files changed, 11 insertions, 14 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index cedc549..bed2195 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -37,7 +37,7 @@ use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobDefinition;
use crate::job::RunnableJob;
-use crate::job::Tree as JobTree;
+use crate::job::Dag;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;
@@ -45,7 +45,7 @@ use crate::util::progress::ProgressBars;
/// The Orchestrator
///
/// The Orchestrator is used to orchestrate the work on one submit.
-/// On a very high level: It uses a [JobTree](crate::job::Tree) to build a number (list) of
+/// On a very high level: It uses a [Dag](crate::job::Dag) to build a number (list) of
/// [JobTasks](crate::orchestrator::JobTask) that is then run concurrently.
///
/// Because of the implementation of [JobTask], the work happens in
@@ -153,7 +153,7 @@ pub struct Orchestrator<'a> {
progress_generator: ProgressBars,
merged_stores: MergedStores,
source_cache: SourceCache,
- jobtree: JobTree,
+ jobdag: Dag,
config: &'a Configuration,
database: Arc<PgConnection>,
}
@@ -165,7 +165,7 @@ pub struct OrchestratorSetup<'a> {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
- jobtree: JobTree,
+ jobdag: Dag,
database: Arc<PgConnection>,
submit: dbmodels::Submit,
log_dir: Option<PathBuf>,
@@ -188,7 +188,7 @@ impl<'a> OrchestratorSetup<'a> {
progress_generator: self.progress_generator,
merged_stores: MergedStores::new(self.release_store, self.staging_store),
source_cache: self.source_cache,
- jobtree: self.jobtree,
+ jobdag: self.jobdag,
config: self.config,
database: self.database,
})
@@ -214,7 +214,7 @@ impl<'a> Orchestrator<'a> {
async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> {
let multibar = Arc::new(indicatif::MultiProgress::new());
- // For each job in the jobtree, built a tuple with
+ // For each job in the jobdag, built a tuple with
//
// 1. The receiver that is used by the task to receive results from dependency tasks from
// 2. The task itself (as a TaskPreparation object)
@@ -223,16 +223,15 @@ impl<'a> Orchestrator<'a> {
// This is an Option<> because we need to set it later and the root of the tree needs a
// special handling, as this very function will wait on a receiver that gets the results
// of the root task
- let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree
- .inner()
+ let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobdag
.iter()
- .map(|(uuid, jobdef)| {
+ .map(|jobdef| {
// We initialize the channel with 100 elements here, as there is unlikely a task
// that depends on 100 other tasks.
// Either way, this might be increased in future.
let (sender, receiver) = tokio::sync::mpsc::channel(100);
- trace!("Creating TaskPreparation object for job {}", uuid);
+ trace!("Creating TaskPreparation object for job {}", jobdef.job.uuid());
let bar = self.progress_generator.bar();
let bar = multibar.add(bar);
bar.set_length(100);
@@ -337,8 +336,7 @@ impl<'a> Orchestrator<'a> {
///
/// This simply holds data and does not contain any more functionality
struct TaskPreparation<'a> {
- /// The UUID of this job
- jobdef: &'a JobDefinition,
+ jobdef: JobDefinition<'a>,
bar: ProgressBar,
@@ -353,8 +351,7 @@ struct TaskPreparation<'a> {
///
/// This type represents a task for a job that can immediately be executed (see `JobTask::run()`).
struct JobTask<'a> {
- /// The UUID of this job
- jobdef: &'a JobDefinition,
+ jobdef: JobDefinition<'a>,
bar: ProgressBar,