summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-02-05 08:23:38 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-05 15:22:24 +0100
commit2852e1b6e9a3987d923e6fe774172a00a12b9192 (patch)
tree985913ee16948175d85229984a2a5ddbd1d4a0c3 /src/orchestrator/orchestrator.rs
parentcbae2959d8c4738445dc3f3635a1358151938e28 (diff)
Optimize: Don't duplicate job UUID
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs44
1 files changed, 20 insertions, 24 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 42e3b63..cedc549 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -237,7 +237,6 @@ impl<'a> Orchestrator<'a> {
let bar = multibar.add(bar);
bar.set_length(100);
let tp = TaskPreparation {
- uuid: *uuid,
jobdef,
bar,
@@ -264,7 +263,7 @@ impl<'a> Orchestrator<'a> {
// use the sender of the found job and set it as sender for this job
for job in jobs.iter() {
*job.3.borrow_mut() = jobs.iter()
- .find(|j| j.1.jobdef.dependencies.contains(&job.1.uuid))
+ .find(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
.map(|j| j.2.clone());
}
@@ -277,7 +276,7 @@ impl<'a> Orchestrator<'a> {
// Here, we copy its uuid, because we need it later.
let root_job_id = jobs.iter()
.find(|j| j.3.borrow().is_none())
- .map(|j| j.1.uuid)
+ .map(|j| j.1.jobdef.job.uuid())
.ok_or_else(|| anyhow!("Failed to find root task"))?;
trace!("Root job id = {}", root_job_id);
@@ -294,10 +293,9 @@ impl<'a> Orchestrator<'a> {
let running_jobs = jobs
.into_iter()
.map(|prep| {
- trace!("Creating JobTask for = {}", prep.1.uuid);
+ trace!("Creating JobTask for = {}", prep.1.jobdef.job.uuid());
let root_sender = root_sender.clone();
JobTask {
- uuid: prep.1.uuid,
jobdef: prep.1.jobdef,
bar: prep.1.bar.clone(),
@@ -340,7 +338,6 @@ impl<'a> Orchestrator<'a> {
/// This simply holds data and does not contain any more functionality
struct TaskPreparation<'a> {
/// The UUID of this job
- uuid: Uuid,
jobdef: &'a JobDefinition,
bar: ProgressBar,
@@ -357,7 +354,6 @@ struct TaskPreparation<'a> {
/// This type represents a task for a job that can immediately be executed (see `JobTask::run()`).
struct JobTask<'a> {
/// The UUID of this job
- uuid: Uuid,
jobdef: &'a JobDefinition,
bar: ProgressBar,
@@ -382,8 +378,8 @@ impl<'a> JobTask<'a> {
/// This function runs the job from this object on the scheduler as soon as all dependend jobs
/// returned successfully.
async fn run(mut self) -> Result<()> {
- debug!("[{}]: Running", self.uuid);
- debug!("[{}]: Waiting for dependencies = {:?}", self.uuid, {
+ debug!("[{}]: Running", self.jobdef.job.uuid());
+ debug!("[{}]: Waiting for dependencies = {:?}", self.jobdef.job.uuid(), {
self.jobdef.dependencies.iter().map(|u| u.to_string()).collect::<Vec<String>>()
});
@@ -406,22 +402,22 @@ impl<'a> JobTask<'a> {
// Update the status bar message
self.bar.set_message({
&format!("[{} {} {}]: Waiting ({}/{})...",
- self.uuid,
+ self.jobdef.job.uuid(),
self.jobdef.job.package().name(),
self.jobdef.job.package().version(),
received_dependencies.len(),
self.jobdef.dependencies.len())
});
- trace!("[{}]: Updated bar", self.uuid);
+ trace!("[{}]: Updated bar", self.jobdef.job.uuid());
- trace!("[{}]: receiving...", self.uuid);
+ trace!("[{}]: receiving...", self.jobdef.job.uuid());
// receive from the receiver
let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?;
if !continue_receiving {
break;
}
- trace!("[{}]: Received errors = {:?}", self.uuid, received_errors);
+ trace!("[{}]: Received errors = {:?}", self.jobdef.job.uuid(), received_errors);
// if there are any errors from child tasks
if !received_errors.is_empty() {
// send them to the parent,...
@@ -456,9 +452,9 @@ impl<'a> JobTask<'a> {
.flatten()
.cloned()
.collect();
- trace!("[{}]: Dependency artifacts = {:?}", self.uuid, dependency_artifacts);
+ trace!("[{}]: Dependency artifacts = {:?}", self.jobdef.job.uuid(), dependency_artifacts);
self.bar.set_message(&format!("[{} {} {}]: Preparing...",
- self.uuid,
+ self.jobdef.job.uuid(),
self.jobdef.job.package().name(),
self.jobdef.job.package().version()
));
@@ -471,7 +467,7 @@ impl<'a> JobTask<'a> {
dependency_artifacts)?;
self.bar.set_message(&format!("[{} {} {}]: Scheduling...",
- self.uuid,
+ self.jobdef.job.uuid(),
self.jobdef.job.package().name(),
self.jobdef.job.package().version()
));
@@ -482,7 +478,7 @@ impl<'a> JobTask<'a> {
// if the scheduler run reports an error,
// that is an error from the actual execution of the job ...
Err(e) => {
- trace!("[{}]: Scheduler returned error = {:?}", self.uuid, e);
+ trace!("[{}]: Scheduler returned error = {:?}", self.jobdef.job.uuid(), e);
// ... and we send that to our parent
self.sender.send(Err(vec![(job_uuid, e)])).await?;
},
@@ -490,15 +486,15 @@ impl<'a> JobTask<'a> {
// if the scheduler run reports success,
// it returns the database artifact objects it created!
Ok(artifacts) => {
- trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts);
- received_dependencies.push((self.uuid, artifacts));
+ trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
+ received_dependencies.push((*self.jobdef.job.uuid(), artifacts));
self.sender
.send(Ok(received_dependencies))
.await?;
},
}
- trace!("[{}]: Finished successfully", self.uuid);
+ trace!("[{}]: Finished successfully", self.jobdef.job.uuid());
Ok(())
}
@@ -514,20 +510,20 @@ impl<'a> JobTask<'a> {
Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an
// (uuid of the job, [Artifact])
- trace!("[{}]: Received: {:?}", self.uuid, v);
+ trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v);
received_dependencies.append(&mut v);
Ok(true)
},
Some(Err(mut e)) => {
// The task we depend on failed
// we log that error for now
- trace!("[{}]: Received: {:?}", self.uuid, e);
+ trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), e);
received_errors.append(&mut e);
Ok(true)
},
None => {
// The task we depend on finished... we must check what we have now...
- trace!("[{}]: Received nothing, channel seems to be empty", self.uuid);
+ trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid());
// Find all dependencies that we need but which are not received
let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>();
@@ -536,7 +532,7 @@ impl<'a> JobTask<'a> {
.iter()
.filter(|d| !received.contains(d))
.collect();
- trace!("[{}]: Missing dependencies = {:?}", self.uuid, missing_deps);
+ trace!("[{}]: Missing dependencies = {:?}", self.jobdef.job.uuid(), missing_deps);
// ... if there are any, error
if !missing_deps.is_empty() {