diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-02-08 09:36:23 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-08 09:36:23 +0100 |
commit | 2d82860bbd867a328fa1ab21e77705439ba4636b (patch) | |
tree | ef91551d6d8cbc9a807537d004b5aff653dba2be /src/orchestrator/orchestrator.rs | |
parent | 66c1fc79ec635429991d5bc632fd149093758e99 (diff) | |
parent | 7a5f0f91b1b1fc29f5148df0f88d26cdbaf474e0 (diff) |
Merge branch 'test-diamond-dependencies'
This merge changes the organizational structure of the JobTask running
from a Tree to a DAG.
We're close to 0.1.0 with this!
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 103 |
1 files changed, 63 insertions, 40 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index cedc549..f5dd8cb 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -10,6 +10,7 @@ #![allow(unused)] +use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; @@ -37,7 +38,7 @@ use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; use crate::job::JobDefinition; use crate::job::RunnableJob; -use crate::job::Tree as JobTree; +use crate::job::Dag; use crate::source::SourceCache; use crate::util::progress::ProgressBars; @@ -45,7 +46,7 @@ use crate::util::progress::ProgressBars; /// The Orchestrator /// /// The Orchestrator is used to orchestrate the work on one submit. -/// On a very high level: It uses a [JobTree](crate::job::Tree) to build a number (list) of +/// On a very high level: It uses a [Dag](crate::job::Dag) to build a number (list) of /// [JobTasks](crate::orchestrator::JobTask) that is then run concurrently. /// /// Because of the implementation of [JobTask], the work happens in @@ -153,7 +154,7 @@ pub struct Orchestrator<'a> { progress_generator: ProgressBars, merged_stores: MergedStores, source_cache: SourceCache, - jobtree: JobTree, + jobdag: Dag, config: &'a Configuration, database: Arc<PgConnection>, } @@ -165,7 +166,7 @@ pub struct OrchestratorSetup<'a> { staging_store: Arc<RwLock<StagingStore>>, release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, - jobtree: JobTree, + jobdag: Dag, database: Arc<PgConnection>, submit: dbmodels::Submit, log_dir: Option<PathBuf>, @@ -188,7 +189,7 @@ impl<'a> OrchestratorSetup<'a> { progress_generator: self.progress_generator, merged_stores: MergedStores::new(self.release_store, self.staging_store), source_cache: self.source_cache, - jobtree: self.jobtree, + jobdag: self.jobdag, config: self.config, database: self.database, }) @@ -202,19 +203,19 @@ impl<'a> OrchestratorSetup<'a> { /// It is either a list of artifacts with the UUID of the job they were produced by, /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<Vec<(Uuid, Vec<Artifact>)>, Vec<(Uuid, Error)>>; +type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, HashMap<Uuid, Error>>; impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> { + pub async fn run(self, output: &mut Vec<Artifact>) -> Result<HashMap<Uuid, Error>> { let (results, errors) = self.run_tree().await?; output.extend(results.into_iter()); Ok(errors) } - async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> { + async fn run_tree(self) -> Result<(Vec<Artifact>, HashMap<Uuid, Error>)> { let multibar = Arc::new(indicatif::MultiProgress::new()); - // For each job in the jobtree, built a tuple with + // For each job in the jobdag, built a tuple with // // 1. The receiver that is used by the task to receive results from dependency tasks from // 2. The task itself (as a TaskPreparation object) @@ -223,16 +224,15 @@ impl<'a> Orchestrator<'a> { // This is an Option<> because we need to set it later and the root of the tree needs a // special handling, as this very function will wait on a receiver that gets the results // of the root task - let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree - .inner() + let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobdag .iter() - .map(|(uuid, jobdef)| { + .map(|jobdef| { // We initialize the channel with 100 elements here, as there is unlikely a task // that depends on 100 other tasks. // Either way, this might be increased in future. let (sender, receiver) = tokio::sync::mpsc::channel(100); - trace!("Creating TaskPreparation object for job {}", uuid); + trace!("Creating TaskPreparation object for job {}", jobdef.job.uuid()); let bar = self.progress_generator.bar(); let bar = multibar.add(bar); bar.set_length(100); @@ -247,7 +247,7 @@ impl<'a> Orchestrator<'a> { database: self.database.clone(), }; - (receiver, tp, sender, std::cell::RefCell::new(None as Option<Sender<JobResult>>)) + (receiver, tp, sender, std::cell::RefCell::new(None as Option<Vec<Sender<JobResult>>>)) }) .collect(); @@ -262,9 +262,26 @@ impl<'a> Orchestrator<'a> { // find the job that depends on this job // use the sender of the found job and set it as sender for this job for job in jobs.iter() { - *job.3.borrow_mut() = jobs.iter() - .find(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) - .map(|j| j.2.clone()); + if let Some(mut v) = job.3.borrow_mut().as_mut() { + v.extend({ + jobs.iter() + .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) + .map(|j| j.2.clone()) + }); + } else { + *job.3.borrow_mut() = { + let depending_on_job = jobs.iter() + .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) + .map(|j| j.2.clone()) + .collect::<Vec<Sender<JobResult>>>(); + + if depending_on_job.is_empty() { + None + } else { + Some(depending_on_job) + } + }; + } } // Find the id of the root task @@ -309,7 +326,7 @@ impl<'a> Orchestrator<'a> { receiver: prep.0, // the sender is set or we need to use the root sender - sender: prep.3.into_inner().unwrap_or(root_sender), + sender: prep.3.into_inner().unwrap_or_else(|| vec![root_sender]), } }) .map(|task| task.run()) @@ -323,7 +340,7 @@ impl<'a> Orchestrator<'a> { None => Err(anyhow!("No result received...")), Some(Ok(results)) => { let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect(); - Ok((results, vec![])) + Ok((results, HashMap::with_capacity(0))) }, Some(Err(errors)) => Ok((vec![], errors)), } @@ -337,8 +354,7 @@ impl<'a> Orchestrator<'a> { /// /// This simply holds data and does not contain any more functionality struct TaskPreparation<'a> { - /// The UUID of this job - jobdef: &'a JobDefinition, + jobdef: JobDefinition<'a>, bar: ProgressBar, @@ -353,8 +369,7 @@ struct TaskPreparation<'a> { /// /// This type represents a task for a job that can immediately be executed (see `JobTask::run()`). struct JobTask<'a> { - /// The UUID of this job - jobdef: &'a JobDefinition, + jobdef: JobDefinition<'a>, bar: ProgressBar, @@ -368,7 +383,7 @@ struct JobTask<'a> { receiver: Receiver<JobResult>, /// Channel to send the own build outputs to - sender: Sender<JobResult>, + sender: Vec<Sender<JobResult>>, } impl<'a> JobTask<'a> { @@ -385,15 +400,15 @@ impl<'a> JobTask<'a> { // A list of job run results from dependencies that were received from the tasks for the // dependencies - let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![]; + let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new(); // A list of errors that were received from the tasks for the dependencies - let mut received_errors: Vec<(Uuid, Error)> = vec![]; + let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len()); // Helper function to check whether all UUIDs are in a list of UUIDs - let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &[(Uuid, Vec<_>)]| { + let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &HashMap<Uuid, Vec<_>>| { dependency_uuids.iter().all(|dependency_uuid| { - list.iter().map(|tpl| tpl.0).any(|id| id == *dependency_uuid) + list.keys().any(|id| id == dependency_uuid) }) }; @@ -421,7 +436,10 @@ impl<'a> JobTask<'a> { // if there are any errors from child tasks if !received_errors.is_empty() { // send them to the parent,... - self.sender.send(Err(received_errors)).await; + // + // We only send to one parent, because it doesn't matter + // And we know that we have at least one sender + self.sender[0].send(Err(received_errors)).await; // ... and stop operation, because the whole tree will fail anyways. return Ok(()) @@ -447,8 +465,8 @@ impl<'a> JobTask<'a> { // to // Vec<Artifact> let dependency_artifacts = received_dependencies - .iter() - .map(|tpl| tpl.1.iter()) + .values() + .map(|v| v.iter()) .flatten() .cloned() .collect(); @@ -480,17 +498,22 @@ impl<'a> JobTask<'a> { Err(e) => { trace!("[{}]: Scheduler returned error = {:?}", self.jobdef.job.uuid(), e); // ... and we send that to our parent - self.sender.send(Err(vec![(job_uuid, e)])).await?; + // + // We only send to one parent, because it doesn't matter anymore + // We know that we have at least one sender available + let mut errormap = HashMap::with_capacity(1); + errormap.insert(job_uuid, e); + self.sender[0].send(Err(errormap)).await?; }, // if the scheduler run reports success, // it returns the database artifact objects it created! Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); - received_dependencies.push((*self.jobdef.job.uuid(), artifacts)); - self.sender - .send(Ok(received_dependencies)) - .await?; + received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); + for s in self.sender { + s.send(Ok(received_dependencies.clone())).await?; + } }, } @@ -505,20 +528,20 @@ impl<'a> JobTask<'a> { /// /// Return Ok(true) if we should continue operation /// Return Ok(false) if the channel is empty and we're done receiving - async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> { + async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> { match self.receiver.recv().await { Some(Ok(mut v)) => { // The task we depend on succeeded and returned an // (uuid of the job, [Artifact]) trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v); - received_dependencies.append(&mut v); + received_dependencies.extend(v); Ok(true) }, Some(Err(mut e)) => { // The task we depend on failed // we log that error for now trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), e); - received_errors.append(&mut e); + received_errors.extend(e); Ok(true) }, None => { @@ -526,7 +549,7 @@ impl<'a> JobTask<'a> { trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid()); // Find all dependencies that we need but which are not received - let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>(); + let received = received_dependencies.keys().collect::<Vec<_>>(); let missing_deps: Vec<_> = self.jobdef .dependencies .iter() |