Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--  src/orchestrator/orchestrator.rs | 91
1 file changed, 51 insertions, 40 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index b5d1609..710b53f 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -32,7 +32,7 @@ use crate::config::Configuration;
use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
-use crate::filestore::Artifact;
+use crate::filestore::ArtifactPath;
use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
@@ -203,16 +203,16 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, HashMap<Uuid, Error>>;
+type JobResult = std::result::Result<HashMap<Uuid, Vec<ArtifactPath>>, HashMap<Uuid, Error>>;
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<Artifact>) -> Result<HashMap<Uuid, Error>> {
+ pub async fn run(self, output: &mut Vec<ArtifactPath>) -> Result<HashMap<Uuid, Error>> {
let (results, errors) = self.run_tree().await?;
output.extend(results.into_iter());
Ok(errors)
}
- async fn run_tree(self) -> Result<(Vec<Artifact>, HashMap<Uuid, Error>)> {
+ async fn run_tree(self) -> Result<(Vec<ArtifactPath>, HashMap<Uuid, Error>)> {
let multibar = Arc::new(indicatif::MultiProgress::new());
// For each job in the jobdag, build a tuple with
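For orientation, the JobResult alias above means every task reports either the artifact paths it produced or the errors it hit, both keyed by job UUID. A minimal sketch of folding such results, with PathBuf standing in for ArtifactPath and the uuid/anyhow crates assumed to be available:

use std::collections::HashMap;

// Stand-ins: PathBuf for crate::filestore::ArtifactPath; uuid/anyhow as used by the crate.
type ArtifactPath = std::path::PathBuf;
type JobResult = Result<HashMap<uuid::Uuid, Vec<ArtifactPath>>, HashMap<uuid::Uuid, anyhow::Error>>;

fn fold_results(
    results: Vec<JobResult>,
) -> (HashMap<uuid::Uuid, Vec<ArtifactPath>>, HashMap<uuid::Uuid, anyhow::Error>) {
    let mut artifacts = HashMap::new();
    let mut errors = HashMap::new();
    for r in results {
        match r {
            // a dependency task succeeded: uuid -> artifact paths it produced
            Ok(map) => artifacts.extend(map),
            // a dependency task failed: uuid -> the anyhow::Error it reported
            Err(map) => errors.extend(map),
        }
    }
    (artifacts, errors)
}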
@@ -268,20 +268,23 @@ impl<'a> Orchestrator<'a> {
.filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
.map(|j| j.2.clone())
});
- } else {
- *job.3.borrow_mut() = {
- let depending_on_job = jobs.iter()
- .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
- .map(|j| j.2.clone())
- .collect::<Vec<Sender<JobResult>>>();
-
- if depending_on_job.is_empty() {
- None
- } else {
- Some(depending_on_job)
- }
- };
+
+ continue;
}
+
+ // else, but not in else {} because of borrowing
+ *job.3.borrow_mut() = {
+ let depending_on_job = jobs.iter()
+ .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
+ .map(|j| j.2.clone())
+ .collect::<Vec<Sender<JobResult>>>();
+
+ if depending_on_job.is_empty() {
+ None
+ } else {
+ Some(depending_on_job)
+ }
+ };
}
// Find the id of the root task
@@ -311,23 +314,9 @@ impl<'a> Orchestrator<'a> {
.into_iter()
.map(|prep| {
trace!("Creating JobTask for = {}", prep.1.jobdef.job.uuid());
- let root_sender = root_sender.clone();
- JobTask {
- jobdef: prep.1.jobdef,
-
- bar: prep.1.bar.clone(),
-
- config: prep.1.config,
- source_cache: prep.1.source_cache,
- scheduler: prep.1.scheduler,
- merged_stores: prep.1.merged_stores,
- database: prep.1.database.clone(),
-
- receiver: prep.0,
-
- // the sender is set or we need to use the root sender
- sender: prep.3.into_inner().unwrap_or_else(|| vec![root_sender]),
- }
+ // the sender is set or we need to use the root sender
+ let sender = prep.3.into_inner().unwrap_or_else(|| vec![root_sender.clone()]);
+ JobTask::new(prep.0, prep.1, sender)
})
.map(|task| task.run())
.collect::<futures::stream::FuturesUnordered<_>>();
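The unwrap_or_else on the new path reads: use the senders wired up for this job, or fall back to a clone of the root sender. A condensed sketch of that fallback, assuming a tokio-style mpsc Sender (suggested by the async recv().await later in the diff) with u8 as a stand-in payload:

use std::cell::RefCell;
use tokio::sync::mpsc::Sender;

// Sketch of the fallback only; u8 stands in for JobResult.
fn pick_senders(cell: RefCell<Option<Vec<Sender<u8>>>>, root_sender: &Sender<u8>) -> Vec<Sender<u8>> {
    // senders wired to the jobs that depend on this one, or -- for jobs
    // nothing depends on -- a single clone of the root sender
    cell.into_inner().unwrap_or_else(|| vec![root_sender.clone()])
}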
@@ -416,6 +405,28 @@ impl<'a> Drop for JobTask<'a> {
}
impl<'a> JobTask<'a> {
+ fn new(receiver: Receiver<JobResult>, prep: TaskPreparation<'a>, sender: Vec<Sender<JobResult>>) -> Self {
+ let bar = prep.bar.clone();
+ bar.set_message(&format!("[{} {} {}]: Booting",
+ prep.jobdef.job.uuid(),
+ prep.jobdef.job.package().name(),
+ prep.jobdef.job.package().version()
+ ));
+ JobTask {
+ jobdef: prep.jobdef,
+
+ bar,
+
+ config: prep.config,
+ source_cache: prep.source_cache,
+ scheduler: prep.scheduler,
+ merged_stores: prep.merged_stores,
+ database: prep.database.clone(),
+
+ receiver,
+ sender,
+ }
+ }
/// Run the job
///
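The new constructor also sets the bar's initial "Booting" message. A small sketch of that bar setup, assuming an indicatif version whose set_message accepts &str, as the &format!(..) call above implies:

use std::sync::Arc;
use indicatif::{MultiProgress, ProgressBar};

// Sketch of the bar wiring the constructor performs; lengths and labels are placeholders.
fn booting_bar(multibar: &Arc<MultiProgress>, uuid: &str, name: &str, version: &str) -> ProgressBar {
    let bar = multibar.add(ProgressBar::new(1));
    bar.set_message(&format!("[{} {} {}]: Booting", uuid, name, version));
    bar
}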
@@ -429,7 +440,7 @@ impl<'a> JobTask<'a> {
// A list of job run results from dependencies that were received from the tasks for the
// dependencies
- let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new();
+ let mut received_dependencies: HashMap<Uuid, Vec<ArtifactPath>> = HashMap::new();
// A list of errors that were received from the tasks for the dependencies
let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len());
@@ -480,15 +491,15 @@ impl<'a> JobTask<'a> {
}
// Map the list of received dependencies from
- // Vec<(Uuid, Vec<Artifact>)>
+ // Vec<(Uuid, Vec<ArtifactPath>)>
// to
- // Vec<Artifact>
+ // Vec<ArtifactPath>
let dependency_artifacts = received_dependencies
.values()
.map(|v| v.iter())
.flatten()
.cloned()
- .collect();
+ .collect::<Vec<ArtifactPath>>();
trace!("[{}]: Dependency artifacts = {:?}", self.jobdef.job.uuid(), dependency_artifacts);
self.bar.set_message(&format!("[{} {} {}]: Preparing...",
self.jobdef.job.uuid(),
@@ -547,11 +558,11 @@ impl<'a> JobTask<'a> {
///
/// Return Ok(true) if we should continue operation
/// Return Ok(false) if the channel is empty and we're done receiving
- async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
+ async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an
- // (uuid of the job, [Artifact])
+ // (uuid of the job, [ArtifactPath])
trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v);
received_dependencies.extend(v);
Ok(true)
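The doc comment above states the receive contract: Ok(true) while results keep arriving, Ok(false) once the channel is closed and drained. A minimal sketch of one receive step over a tokio mpsc receiver, reusing the stand-in aliases from the earlier sketch; the error branch here is a guess, since it lies outside the hunk:

use std::collections::HashMap;
use tokio::sync::mpsc::Receiver;

// Stand-in aliases as in the earlier sketch.
type ArtifactPath = std::path::PathBuf;
type JobResult = Result<HashMap<uuid::Uuid, Vec<ArtifactPath>>, HashMap<uuid::Uuid, anyhow::Error>>;

// Ok(true) while values keep arriving, Ok(false) once every sender is dropped
// and the channel is drained.
async fn receive_once(
    rx: &mut Receiver<JobResult>,
    deps: &mut HashMap<uuid::Uuid, Vec<ArtifactPath>>,
    errs: &mut HashMap<uuid::Uuid, anyhow::Error>,
) -> anyhow::Result<bool> {
    match rx.recv().await {
        Some(Ok(map)) => {
            // a dependency succeeded: merge its (uuid -> artifact paths) entries
            deps.extend(map);
            Ok(true)
        }
        Some(Err(e)) => {
            // a dependency failed; record its errors and keep receiving
            errs.extend(e);
            Ok(true)
        }
        None => Ok(false), // channel closed: all dependency tasks finished
    }
}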