author | Matthias Beyer <mail@beyermatthias.de> | 2021-02-04 15:52:02 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-06 11:54:23 +0100 |
commit | efe07be74cf1ae704cb73b0f20c28b33aa46c217 (patch) | |
tree | 2c7b1686fc2878b79d82708bd2335c713326299d /src/orchestrator/orchestrator.rs | |
parent | 8a4ef5e6c0126037412794699d5f80540ffd4802 (diff) |
Rewrite package organizational structure using DAG
This patch reimplements the package orchestration functionality to rely
on a DAG rather than a tree.
        A
       / \
      B   E
     / \   \
    C   D   F
Before this change, the structure the packages were organized in for a
build was a tree.
That worked reasonably well for the initial development of butido, because
a tree is the simple case and its implementation is rather simple, too.
But, packages and their dependencies are not always organized in a tree.
Most of the time, they are organized in a DAG:
      .-> C -,
     /        \
    D          > A
     \        /
      `-> B -´
This is a real-world example: A could be a common crypto-library that I
do not want to name here.
B and C could be libraries that use said crypto-library, and D could
be a program that uses B and C.
Because said crypto-library takes rather long to build, building it twice
and throwing one result away is a no-go.
A DAG as organizational structure makes that issue go away entirely.
Also, we can later implement checks for whether the DAG contains multiple
versions of the same library, if that is undesirable.
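To make the difference concrete, here is a minimal sketch of the diamond above using only the Rust standard library. The dependency table and the helper names (`example_repo`, `count_tree_nodes`, `collect_dag_nodes`) are made up for illustration and are not butido code. It only shows that a tree-style expansion visits A once per path, while a DAG-style expansion records it once.

```rust
use std::collections::HashMap;

/// Hypothetical dependency table: package name -> names of its direct dependencies.
fn example_repo() -> HashMap<&'static str, Vec<&'static str>> {
    let mut repo = HashMap::new();
    repo.insert("D", vec!["B", "C"]);
    repo.insert("B", vec!["A"]);
    repo.insert("C", vec!["A"]);
    repo.insert("A", vec![]);
    repo
}

/// Tree-style expansion: every path to a dependency produces its own node.
fn count_tree_nodes(repo: &HashMap<&str, Vec<&str>>, root: &str) -> usize {
    1 + repo[root]
        .iter()
        .map(|&dep| count_tree_nodes(repo, dep))
        .sum::<usize>()
}

/// DAG-style expansion: a package reached via several paths is recorded once.
fn collect_dag_nodes<'a>(
    repo: &HashMap<&'a str, Vec<&'a str>>,
    root: &'a str,
    seen: &mut Vec<&'a str>,
) {
    if seen.contains(&root) {
        return;
    }
    seen.push(root);
    for &dep in &repo[root] {
        collect_dag_nodes(repo, dep, seen);
    }
}

fn main() {
    let repo = example_repo();
    let mut nodes = Vec::new();
    collect_dag_nodes(&repo, "D", &mut nodes);
    // The tree contains A twice (once below B, once below C); the DAG contains it once.
    println!("tree nodes: {}", count_tree_nodes(&repo, "D"));
    println!("dag nodes:  {}", nodes.len());
}
```

With one build job per node, the tree-style structure would schedule A twice, which is exactly the wasted work described above.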
The change itself is rather big, frankly because it is a non-trivial
change to replace the whole data structure and its handling in the
orchestrator code.
First of all, we introduce the "daggy" library, which provides the DAG
implementation on top of the popular "petgraph" library.
The package `Tree` data structure was replaced by a package `Dag`
data structure. This type does the heavy lifting that is needed to
load a package and all its dependencies from the `Repository` object.
The `JobTree` was also reimplemented, but as `daggy::Dag` provides a
convenient `map()` function, its implementation which transforms the
package `Dag` into a job `Dag` is rather trivial.
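The idea behind such a `map()` can be sketched without the daggy dependency. `MiniDag`, `Job` and `package_dag_to_job_dag` below are hypothetical stand-ins, not the types from daggy or butido. The point is only that a node-wise map keeps indices and edges untouched, so the job DAG has the same shape as the package DAG.

```rust
/// Minimal stand-in for a DAG: node weights plus (from, to) index pairs.
/// This is NOT daggy's type, only an illustration of the idea.
#[derive(Debug, Clone, PartialEq)]
struct MiniDag<N> {
    nodes: Vec<N>,
    edges: Vec<(usize, usize)>,
}

impl<N> MiniDag<N> {
    /// Transform every node weight while keeping indices and edges as they are.
    fn map<M>(&self, mut f: impl FnMut(usize, &N) -> M) -> MiniDag<M> {
        MiniDag {
            nodes: self
                .nodes
                .iter()
                .enumerate()
                .map(|(idx, node)| f(idx, node))
                .collect(),
            edges: self.edges.clone(),
        }
    }
}

/// Hypothetical job type; here a job is just a package name with an id.
#[derive(Debug, Clone, PartialEq)]
struct Job {
    id: usize,
    package: String,
}

/// Turn a DAG of package names into a DAG of jobs with the same topology.
fn package_dag_to_job_dag(packages: &MiniDag<String>) -> MiniDag<Job> {
    packages.map(|idx, name| Job {
        id: idx,
        package: name.clone(),
    })
}

fn main() {
    // D -> B, D -> C, B -> A, C -> A (indices: D=0, B=1, C=2, A=3)
    let packages = MiniDag {
        nodes: vec!["D".to_string(), "B".to_string(), "C".to_string(), "A".to_string()],
        edges: vec![(0, 1), (0, 2), (1, 3), (2, 3)],
    };
    let jobs = package_dag_to_job_dag(&packages);
    println!("{:?}", jobs);
}
```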
`crate::job::Dag` then provides the convenience `iter()` function, which
iterates over all elements in the DAG and provides a `JobDefinition`
object for each node.
The order in which we traverse the DAG is not an issue: we need to
create tasks for all `JobDefinition`s anyway, so we do not care about
traversal order at all.
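A rough sketch of such an `iter()` follows, again with standard-library stand-ins. The field layout of `JobDag` and `JobDefinition` is an assumption made for illustration, since the commit message does not show the real definitions. Each node yields its job plus its direct dependencies, and no particular traversal order is needed because every node is visited exactly once.

```rust
/// Hypothetical job DAG: job names as node weights, edges as
/// (dependent, dependency) index pairs.
struct JobDag {
    jobs: Vec<String>,
    edges: Vec<(usize, usize)>,
}

/// Hypothetical per-node view: the job itself and the jobs it waits for.
#[derive(Debug, PartialEq)]
struct JobDefinition<'a> {
    job: &'a str,
    dependencies: Vec<&'a str>,
}

impl JobDag {
    /// Yield one `JobDefinition` per node, in plain index order.
    fn iter(&self) -> impl Iterator<Item = JobDefinition<'_>> + '_ {
        let jobs = &self.jobs;
        let edges = &self.edges;
        jobs.iter().enumerate().map(move |(idx, job)| JobDefinition {
            job: job.as_str(),
            dependencies: edges
                .iter()
                .filter(|(from, _)| *from == idx)
                .map(move |(_, to)| jobs[*to].as_str())
                .collect(),
        })
    }
}

fn main() {
    // D -> B, D -> C, B -> A, C -> A (indices: D=0, B=1, C=2, A=3)
    let dag = JobDag {
        jobs: vec!["D".into(), "B".into(), "C".into(), "A".into()],
        edges: vec![(0, 1), (0, 2), (1, 3), (2, 3)],
    };
    for jobdef in dag.iter() {
        println!("{} waits for {:?}", jobdef.job, jobdef.dependencies);
    }
}
```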
The `crate::package::Package` type got a `Hash` implementation, which
is necessary to keep track of the mappings while reading the DAG from
the repository.
The implementation does not create the edges between the nodes in the
DAG right when inserting, but afterwards.
To keep track of the `daggy::NodeIndex`es, it keeps a mapping
Package -> NodeIndex
in a `HashMap`. Thus, `Package` must implement `std::hash::Hash`.
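The two-phase construction can be sketched as follows. This is a simplified illustration, not the butido implementation: `Package`, `PackageDag` and `build_dag` are hypothetical, dependencies are resolved by name only, and plain `usize` indices stand in for `daggy::NodeIndex`.

```rust
use std::collections::HashMap;

/// Hypothetical package type. It derives `Hash` and `Eq` so it can be a map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Package {
    name: String,
    dependencies: Vec<String>,
}

/// Hypothetical DAG: nodes plus (dependent, dependency) index pairs.
struct PackageDag {
    nodes: Vec<Package>,
    edges: Vec<(usize, usize)>,
}

fn build_dag(repo: &[Package], root: &str) -> PackageDag {
    // Phase 1: insert every reachable package once and remember its index.
    let mut mappings: HashMap<Package, usize> = HashMap::new();
    let mut nodes = Vec::new();
    let mut stack = vec![root.to_string()];
    while let Some(name) = stack.pop() {
        let pkg = repo
            .iter()
            .find(|p| p.name == name)
            .expect("package not in repository")
            .clone();
        if mappings.contains_key(&pkg) {
            continue; // already inserted via another path
        }
        mappings.insert(pkg.clone(), nodes.len());
        stack.extend(pkg.dependencies.iter().cloned());
        nodes.push(pkg);
    }

    // Phase 2: all nodes exist now, so edges can be added from the mapping.
    let mut edges = Vec::new();
    for (pkg, &from) in &mappings {
        for dep_name in &pkg.dependencies {
            let (_, &to) = mappings
                .iter()
                .find(|(p, _)| &p.name == dep_name)
                .expect("dependency was inserted in phase 1");
            edges.push((from, to));
        }
    }
    edges.sort();
    PackageDag { nodes, edges }
}

fn pkg(name: &str, deps: &[&str]) -> Package {
    Package {
        name: name.to_string(),
        dependencies: deps.iter().map(|d| d.to_string()).collect(),
    }
}

fn main() {
    let repo = vec![pkg("D", &["B", "C"]), pkg("B", &["A"]), pkg("C", &["A"]), pkg("A", &[])];
    let dag = build_dag(&repo, "D");
    println!("{} nodes, {} edges", dag.nodes.len(), dag.edges.len());
}
```

Deferring the edges means a package can be inserted before its dependencies are known to the graph, at the cost of keeping the mapping around until the second pass.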
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Tested-by: Matthias Beyer <mail@beyermatthias.de>
squash! Reimplement as DAG
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 25 |
1 files changed, 11 insertions, 14 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index cedc549..bed2195 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -37,7 +37,7 @@ use crate::filestore::ReleaseStore;
 use crate::filestore::StagingStore;
 use crate::job::JobDefinition;
 use crate::job::RunnableJob;
-use crate::job::Tree as JobTree;
+use crate::job::Dag;
 use crate::source::SourceCache;
 use crate::util::progress::ProgressBars;
 
@@ -45,7 +45,7 @@ use crate::util::progress::ProgressBars;
 /// The Orchestrator
 ///
 /// The Orchestrator is used to orchestrate the work on one submit.
-/// On a very high level: It uses a [JobTree](crate::job::Tree) to build a number (list) of
+/// On a very high level: It uses a [Dag](crate::job::Dag) to build a number (list) of
 /// [JobTasks](crate::orchestrator::JobTask) that is then run concurrently.
 ///
 /// Because of the implementation of [JobTask], the work happens in
@@ -153,7 +153,7 @@ pub struct Orchestrator<'a> {
     progress_generator: ProgressBars,
     merged_stores: MergedStores,
     source_cache: SourceCache,
-    jobtree: JobTree,
+    jobdag: Dag,
     config: &'a Configuration,
     database: Arc<PgConnection>,
 }
@@ -165,7 +165,7 @@ pub struct OrchestratorSetup<'a> {
     staging_store: Arc<RwLock<StagingStore>>,
     release_store: Arc<RwLock<ReleaseStore>>,
     source_cache: SourceCache,
-    jobtree: JobTree,
+    jobdag: Dag,
     database: Arc<PgConnection>,
     submit: dbmodels::Submit,
     log_dir: Option<PathBuf>,
@@ -188,7 +188,7 @@ impl<'a> OrchestratorSetup<'a> {
             progress_generator: self.progress_generator,
             merged_stores: MergedStores::new(self.release_store, self.staging_store),
             source_cache: self.source_cache,
-            jobtree: self.jobtree,
+            jobdag: self.jobdag,
             config: self.config,
             database: self.database,
         })
@@ -214,7 +214,7 @@ impl<'a> Orchestrator<'a> {
     async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> {
         let multibar = Arc::new(indicatif::MultiProgress::new());
 
-        // For each job in the jobtree, built a tuple with
+        // For each job in the jobdag, built a tuple with
         //
         // 1. The receiver that is used by the task to receive results from dependency tasks from
         // 2. The task itself (as a TaskPreparation object)
@@ -223,16 +223,15 @@ impl<'a> Orchestrator<'a> {
         // This is an Option<> because we need to set it later and the root of the tree needs a
         // special handling, as this very function will wait on a receiver that gets the results
         // of the root task
-        let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree
-            .inner()
+        let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobdag
             .iter()
-            .map(|(uuid, jobdef)| {
+            .map(|jobdef| {
                 // We initialize the channel with 100 elements here, as there is unlikely a task
                 // that depends on 100 other tasks.
                 // Either way, this might be increased in future.
                 let (sender, receiver) = tokio::sync::mpsc::channel(100);
 
-                trace!("Creating TaskPreparation object for job {}", uuid);
+                trace!("Creating TaskPreparation object for job {}", jobdef.job.uuid());
                 let bar = self.progress_generator.bar();
                 let bar = multibar.add(bar);
                 bar.set_length(100);
@@ -337,8 +336,7 @@ impl<'a> Orchestrator<'a> {
 ///
 /// This simply holds data and does not contain any more functionality
 struct TaskPreparation<'a> {
-    /// The UUID of this job
-    jobdef: &'a JobDefinition,
+    jobdef: JobDefinition<'a>,
     bar: ProgressBar,
@@ -353,8 +351,7 @@ struct TaskPreparation<'a> {
 ///
 /// This type represents a task for a job that can immediately be executed (see `JobTask::run()`).
 struct JobTask<'a> {
-    /// The UUID of this job
-    jobdef: &'a JobDefinition,
+    jobdef: JobDefinition<'a>,
     bar: ProgressBar,