summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-02-08 09:36:23 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-08 09:36:23 +0100
commit2d82860bbd867a328fa1ab21e77705439ba4636b (patch)
treeef91551d6d8cbc9a807537d004b5aff653dba2be /src/orchestrator/orchestrator.rs
parent66c1fc79ec635429991d5bc632fd149093758e99 (diff)
parent7a5f0f91b1b1fc29f5148df0f88d26cdbaf474e0 (diff)
Merge branch 'test-diamond-dependencies'
This merge changes the organizational structure of the JobTask running from a Tree to a DAG. We're close to 0.1.0 with this!
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs103
1 files changed, 63 insertions, 40 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index cedc549..f5dd8cb 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -10,6 +10,7 @@
#![allow(unused)]
+use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
@@ -37,7 +38,7 @@ use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobDefinition;
use crate::job::RunnableJob;
-use crate::job::Tree as JobTree;
+use crate::job::Dag;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;
@@ -45,7 +46,7 @@ use crate::util::progress::ProgressBars;
/// The Orchestrator
///
/// The Orchestrator is used to orchestrate the work on one submit.
-/// On a very high level: It uses a [JobTree](crate::job::Tree) to build a number (list) of
+/// On a very high level: It uses a [Dag](crate::job::Dag) to build a number (list) of
/// [JobTasks](crate::orchestrator::JobTask) that is then run concurrently.
///
/// Because of the implementation of [JobTask], the work happens in
@@ -153,7 +154,7 @@ pub struct Orchestrator<'a> {
progress_generator: ProgressBars,
merged_stores: MergedStores,
source_cache: SourceCache,
- jobtree: JobTree,
+ jobdag: Dag,
config: &'a Configuration,
database: Arc<PgConnection>,
}
@@ -165,7 +166,7 @@ pub struct OrchestratorSetup<'a> {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
- jobtree: JobTree,
+ jobdag: Dag,
database: Arc<PgConnection>,
submit: dbmodels::Submit,
log_dir: Option<PathBuf>,
@@ -188,7 +189,7 @@ impl<'a> OrchestratorSetup<'a> {
progress_generator: self.progress_generator,
merged_stores: MergedStores::new(self.release_store, self.staging_store),
source_cache: self.source_cache,
- jobtree: self.jobtree,
+ jobdag: self.jobdag,
config: self.config,
database: self.database,
})
@@ -202,19 +203,19 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<Vec<(Uuid, Vec<Artifact>)>, Vec<(Uuid, Error)>>;
+type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, HashMap<Uuid, Error>>;
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> {
+ pub async fn run(self, output: &mut Vec<Artifact>) -> Result<HashMap<Uuid, Error>> {
let (results, errors) = self.run_tree().await?;
output.extend(results.into_iter());
Ok(errors)
}
- async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> {
+ async fn run_tree(self) -> Result<(Vec<Artifact>, HashMap<Uuid, Error>)> {
let multibar = Arc::new(indicatif::MultiProgress::new());
- // For each job in the jobtree, built a tuple with
+ // For each job in the jobdag, built a tuple with
//
// 1. The receiver that is used by the task to receive results from dependency tasks from
// 2. The task itself (as a TaskPreparation object)
@@ -223,16 +224,15 @@ impl<'a> Orchestrator<'a> {
// This is an Option<> because we need to set it later and the root of the tree needs a
// special handling, as this very function will wait on a receiver that gets the results
// of the root task
- let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree
- .inner()
+ let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobdag
.iter()
- .map(|(uuid, jobdef)| {
+ .map(|jobdef| {
// We initialize the channel with 100 elements here, as there is unlikely a task
// that depends on 100 other tasks.
// Either way, this might be increased in future.
let (sender, receiver) = tokio::sync::mpsc::channel(100);
- trace!("Creating TaskPreparation object for job {}", uuid);
+ trace!("Creating TaskPreparation object for job {}", jobdef.job.uuid());
let bar = self.progress_generator.bar();
let bar = multibar.add(bar);
bar.set_length(100);
@@ -247,7 +247,7 @@ impl<'a> Orchestrator<'a> {
database: self.database.clone(),
};
- (receiver, tp, sender, std::cell::RefCell::new(None as Option<Sender<JobResult>>))
+ (receiver, tp, sender, std::cell::RefCell::new(None as Option<Vec<Sender<JobResult>>>))
})
.collect();
@@ -262,9 +262,26 @@ impl<'a> Orchestrator<'a> {
// find the job that depends on this job
// use the sender of the found job and set it as sender for this job
for job in jobs.iter() {
- *job.3.borrow_mut() = jobs.iter()
- .find(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
- .map(|j| j.2.clone());
+ if let Some(mut v) = job.3.borrow_mut().as_mut() {
+ v.extend({
+ jobs.iter()
+ .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
+ .map(|j| j.2.clone())
+ });
+ } else {
+ *job.3.borrow_mut() = {
+ let depending_on_job = jobs.iter()
+ .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
+ .map(|j| j.2.clone())
+ .collect::<Vec<Sender<JobResult>>>();
+
+ if depending_on_job.is_empty() {
+ None
+ } else {
+ Some(depending_on_job)
+ }
+ };
+ }
}
// Find the id of the root task
@@ -309,7 +326,7 @@ impl<'a> Orchestrator<'a> {
receiver: prep.0,
// the sender is set or we need to use the root sender
- sender: prep.3.into_inner().unwrap_or(root_sender),
+ sender: prep.3.into_inner().unwrap_or_else(|| vec![root_sender]),
}
})
.map(|task| task.run())
@@ -323,7 +340,7 @@ impl<'a> Orchestrator<'a> {
None => Err(anyhow!("No result received...")),
Some(Ok(results)) => {
let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect();
- Ok((results, vec![]))
+ Ok((results, HashMap::with_capacity(0)))
},
Some(Err(errors)) => Ok((vec![], errors)),
}
@@ -337,8 +354,7 @@ impl<'a> Orchestrator<'a> {
///
/// This simply holds data and does not contain any more functionality
struct TaskPreparation<'a> {
- /// The UUID of this job
- jobdef: &'a JobDefinition,
+ jobdef: JobDefinition<'a>,
bar: ProgressBar,
@@ -353,8 +369,7 @@ struct TaskPreparation<'a> {
///
/// This type represents a task for a job that can immediately be executed (see `JobTask::run()`).
struct JobTask<'a> {
- /// The UUID of this job
- jobdef: &'a JobDefinition,
+ jobdef: JobDefinition<'a>,
bar: ProgressBar,
@@ -368,7 +383,7 @@ struct JobTask<'a> {
receiver: Receiver<JobResult>,
/// Channel to send the own build outputs to
- sender: Sender<JobResult>,
+ sender: Vec<Sender<JobResult>>,
}
impl<'a> JobTask<'a> {
@@ -385,15 +400,15 @@ impl<'a> JobTask<'a> {
// A list of job run results from dependencies that were received from the tasks for the
// dependencies
- let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![];
+ let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new();
// A list of errors that were received from the tasks for the dependencies
- let mut received_errors: Vec<(Uuid, Error)> = vec![];
+ let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len());
// Helper function to check whether all UUIDs are in a list of UUIDs
- let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &[(Uuid, Vec<_>)]| {
+ let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &HashMap<Uuid, Vec<_>>| {
dependency_uuids.iter().all(|dependency_uuid| {
- list.iter().map(|tpl| tpl.0).any(|id| id == *dependency_uuid)
+ list.keys().any(|id| id == dependency_uuid)
})
};
@@ -421,7 +436,10 @@ impl<'a> JobTask<'a> {
// if there are any errors from child tasks
if !received_errors.is_empty() {
// send them to the parent,...
- self.sender.send(Err(received_errors)).await;
+ //
+ // We only send to one parent, because it doesn't matter
+ // And we know that we have at least one sender
+ self.sender[0].send(Err(received_errors)).await;
// ... and stop operation, because the whole tree will fail anyways.
return Ok(())
@@ -447,8 +465,8 @@ impl<'a> JobTask<'a> {
// to
// Vec<Artifact>
let dependency_artifacts = received_dependencies
- .iter()
- .map(|tpl| tpl.1.iter())
+ .values()
+ .map(|v| v.iter())
.flatten()
.cloned()
.collect();
@@ -480,17 +498,22 @@ impl<'a> JobTask<'a> {
Err(e) => {
trace!("[{}]: Scheduler returned error = {:?}", self.jobdef.job.uuid(), e);
// ... and we send that to our parent
- self.sender.send(Err(vec![(job_uuid, e)])).await?;
+ //
+ // We only send to one parent, because it doesn't matter anymore
+ // We know that we have at least one sender available
+ let mut errormap = HashMap::with_capacity(1);
+ errormap.insert(job_uuid, e);
+ self.sender[0].send(Err(errormap)).await?;
},
// if the scheduler run reports success,
// it returns the database artifact objects it created!
Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
- received_dependencies.push((*self.jobdef.job.uuid(), artifacts));
- self.sender
- .send(Ok(received_dependencies))
- .await?;
+ received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
+ for s in self.sender {
+ s.send(Ok(received_dependencies.clone())).await?;
+ }
},
}
@@ -505,20 +528,20 @@ impl<'a> JobTask<'a> {
///
/// Return Ok(true) if we should continue operation
/// Return Ok(false) if the channel is empty and we're done receiving
- async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
+ async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an
// (uuid of the job, [Artifact])
trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v);
- received_dependencies.append(&mut v);
+ received_dependencies.extend(v);
Ok(true)
},
Some(Err(mut e)) => {
// The task we depend on failed
// we log that error for now
trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), e);
- received_errors.append(&mut e);
+ received_errors.extend(e);
Ok(true)
},
None => {
@@ -526,7 +549,7 @@ impl<'a> JobTask<'a> {
trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid());
// Find all dependencies that we need but which are not received
- let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>();
+ let received = received_dependencies.keys().collect::<Vec<_>>();
let missing_deps: Vec<_> = self.jobdef
.dependencies
.iter()