summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-01-25 15:37:28 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-01-25 15:43:00 +0100
commitfffac362dd4e866a92e788ef7a2945f78adb8520 (patch)
treecd43c39168c6202b4950a23d3e356165b471b084 /src
parentb7d8aaa8d96e7059f6cf3dd23f5b894752a30cc5 (diff)
parent53360d499e2233b30b1aaf745800cd7199009153 (diff)
Merge branch 'more-parallelism' into master
Diffstat (limited to 'src')
-rw-r--r--src/commands/build.rs2
-rw-r--r--src/endpoint/scheduler.rs42
-rw-r--r--src/filestore/merged.rs6
-rw-r--r--src/filestore/path.rs6
-rw-r--r--src/filestore/staging.rs5
-rw-r--r--src/orchestrator/orchestrator.rs439
6 files changed, 365 insertions, 135 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs
index c9a564b..86f205b 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -339,7 +339,7 @@ pub async fn build(
writeln!(outlock, "Packages created:")?;
}
artifacts.into_iter().try_for_each(|artifact| {
- writeln!(outlock, "-> {}", staging_dir.join(artifact.path).display()).map_err(Error::from)
+ writeln!(outlock, "-> {}", staging_dir.join(artifact.path()).display()).map_err(Error::from)
})?;
let mut had_error = false;
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 93b7509..c2f35fc 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -29,6 +29,7 @@ use uuid::Uuid;
use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
+use crate::filestore::Artifact;
use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::RunnableJob;
@@ -145,7 +146,7 @@ impl std::fmt::Debug for JobHandle {
}
impl JobHandle {
- pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> {
+ pub async fn run(self) -> Result<Vec<Artifact>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let ep = self.endpoint.read().await;
let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
@@ -241,9 +242,18 @@ impl JobHandle {
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];
+ let staging_store_lock = self.staging_store.read().await;
for p in paths.iter() {
+ use std::ops::Deref;
trace!("DB: Creating artifact entry for path: {}", p.display());
- r.push(dbmodels::Artifact::create(&self.db, p, &job)?);
+ let _ = dbmodels::Artifact::create(&self.db, p, &job)?;
+ r.push({
+ staging_store_lock
+ .deref()
+ .get(p)
+ .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))?
+ .clone()
+ });
}
Ok(r)
}
@@ -336,30 +346,30 @@ impl<'a> LogReceiver<'a> {
trace!("Setting bar to {}", u as u64);
self.bar.set_position(u as u64);
self.bar.set_message(&format!(
- "Job ({} {}): {} running...",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: running...",
+ self.job_id, self.package_name, self.package_version
));
}
LogItem::CurrentPhase(ref phasename) => {
trace!("Setting bar phase to {}", phasename);
self.bar.set_message(&format!(
- "Job ({} {}): {} Phase: {}",
- self.package_name, self.package_version, self.job_id, phasename
+ "[{} {} {}]: Phase: {}",
+ self.job_id, self.package_name, self.package_version, phasename
));
}
LogItem::State(Ok(())) => {
trace!("Setting bar state to Ok");
self.bar.set_message(&format!(
- "Job ({} {}): {} State Ok",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: State Ok",
+ self.job_id, self.package_name, self.package_version
));
success = Some(true);
}
LogItem::State(Err(ref e)) => {
trace!("Setting bar state to Err: {}", e);
self.bar.set_message(&format!(
- "Job ({} {}): {} State Err: {}",
- self.package_name, self.package_version, self.job_id, e
+ "[{} {} {}]: State Err: {}",
+ self.job_id, self.package_name, self.package_version, e
));
success = Some(false);
}
@@ -370,16 +380,16 @@ impl<'a> LogReceiver<'a> {
trace!("Finishing bar = {:?}", success);
let finish_msg = match success {
Some(true) => format!(
- "Job ({} {}): {} finished successfully",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: finished successfully",
+ self.job_id, self.package_name, self.package_version
),
Some(false) => format!(
- "Job ({} {}): {} finished with error",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: finished with error",
+ self.job_id, self.package_name, self.package_version
),
None => format!(
- "Job ({} {}): {} finished",
- self.package_name, self.package_version, self.job_id
+ "[{} {} {}]: finished",
+ self.job_id, self.package_name, self.package_version
),
};
self.bar.finish_with_message(&finish_msg);
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index 3ed59dd..0b19d85 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -8,6 +8,11 @@
// SPDX-License-Identifier: EPL-2.0
//
+// TODO: The MergedStores is not used at all anymore, because we removed the feature while doing
+// the rewrite
+#![allow(unused)]
+
+
use std::sync::Arc;
use std::path::Path;
@@ -21,6 +26,7 @@ use crate::filestore::path::ArtifactPath;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
+
/// A type that merges the release store and the staging store
///
/// The stores are not actually merged (on disk or in memory), but the querying mechanism works in
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index ab0655b..cfa999f 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -149,6 +149,12 @@ impl ArtifactPath {
}
}
+impl AsRef<Path> for ArtifactPath {
+ fn as_ref(&self) -> &Path {
+ &self.0
+ }
+}
+
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath);
diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs
index b944d84..788cd02 100644
--- a/src/filestore/staging.rs
+++ b/src/filestore/staging.rs
@@ -19,6 +19,7 @@ use indicatif::ProgressBar;
use log::trace;
use result_inspect::ResultInspect;
+use crate::filestore::Artifact;
use crate::filestore::path::ArtifactPath;
use crate::filestore::path::StoreRoot;
use crate::filestore::util::FileStoreImpl;
@@ -100,4 +101,8 @@ impl StagingStore {
pub fn root_path(&self) -> &StoreRoot {
self.0.root_path()
}
+
+ pub fn get(&self, p: &ArtifactPath) -> Option<&Artifact> {
+ self.0.get(p)
+ }
}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e637f36..5c5b8ec 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -8,17 +8,21 @@
// SPDX-License-Identifier: EPL-2.0
//
+#![allow(unused)]
+
use std::path::PathBuf;
use std::sync::Arc;
-use anyhow::anyhow;
use anyhow::Error;
use anyhow::Result;
+use anyhow::anyhow;
use diesel::PgConnection;
use indicatif::ProgressBar;
use log::trace;
-use tokio::sync::RwLock;
use tokio::stream::StreamExt;
+use tokio::sync::RwLock;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::Sender;
use typed_builder::TypedBuilder;
use uuid::Uuid;
@@ -90,147 +94,346 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts (with their respective database artifact objects)
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, (Uuid, Error)>;
+type JobResult = std::result::Result<(Uuid, Vec<Artifact>), Vec<(Uuid, Error)>>;
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> {
+ pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> {
let (results, errors) = self.run_tree().await?;
- output.extend(results.into_iter().map(|(_, dba)| dba));
+ output.extend(results.into_iter());
Ok(errors)
}
- async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> {
- use futures::FutureExt;
-
- let mut already_built = vec![];
- let mut artifacts = vec![];
- let mut errors = vec![];
-
- loop {
- // loop{}
- // until for all elements of self.jobtree, the uuid exists in already_built
- //
- // for each element in jobtree
- // where dependencies(element) all in already_built
- // run_job_for(element)
- //
- // for results from run_job_for calls
- // remember UUID in already_built
- // put built artifacts in artifacts
- // if error, abort everything
- //
- //
- let multibar = Arc::new(indicatif::MultiProgress::new());
- let build_results = self.jobtree
- .inner()
- .iter()
- .filter(|(uuid, jobdef)| { // select all jobs where all dependencies are in `already_built`
- trace!("Filtering job definition: {:?}", jobdef);
- jobdef.dependencies.iter().all(|d| already_built.contains(d)) && !already_built.contains(uuid)
- })
- .map(|(uuid, jobdef)| {
- trace!("Running job {}", uuid);
- let bar = multibar.add(self.progress_generator.bar());
- self.run_job(jobdef, bar).map(move |r| (*uuid, r))
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Vec<(_, Result<JobResult>)>>();
-
- let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
- let (_, build_results) = tokio::join!(multibar_block, build_results);
-
- for (uuid, artifact_result) in build_results.into_iter() {
- already_built.push(uuid);
-
- match artifact_result {
- Ok(Ok(mut arts)) => artifacts.append(&mut arts),
- Ok(Err((uuid, e))) => { // error during job running
- log::error!("Error for job {} = {}", uuid, e);
- errors.push((uuid, e));
- },
-
- Err(e) => return Err(e), // error during container execution
+ async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> {
+ let multibar = Arc::new(indicatif::MultiProgress::new());
+
+ // For each job in the jobtree, built a tuple with
+ //
+ // 1. The receiver that is used by the task to receive results from dependency tasks from
+ // 2. The task itself (as a TaskPreparation object)
+ // 3. The sender, that can be used to send results to this task
+ // 4. An Option<Sender> that this tasks uses to send its results with
+ // This is an Option<> because we need to set it later and the root of the tree needs a
+ // special handling, as this very function will wait on a receiver that gets the results
+ // of the root task
+ let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree
+ .inner()
+ .iter()
+ .map(|(uuid, jobdef)| {
+ // We initialize the channel with 100 elements here, as there is unlikely a task
+ // that depends on 100 other tasks.
+ // Either way, this might be increased in future.
+ let (sender, receiver) = tokio::sync::mpsc::channel(100);
+
+ trace!("Creating TaskPreparation object for job {}", uuid);
+ let tp = TaskPreparation {
+ uuid: *uuid,
+ jobdef,
+
+ bar: multibar.add(self.progress_generator.bar()),
+ config: self.config,
+ source_cache: &self.source_cache,
+ scheduler: &self.scheduler,
+ merged_stores: &self.merged_stores,
+ database: self.database.clone(),
+ };
+
+ (receiver, tp, sender, std::cell::RefCell::new(None as Option<Sender<JobResult>>))
+ })
+ .collect();
+
+ // Associate tasks with their appropriate sender
+ //
+ // Right now, the tuple yielded from above contains (rx, task, tx, _), where rx and tx belong
+ // to eachother.
+ // But what we need is the tx (sender) that the task should send its result to, of course.
+ //
+ // So this algorithm in plain text is:
+ // for each job
+ // find the job that depends on this job
+ // use the sender of the found job and set it as sender for this job
+ for job in jobs.iter() {
+ *job.3.borrow_mut() = jobs.iter()
+ .find(|j| j.1.jobdef.dependencies.contains(&job.1.uuid))
+ .map(|j| j.2.clone());
+ }
+
+ // Find the id of the root task
+ //
+ // By now, all tasks should be associated with their respective sender.
+ // Only one has None sender: The task that is the "root" of the tree.
+ // By that property, we can find the root task.
+ //
+ // Here, we copy its uuid, because we need it later.
+ let root_job_id = jobs.iter()
+ .find(|j| j.3.borrow().is_none())
+ .map(|j| j.1.uuid)
+ .ok_or_else(|| anyhow!("Failed to find root task"))?;
+ trace!("Root job id = {}", root_job_id);
+
+ // Create a sender and a receiver for the root of the tree
+ let (root_sender, mut root_receiver) = tokio::sync::mpsc::channel(100);
+
+ // Make all prepared jobs into real jobs and run them
+ //
+ // This maps each TaskPreparation with its sender and receiver to a JobTask and calls the
+ // async fn JobTask::run() to run the task.
+ //
+ // The JobTask::run implementation handles the rest, we just have to wait for all futures
+ // to succeed.
+ let running_jobs = jobs
+ .into_iter()
+ .map(|prep| {
+ trace!("Creating JobTask for = {}", prep.1.uuid);
+ let root_sender = root_sender.clone();
+ JobTask {
+ uuid: prep.1.uuid,
+ jobdef: prep.1.jobdef,
+
+ bar: prep.1.bar.clone(),
+
+ config: prep.1.config,
+ source_cache: prep.1.source_cache,
+ scheduler: prep.1.scheduler,
+ merged_stores: prep.1.merged_stores,
+ database: prep.1.database.clone(),
+
+ receiver: prep.0,
+
+ // the sender is set or we need to use the root sender
+ sender: prep.3.into_inner().unwrap_or(root_sender),
}
- }
+ })
+ .map(|task| task.run())
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<()>>();
+
+ let root_recv = root_receiver.recv();
+ let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
+
+ let (root_recv, _, jobs_result) = tokio::join!(root_recv, multibar_block, running_jobs);
+ let _ = jobs_result?;
+ match root_recv {
+ None => Err(anyhow!("No result received...")),
+ Some(Ok((_, artifacts))) => Ok((artifacts, vec![])),
+ Some(Err(errors)) => Ok((vec![], errors)),
+ }
+ }
+}
+
+/// Helper type: A task with all things attached, but not sender and receivers
+///
+/// This is the preparation of the JobTask, but without the associated sender and receiver, because
+/// it is not mapped to the task yet.
+///
+/// This simply holds data and does not contain any more functionality
+struct TaskPreparation<'a> {
+ /// The UUID of this job
+ uuid: Uuid,
+ jobdef: &'a JobDefinition,
+
+ bar: ProgressBar,
+
+ config: &'a Configuration,
+ source_cache: &'a SourceCache,
+ scheduler: &'a EndpointScheduler,
+ merged_stores: &'a MergedStores,
+ database: Arc<PgConnection>,
+}
+
+/// Helper type for executing one job task
+///
+/// This type represents a task for a job that can immediately be executed (see `JobTask::run()`).
+struct JobTask<'a> {
+ /// The UUID of this job
+ uuid: Uuid,
+ jobdef: &'a JobDefinition,
+
+ bar: ProgressBar,
+
+ config: &'a Configuration,
+ source_cache: &'a SourceCache,
+ scheduler: &'a EndpointScheduler,
+ merged_stores: &'a MergedStores,
+ database: Arc<PgConnection>,
+
+ /// Channel where the dependencies arrive
+ receiver: Receiver<JobResult>,
+
+ /// Channel to send the own build outputs to
+ sender: Sender<JobResult>,
+}
+
+impl<'a> JobTask<'a> {
- if !errors.is_empty() {
- break
+ /// Run the job
+ ///
+ /// This function runs the job from this object on the scheduler as soon as all dependend jobs
+ /// returned successfully.
+ async fn run(mut self) -> Result<()> {
+ trace!("[{}]: Running", self.uuid);
+
+ // A list of job run results from dependencies that were received from the tasks for the
+ // dependencies
+ let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![];
+
+ // A list of errors that were received from the tasks for the dependencies
+ let mut received_errors: Vec<(Uuid, Error)> = vec![];
+
+ // Helper function to check whether all UUIDs are in a list of UUIDs
+ let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &[(Uuid, Vec<_>)]| {
+ dependency_uuids.iter().all(|dependency_uuid| {
+ list.iter().map(|tpl| tpl.0).any(|id| id == *dependency_uuid)
+ })
+ };
+
+ // as long as the job definition lists dependencies that are not in the received_dependencies list...
+ while !all_dependencies_are_in(&self.jobdef.dependencies, &received_dependencies) {
+ // Update the status bar message
+ self.bar.set_message({
+ &format!("[{} {} {}]: Waiting ({}/{})...",
+ self.uuid,
+ self.jobdef.job.package().name(),
+ self.jobdef.job.package().version(),
+ received_dependencies.len(),
+ self.jobdef.dependencies.len())
+ });
+ trace!("[{}]: Updated bar", self.uuid);
+
+ trace!("[{}]: receiving...", self.uuid);
+ // receive from the receiver
+ let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?;
+ if !continue_receiving {
+ break;
}
- // already_built.sort(); // TODO: optimization for binary search in
- // above and below contains() clause
+ trace!("[{}]: Received errors = {:?}", self.uuid, received_errors);
+ // if there are any errors from child tasks
+ if !received_errors.is_empty() {
+ // send them to the parent,...
+ self.sender.send(Err(received_errors)).await;
- if self.jobtree.inner().iter().all(|(uuid, _)| already_built.contains(uuid)) {
- break
+ // ... and stop operation, because the whole tree will fail anyways.
+ return Ok(())
}
}
- Ok((artifacts, errors))
- }
+ // receive items until the channel is empty.
+ //
+ // In the above loop, it could happen that we have all dependencies to run, but there is
+ // another job that reports artifacts.
+ // We need to collect them, too.
+ //
+ // This is techically not possible, because in a tree, we need all results from all childs.
+ // It just feels better having this in place as well.
+ //
+ // Sorry, not sorry.
+ while self.perform_receive(&mut received_dependencies, &mut received_errors).await? {
+ ;
+ }
- async fn run_job(&self, jobdef: &JobDefinition, bar: ProgressBar) -> Result<JobResult> {
- let dependency_artifacts = self.get_dependency_artifacts_for_jobs(&jobdef.dependencies).await?;
- bar.set_message("Preparing...");
+ // Map the list of received dependencies from
+ // Vec<(Uuid, Vec<Artifact>)>
+ // to
+ // Vec<Artifact>
+ let dependency_artifacts = received_dependencies
+ .iter()
+ .map(|tpl| tpl.1.iter())
+ .flatten()
+ .cloned()
+ .collect();
+ trace!("[{}]: Dependency artifacts = {:?}", self.uuid, dependency_artifacts);
+ self.bar.set_message(&format!("[{} {} {}]: Preparing...",
+ self.uuid,
+ self.jobdef.job.package().name(),
+ self.jobdef.job.package().version()
+ ));
+ // Create a RunnableJob object
let runnable = RunnableJob::build_from_job(
- &jobdef.job,
- &self.source_cache,
- &self.config,
+ &self.jobdef.job,
+ self.source_cache,
+ self.config,
dependency_artifacts)
.await?;
- bar.set_message("Scheduling...");
- let job_uuid = *jobdef.job.uuid();
- match self.scheduler.schedule_job(runnable, bar).await?.run().await {
- Err(e) => return Ok(Err((job_uuid, e))),
- Ok(db_artifacts) => {
- db_artifacts.into_iter()
- .map(|db_artifact| async {
- trace!("Getting store Artifact for db Artifact: {:?}", db_artifact);
- let art = self.get_store_artifact_for(&db_artifact).await?;
- trace!("Store Artifact: {:?}", art);
- Ok(Ok((art, db_artifact)))
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<JobResult>>()
- .await
+ self.bar.set_message(&format!("[{} {} {}]: Scheduling...",
+ self.uuid,
+ self.jobdef.job.package().name(),
+ self.jobdef.job.package().version()
+ ));
+ let job_uuid = *self.jobdef.job.uuid();
+
+ // Schedule the job on the scheduler
+ match self.scheduler.schedule_job(runnable, self.bar).await?.run().await {
+ // if the scheduler run reports an error,
+ // that is an error from the actual execution of the job ...
+ Err(e) => {
+ trace!("[{}]: Scheduler returned error = {:?}", self.uuid, e);
+ // ... and we send that to our parent
+ self.sender.send(Err(vec![(job_uuid, e)])).await?;
+ },
+
+ // if the scheduler run reports success,
+ // it returns the database artifact objects it created!
+ Ok(mut artifacts) => {
+ trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts);
+ artifacts.extend(received_dependencies.into_iter().map(|(_, v)| v.into_iter()).flatten());
+ self.sender
+ .send(Ok((self.uuid, artifacts)))
+ .await?;
},
}
+
+ trace!("[{}]: Finished successfully", self.uuid);
+ Ok(())
}
- /// Get all dependency artifacts for the job from the database
+ /// Performe a recv() call on the receiving side of the channel
///
- /// Use the JobDefinition object and find all dependency outputs in the database
- async fn get_dependency_artifacts_for_jobs(&self, uuids: &[Uuid]) -> Result<Vec<Artifact>> {
- use crate::schema;
- use crate::diesel::ExpressionMethods;
- use crate::diesel::QueryDsl;
- use crate::diesel::RunQueryDsl;
-
- // Pseudo code:
- //
- // * return for uuid in uuids:
- // self.database.get(job).get_artifacts()
-
- schema::artifacts::table
- .left_outer_join(schema::jobs::table)
- .filter(schema::jobs::uuid.eq_any(uuids))
- .select(schema::artifacts::all_columns)
- .load::<dbmodels::Artifact>(&*self.database)?
- .iter()
- .map(|dbart| self.get_store_artifact_for(dbart))
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect()
- .await
- }
+ /// Put the dependencies you received into the `received_dependencies`, the errors in the
+ /// `received_errors`
+ ///
+ /// Return Ok(true) if we should continue operation
+ /// Return Ok(false) if the channel is empty and we're done receiving
+ async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
+ match self.receiver.recv().await {
+ Some(Ok(v)) => {
+ // The task we depend on succeeded and returned an
+ // (uuid of the job, [Artifact])
+ trace!("[{}]: Received: {:?}", self.uuid, v);
+ received_dependencies.push(v);
+ Ok(true)
+ },
+ Some(Err(mut e)) => {
+ // The task we depend on failed
+ // we log that error for now
+ trace!("[{}]: Received: {:?}", self.uuid, e);
+ received_errors.append(&mut e);
+ Ok(true)
+ },
+ None => {
+ // The task we depend on finished... we must check what we have now...
+ trace!("[{}]: Received nothing, channel seems to be empty", self.uuid);
- async fn get_store_artifact_for(&self, db_artifact: &dbmodels::Artifact) -> Result<Artifact> {
- let p = PathBuf::from(&db_artifact.path);
- self.merged_stores
- .get_artifact_by_path(&p)
- .await?
- .ok_or_else(|| {
- anyhow!("Artifact not found in {}", p.display())
- })
+ // Find all dependencies that we need but which are not received
+ let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>();
+ let missing_deps: Vec<_> = self.jobdef
+ .dependencies
+ .iter()
+ .filter(|d| !received.contains(d))
+ .collect();
+ trace!("[{}]: Missing dependencies = {:?}", self.uuid, missing_deps);
+
+ // ... if there are any, error
+ if !missing_deps.is_empty() {
+ return Err(anyhow!("Childs finished, but dependencies still missing: {:?}", missing_deps))
+ } else {
+ // all dependencies are received
+ Ok(false)
+ }
+ },
+ }
}
+
}
+