From 2852e1b6e9a3987d923e6fe774172a00a12b9192 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 5 Feb 2021 08:23:38 +0100 Subject: Optimize: Don't duplicate job UUID Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 44 ++++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 42e3b63..cedc549 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -237,7 +237,6 @@ impl<'a> Orchestrator<'a> { let bar = multibar.add(bar); bar.set_length(100); let tp = TaskPreparation { - uuid: *uuid, jobdef, bar, @@ -264,7 +263,7 @@ impl<'a> Orchestrator<'a> { // use the sender of the found job and set it as sender for this job for job in jobs.iter() { *job.3.borrow_mut() = jobs.iter() - .find(|j| j.1.jobdef.dependencies.contains(&job.1.uuid)) + .find(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) .map(|j| j.2.clone()); } @@ -277,7 +276,7 @@ impl<'a> Orchestrator<'a> { // Here, we copy its uuid, because we need it later. let root_job_id = jobs.iter() .find(|j| j.3.borrow().is_none()) - .map(|j| j.1.uuid) + .map(|j| j.1.jobdef.job.uuid()) .ok_or_else(|| anyhow!("Failed to find root task"))?; trace!("Root job id = {}", root_job_id); @@ -294,10 +293,9 @@ impl<'a> Orchestrator<'a> { let running_jobs = jobs .into_iter() .map(|prep| { - trace!("Creating JobTask for = {}", prep.1.uuid); + trace!("Creating JobTask for = {}", prep.1.jobdef.job.uuid()); let root_sender = root_sender.clone(); JobTask { - uuid: prep.1.uuid, jobdef: prep.1.jobdef, bar: prep.1.bar.clone(), @@ -340,7 +338,6 @@ impl<'a> Orchestrator<'a> { /// This simply holds data and does not contain any more functionality struct TaskPreparation<'a> { /// The UUID of this job - uuid: Uuid, jobdef: &'a JobDefinition, bar: ProgressBar, @@ -357,7 +354,6 @@ struct TaskPreparation<'a> { /// This type represents a task for a job that can immediately be executed (see `JobTask::run()`). struct JobTask<'a> { /// The UUID of this job - uuid: Uuid, jobdef: &'a JobDefinition, bar: ProgressBar, @@ -382,8 +378,8 @@ impl<'a> JobTask<'a> { /// This function runs the job from this object on the scheduler as soon as all dependend jobs /// returned successfully. async fn run(mut self) -> Result<()> { - debug!("[{}]: Running", self.uuid); - debug!("[{}]: Waiting for dependencies = {:?}", self.uuid, { + debug!("[{}]: Running", self.jobdef.job.uuid()); + debug!("[{}]: Waiting for dependencies = {:?}", self.jobdef.job.uuid(), { self.jobdef.dependencies.iter().map(|u| u.to_string()).collect::>() }); @@ -406,22 +402,22 @@ impl<'a> JobTask<'a> { // Update the status bar message self.bar.set_message({ &format!("[{} {} {}]: Waiting ({}/{})...", - self.uuid, + self.jobdef.job.uuid(), self.jobdef.job.package().name(), self.jobdef.job.package().version(), received_dependencies.len(), self.jobdef.dependencies.len()) }); - trace!("[{}]: Updated bar", self.uuid); + trace!("[{}]: Updated bar", self.jobdef.job.uuid()); - trace!("[{}]: receiving...", self.uuid); + trace!("[{}]: receiving...", self.jobdef.job.uuid()); // receive from the receiver let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?; if !continue_receiving { break; } - trace!("[{}]: Received errors = {:?}", self.uuid, received_errors); + trace!("[{}]: Received errors = {:?}", self.jobdef.job.uuid(), received_errors); // if there are any errors from child tasks if !received_errors.is_empty() { // send them to the parent,... @@ -456,9 +452,9 @@ impl<'a> JobTask<'a> { .flatten() .cloned() .collect(); - trace!("[{}]: Dependency artifacts = {:?}", self.uuid, dependency_artifacts); + trace!("[{}]: Dependency artifacts = {:?}", self.jobdef.job.uuid(), dependency_artifacts); self.bar.set_message(&format!("[{} {} {}]: Preparing...", - self.uuid, + self.jobdef.job.uuid(), self.jobdef.job.package().name(), self.jobdef.job.package().version() )); @@ -471,7 +467,7 @@ impl<'a> JobTask<'a> { dependency_artifacts)?; self.bar.set_message(&format!("[{} {} {}]: Scheduling...", - self.uuid, + self.jobdef.job.uuid(), self.jobdef.job.package().name(), self.jobdef.job.package().version() )); @@ -482,7 +478,7 @@ impl<'a> JobTask<'a> { // if the scheduler run reports an error, // that is an error from the actual execution of the job ... Err(e) => { - trace!("[{}]: Scheduler returned error = {:?}", self.uuid, e); + trace!("[{}]: Scheduler returned error = {:?}", self.jobdef.job.uuid(), e); // ... and we send that to our parent self.sender.send(Err(vec![(job_uuid, e)])).await?; }, @@ -490,15 +486,15 @@ impl<'a> JobTask<'a> { // if the scheduler run reports success, // it returns the database artifact objects it created! Ok(artifacts) => { - trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts); - received_dependencies.push((self.uuid, artifacts)); + trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); + received_dependencies.push((*self.jobdef.job.uuid(), artifacts)); self.sender .send(Ok(received_dependencies)) .await?; }, } - trace!("[{}]: Finished successfully", self.uuid); + trace!("[{}]: Finished successfully", self.jobdef.job.uuid()); Ok(()) } @@ -514,20 +510,20 @@ impl<'a> JobTask<'a> { Some(Ok(mut v)) => { // The task we depend on succeeded and returned an // (uuid of the job, [Artifact]) - trace!("[{}]: Received: {:?}", self.uuid, v); + trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v); received_dependencies.append(&mut v); Ok(true) }, Some(Err(mut e)) => { // The task we depend on failed // we log that error for now - trace!("[{}]: Received: {:?}", self.uuid, e); + trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), e); received_errors.append(&mut e); Ok(true) }, None => { // The task we depend on finished... we must check what we have now... - trace!("[{}]: Received nothing, channel seems to be empty", self.uuid); + trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid()); // Find all dependencies that we need but which are not received let received = received_dependencies.iter().map(|tpl| tpl.0).collect::>(); @@ -536,7 +532,7 @@ impl<'a> JobTask<'a> { .iter() .filter(|d| !received.contains(d)) .collect(); - trace!("[{}]: Missing dependencies = {:?}", self.uuid, missing_deps); + trace!("[{}]: Missing dependencies = {:?}", self.jobdef.job.uuid(), missing_deps); // ... if there are any, error if !missing_deps.is_empty() { -- cgit v1.2.3