summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-02-04 15:52:02 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-06 11:54:23 +0100
commitefe07be74cf1ae704cb73b0f20c28b33aa46c217 (patch)
tree2c7b1686fc2878b79d82708bd2335c713326299d /src/orchestrator/orchestrator.rs
parent8a4ef5e6c0126037412794699d5f80540ffd4802 (diff)
Rewrite package organizational structure using DAG
This patch reimplements the package orchestration functionality to rely on a DAG rather than a tree. A / \ B E / \ \ C D F Before this change, the structure the packages were organized in for a build was a tree. That did work reasonable well for initial development of butido, because this is a simple case and the implementation is rather simple, too. But, packages and their dependencies are not always organized in a tree. Most of the time, they are organized in a DAG: .-> C -, / \ D > A \ / `-> B -ยด This is a real-world example: A could be a common crypto-library that I do not want to name here. B and C could be libraries that use the said crypto-library and D could be a program that use B and C. Because said crypto-library builds rather long, building it twice and throwing one result away is a no-go. A DAG as organizational structure makes that issue go away entirely. Also, we can later implement checks whether the DAG contains multiple versions of the same library, if that is undesireable. The change itself is rather big, frankly because it is a non-trivial change the replace the whole data structure and its handling in the orchestrator code. First of all, we introduce the "daggy" library, which provides the DAG implementation on top of the popular "petgraph" library. The package `Tree` datastructure was replaced by a package `Dag` datastructure. This type implements the heavy-lifting that is needed to load a package and all its dependencies from the `Repository` object. The `JobTree` was also reimplemented, but as `daggy::Dag` provides a convenient `map()` function, its implementation which transforms the package `Dag` into a job `Dag` is rather trivial. `crate::job::Dag` then provides the convenience `iter()` function to iterate over all elements in the DAG and providing a `JobDefinition` object for each node. The topology in which we traverse the DAG is not an issue, as we need to create tasks for all `JobDefinition`s anyways, so we do not care about traversal topology at all. The `crate::package::Package` type got an `Hash` implementation, which is necessary to keep track of the mappings while reading the DAG from the repository. The implementation does not create the edges between the nodes in the DAG right when inserting, but afterwards. To keep track of the `daggy::NodeIndex`es, it keeps a mapping Package -> NodeIndex in a Hashmap. Thus, `Package` must implement `std::hash::Hash` Signed-off-by: Matthias Beyer <mail@beyermatthias.de> Tested-by: Matthias Beyer <mail@beyermatthias.de> squash! Reimplement as DAG
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs25
1 files changed, 11 insertions, 14 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index cedc549..bed2195 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -37,7 +37,7 @@ use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobDefinition;
use crate::job::RunnableJob;
-use crate::job::Tree as JobTree;
+use crate::job::Dag;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;
@@ -45,7 +45,7 @@ use crate::util::progress::ProgressBars;
/// The Orchestrator
///
/// The Orchestrator is used to orchestrate the work on one submit.
-/// On a very high level: It uses a [JobTree](crate::job::Tree) to build a number (list) of
+/// On a very high level: It uses a [Dag](crate::job::Dag) to build a number (list) of
/// [JobTasks](crate::orchestrator::JobTask) that is then run concurrently.
///
/// Because of the implementation of [JobTask], the work happens in
@@ -153,7 +153,7 @@ pub struct Orchestrator<'a> {
progress_generator: ProgressBars,
merged_stores: MergedStores,
source_cache: SourceCache,
- jobtree: JobTree,
+ jobdag: Dag,
config: &'a Configuration,
database: Arc<PgConnection>,
}
@@ -165,7 +165,7 @@ pub struct OrchestratorSetup<'a> {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
- jobtree: JobTree,
+ jobdag: Dag,
database: Arc<PgConnection>,
submit: dbmodels::Submit,
log_dir: Option<PathBuf>,
@@ -188,7 +188,7 @@ impl<'a> OrchestratorSetup<'a> {
progress_generator: self.progress_generator,
merged_stores: MergedStores::new(self.release_store, self.staging_store),
source_cache: self.source_cache,
- jobtree: self.jobtree,
+ jobdag: self.jobdag,
config: self.config,
database: self.database,
})
@@ -214,7 +214,7 @@ impl<'a> Orchestrator<'a> {
async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> {
let multibar = Arc::new(indicatif::MultiProgress::new());
- // For each job in the jobtree, built a tuple with
+ // For each job in the jobdag, built a tuple with
//
// 1. The receiver that is used by the task to receive results from dependency tasks from
// 2. The task itself (as a TaskPreparation object)
@@ -223,16 +223,15 @@ impl<'a> Orchestrator<'a> {
// This is an Option<> because we need to set it later and the root of the tree needs a
// special handling, as this very function will wait on a receiver that gets the results
// of the root task
- let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree
- .inner()
+ let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobdag
.iter()
- .map(|(uuid, jobdef)| {
+ .map(|jobdef| {
// We initialize the channel with 100 elements here, as there is unlikely a task
// that depends on 100 other tasks.
// Either way, this might be increased in future.
let (sender, receiver) = tokio::sync::mpsc::channel(100);
- trace!("Creating TaskPreparation object for job {}", uuid);
+ trace!("Creating TaskPreparation object for job {}", jobdef.job.uuid());
let bar = self.progress_generator.bar();
let bar = multibar.add(bar);
bar.set_length(100);
@@ -337,8 +336,7 @@ impl<'a> Orchestrator<'a> {
///
/// This simply holds data and does not contain any more functionality
struct TaskPreparation<'a> {
- /// The UUID of this job
- jobdef: &'a JobDefinition,
+ jobdef: JobDefinition<'a>,
bar: ProgressBar,
@@ -353,8 +351,7 @@ struct TaskPreparation<'a> {
///
/// This type represents a task for a job that can immediately be executed (see `JobTask::run()`).
struct JobTask<'a> {
- /// The UUID of this job
- jobdef: &'a JobDefinition,
+ jobdef: JobDefinition<'a>,
bar: ProgressBar,