diff options
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 91 |
1 files changed, 51 insertions, 40 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index b5d1609..710b53f 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -32,7 +32,7 @@ use crate::config::Configuration; use crate::db::models as dbmodels; use crate::endpoint::EndpointConfiguration; use crate::endpoint::EndpointScheduler; -use crate::filestore::Artifact; +use crate::filestore::ArtifactPath; use crate::filestore::MergedStores; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; @@ -203,16 +203,16 @@ impl<'a> OrchestratorSetup<'a> { /// It is either a list of artifacts with the UUID of the job they were produced by, /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, HashMap<Uuid, Error>>; +type JobResult = std::result::Result<HashMap<Uuid, Vec<ArtifactPath>>, HashMap<Uuid, Error>>; impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<Artifact>) -> Result<HashMap<Uuid, Error>> { + pub async fn run(self, output: &mut Vec<ArtifactPath>) -> Result<HashMap<Uuid, Error>> { let (results, errors) = self.run_tree().await?; output.extend(results.into_iter()); Ok(errors) } - async fn run_tree(self) -> Result<(Vec<Artifact>, HashMap<Uuid, Error>)> { + async fn run_tree(self) -> Result<(Vec<ArtifactPath>, HashMap<Uuid, Error>)> { let multibar = Arc::new(indicatif::MultiProgress::new()); // For each job in the jobdag, built a tuple with @@ -268,20 +268,23 @@ impl<'a> Orchestrator<'a> { .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) .map(|j| j.2.clone()) }); - } else { - *job.3.borrow_mut() = { - let depending_on_job = jobs.iter() - .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) - .map(|j| j.2.clone()) - .collect::<Vec<Sender<JobResult>>>(); - - if depending_on_job.is_empty() { - None - } else { - Some(depending_on_job) - } - }; + + continue; } + + // else, but not in else {} because of borrowing + *job.3.borrow_mut() = { + let depending_on_job = jobs.iter() + .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) + .map(|j| j.2.clone()) + .collect::<Vec<Sender<JobResult>>>(); + + if depending_on_job.is_empty() { + None + } else { + Some(depending_on_job) + } + }; } // Find the id of the root task @@ -311,23 +314,9 @@ impl<'a> Orchestrator<'a> { .into_iter() .map(|prep| { trace!("Creating JobTask for = {}", prep.1.jobdef.job.uuid()); - let root_sender = root_sender.clone(); - JobTask { - jobdef: prep.1.jobdef, - - bar: prep.1.bar.clone(), - - config: prep.1.config, - source_cache: prep.1.source_cache, - scheduler: prep.1.scheduler, - merged_stores: prep.1.merged_stores, - database: prep.1.database.clone(), - - receiver: prep.0, - - // the sender is set or we need to use the root sender - sender: prep.3.into_inner().unwrap_or_else(|| vec![root_sender]), - } + // the sender is set or we need to use the root sender + let sender = prep.3.into_inner().unwrap_or_else(|| vec![root_sender.clone()]); + JobTask::new(prep.0, prep.1, sender) }) .map(|task| task.run()) .collect::<futures::stream::FuturesUnordered<_>>(); @@ -416,6 +405,28 @@ impl<'a> Drop for JobTask<'a> { } impl<'a> JobTask<'a> { + fn new(receiver: Receiver<JobResult>, prep: TaskPreparation<'a>, sender: Vec<Sender<JobResult>>) -> Self { + let bar = prep.bar.clone(); + bar.set_message(&format!("[{} {} {}]: Booting", + prep.jobdef.job.uuid(), + prep.jobdef.job.package().name(), + prep.jobdef.job.package().version() + )); + JobTask { + jobdef: prep.jobdef, + + bar, + + config: prep.config, + source_cache: prep.source_cache, + scheduler: prep.scheduler, + merged_stores: prep.merged_stores, + database: prep.database.clone(), + + receiver, + sender, + } + } /// Run the job /// @@ -429,7 +440,7 @@ impl<'a> JobTask<'a> { // A list of job run results from dependencies that were received from the tasks for the // dependencies - let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new(); + let mut received_dependencies: HashMap<Uuid, Vec<ArtifactPath>> = HashMap::new(); // A list of errors that were received from the tasks for the dependencies let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len()); @@ -480,15 +491,15 @@ impl<'a> JobTask<'a> { } // Map the list of received dependencies from - // Vec<(Uuid, Vec<Artifact>)> + // Vec<(Uuid, Vec<ArtifactPath>)> // to - // Vec<Artifact> + // Vec<ArtifactPath> let dependency_artifacts = received_dependencies .values() .map(|v| v.iter()) .flatten() .cloned() - .collect(); + .collect::<Vec<ArtifactPath>>(); trace!("[{}]: Dependency artifacts = {:?}", self.jobdef.job.uuid(), dependency_artifacts); self.bar.set_message(&format!("[{} {} {}]: Preparing...", self.jobdef.job.uuid(), @@ -547,11 +558,11 @@ impl<'a> JobTask<'a> { /// /// Return Ok(true) if we should continue operation /// Return Ok(false) if the channel is empty and we're done receiving - async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> { + async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> { match self.receiver.recv().await { Some(Ok(mut v)) => { // The task we depend on succeeded and returned an - // (uuid of the job, [Artifact]) + // (uuid of the job, [ArtifactPath]) trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v); received_dependencies.extend(v); Ok(true) |