diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/commands/build.rs | 2 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 42 | ||||
-rw-r--r-- | src/filestore/merged.rs | 6 | ||||
-rw-r--r-- | src/filestore/path.rs | 6 | ||||
-rw-r--r-- | src/filestore/staging.rs | 5 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 439 |
6 files changed, 365 insertions, 135 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs index c9a564b..86f205b 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -339,7 +339,7 @@ pub async fn build( writeln!(outlock, "Packages created:")?; } artifacts.into_iter().try_for_each(|artifact| { - writeln!(outlock, "-> {}", staging_dir.join(artifact.path).display()).map_err(Error::from) + writeln!(outlock, "-> {}", staging_dir.join(artifact.path()).display()).map_err(Error::from) })?; let mut had_error = false; diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 93b7509..c2f35fc 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -29,6 +29,7 @@ use uuid::Uuid; use crate::db::models as dbmodels; use crate::endpoint::Endpoint; use crate::endpoint::EndpointConfiguration; +use crate::filestore::Artifact; use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; @@ -145,7 +146,7 @@ impl std::fmt::Debug for JobHandle { } impl JobHandle { - pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> { + pub async fn run(self) -> Result<Vec<Artifact>> { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); let ep = self.endpoint.read().await; let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; @@ -241,9 +242,18 @@ impl JobHandle { // Have to do it the ugly way here because of borrowing semantics let mut r = vec![]; + let staging_store_lock = self.staging_store.read().await; for p in paths.iter() { + use std::ops::Deref; trace!("DB: Creating artifact entry for path: {}", p.display()); - r.push(dbmodels::Artifact::create(&self.db, p, &job)?); + let _ = dbmodels::Artifact::create(&self.db, p, &job)?; + r.push({ + staging_store_lock + .deref() + .get(p) + .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))? + .clone() + }); } Ok(r) } @@ -336,30 +346,30 @@ impl<'a> LogReceiver<'a> { trace!("Setting bar to {}", u as u64); self.bar.set_position(u as u64); self.bar.set_message(&format!( - "Job ({} {}): {} running...", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: running...", + self.job_id, self.package_name, self.package_version )); } LogItem::CurrentPhase(ref phasename) => { trace!("Setting bar phase to {}", phasename); self.bar.set_message(&format!( - "Job ({} {}): {} Phase: {}", - self.package_name, self.package_version, self.job_id, phasename + "[{} {} {}]: Phase: {}", + self.job_id, self.package_name, self.package_version, phasename )); } LogItem::State(Ok(())) => { trace!("Setting bar state to Ok"); self.bar.set_message(&format!( - "Job ({} {}): {} State Ok", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: State Ok", + self.job_id, self.package_name, self.package_version )); success = Some(true); } LogItem::State(Err(ref e)) => { trace!("Setting bar state to Err: {}", e); self.bar.set_message(&format!( - "Job ({} {}): {} State Err: {}", - self.package_name, self.package_version, self.job_id, e + "[{} {} {}]: State Err: {}", + self.job_id, self.package_name, self.package_version, e )); success = Some(false); } @@ -370,16 +380,16 @@ impl<'a> LogReceiver<'a> { trace!("Finishing bar = {:?}", success); let finish_msg = match success { Some(true) => format!( - "Job ({} {}): {} finished successfully", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished successfully", + self.job_id, self.package_name, self.package_version ), Some(false) => format!( - "Job ({} {}): {} finished with error", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished with error", + self.job_id, self.package_name, self.package_version ), None => format!( - "Job ({} {}): {} finished", - self.package_name, self.package_version, self.job_id + "[{} {} {}]: finished", + self.job_id, self.package_name, self.package_version ), }; self.bar.finish_with_message(&finish_msg); diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs index 3ed59dd..0b19d85 100644 --- a/src/filestore/merged.rs +++ b/src/filestore/merged.rs @@ -8,6 +8,11 @@ // SPDX-License-Identifier: EPL-2.0 // +// TODO: The MergedStores is not used at all anymore, because we removed the feature while doing +// the rewrite +#![allow(unused)] + + use std::sync::Arc; use std::path::Path; @@ -21,6 +26,7 @@ use crate::filestore::path::ArtifactPath; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; + /// A type that merges the release store and the staging store /// /// The stores are not actually merged (on disk or in memory), but the querying mechanism works in diff --git a/src/filestore/path.rs b/src/filestore/path.rs index ab0655b..cfa999f 100644 --- a/src/filestore/path.rs +++ b/src/filestore/path.rs @@ -149,6 +149,12 @@ impl ArtifactPath { } } +impl AsRef<Path> for ArtifactPath { + fn as_ref(&self) -> &Path { + &self.0 + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath); diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs index b944d84..788cd02 100644 --- a/src/filestore/staging.rs +++ b/src/filestore/staging.rs @@ -19,6 +19,7 @@ use indicatif::ProgressBar; use log::trace; use result_inspect::ResultInspect; +use crate::filestore::Artifact; use crate::filestore::path::ArtifactPath; use crate::filestore::path::StoreRoot; use crate::filestore::util::FileStoreImpl; @@ -100,4 +101,8 @@ impl StagingStore { pub fn root_path(&self) -> &StoreRoot { self.0.root_path() } + + pub fn get(&self, p: &ArtifactPath) -> Option<&Artifact> { + self.0.get(p) + } } diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index e637f36..5c5b8ec 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -8,17 +8,21 @@ // SPDX-License-Identifier: EPL-2.0 // +#![allow(unused)] + use std::path::PathBuf; use std::sync::Arc; -use anyhow::anyhow; use anyhow::Error; use anyhow::Result; +use anyhow::anyhow; use diesel::PgConnection; use indicatif::ProgressBar; use log::trace; -use tokio::sync::RwLock; use tokio::stream::StreamExt; +use tokio::sync::RwLock; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; use typed_builder::TypedBuilder; use uuid::Uuid; @@ -90,147 +94,346 @@ impl<'a> OrchestratorSetup<'a> { /// It is either a list of artifacts (with their respective database artifact objects) /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, (Uuid, Error)>; +type JobResult = std::result::Result<(Uuid, Vec<Artifact>), Vec<(Uuid, Error)>>; impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> { + pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> { let (results, errors) = self.run_tree().await?; - output.extend(results.into_iter().map(|(_, dba)| dba)); + output.extend(results.into_iter()); Ok(errors) } - async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> { - use futures::FutureExt; - - let mut already_built = vec![]; - let mut artifacts = vec![]; - let mut errors = vec![]; - - loop { - // loop{} - // until for all elements of self.jobtree, the uuid exists in already_built - // - // for each element in jobtree - // where dependencies(element) all in already_built - // run_job_for(element) - // - // for results from run_job_for calls - // remember UUID in already_built - // put built artifacts in artifacts - // if error, abort everything - // - // - let multibar = Arc::new(indicatif::MultiProgress::new()); - let build_results = self.jobtree - .inner() - .iter() - .filter(|(uuid, jobdef)| { // select all jobs where all dependencies are in `already_built` - trace!("Filtering job definition: {:?}", jobdef); - jobdef.dependencies.iter().all(|d| already_built.contains(d)) && !already_built.contains(uuid) - }) - .map(|(uuid, jobdef)| { - trace!("Running job {}", uuid); - let bar = multibar.add(self.progress_generator.bar()); - self.run_job(jobdef, bar).map(move |r| (*uuid, r)) - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Vec<(_, Result<JobResult>)>>(); - - let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); - let (_, build_results) = tokio::join!(multibar_block, build_results); - - for (uuid, artifact_result) in build_results.into_iter() { - already_built.push(uuid); - - match artifact_result { - Ok(Ok(mut arts)) => artifacts.append(&mut arts), - Ok(Err((uuid, e))) => { // error during job running - log::error!("Error for job {} = {}", uuid, e); - errors.push((uuid, e)); - }, - - Err(e) => return Err(e), // error during container execution + async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> { + let multibar = Arc::new(indicatif::MultiProgress::new()); + + // For each job in the jobtree, built a tuple with + // + // 1. The receiver that is used by the task to receive results from dependency tasks from + // 2. The task itself (as a TaskPreparation object) + // 3. The sender, that can be used to send results to this task + // 4. An Option<Sender> that this tasks uses to send its results with + // This is an Option<> because we need to set it later and the root of the tree needs a + // special handling, as this very function will wait on a receiver that gets the results + // of the root task + let jobs: Vec<(Receiver<JobResult>, TaskPreparation, Sender<JobResult>, _)> = self.jobtree + .inner() + .iter() + .map(|(uuid, jobdef)| { + // We initialize the channel with 100 elements here, as there is unlikely a task + // that depends on 100 other tasks. + // Either way, this might be increased in future. + let (sender, receiver) = tokio::sync::mpsc::channel(100); + + trace!("Creating TaskPreparation object for job {}", uuid); + let tp = TaskPreparation { + uuid: *uuid, + jobdef, + + bar: multibar.add(self.progress_generator.bar()), + config: self.config, + source_cache: &self.source_cache, + scheduler: &self.scheduler, + merged_stores: &self.merged_stores, + database: self.database.clone(), + }; + + (receiver, tp, sender, std::cell::RefCell::new(None as Option<Sender<JobResult>>)) + }) + .collect(); + + // Associate tasks with their appropriate sender + // + // Right now, the tuple yielded from above contains (rx, task, tx, _), where rx and tx belong + // to eachother. + // But what we need is the tx (sender) that the task should send its result to, of course. + // + // So this algorithm in plain text is: + // for each job + // find the job that depends on this job + // use the sender of the found job and set it as sender for this job + for job in jobs.iter() { + *job.3.borrow_mut() = jobs.iter() + .find(|j| j.1.jobdef.dependencies.contains(&job.1.uuid)) + .map(|j| j.2.clone()); + } + + // Find the id of the root task + // + // By now, all tasks should be associated with their respective sender. + // Only one has None sender: The task that is the "root" of the tree. + // By that property, we can find the root task. + // + // Here, we copy its uuid, because we need it later. + let root_job_id = jobs.iter() + .find(|j| j.3.borrow().is_none()) + .map(|j| j.1.uuid) + .ok_or_else(|| anyhow!("Failed to find root task"))?; + trace!("Root job id = {}", root_job_id); + + // Create a sender and a receiver for the root of the tree + let (root_sender, mut root_receiver) = tokio::sync::mpsc::channel(100); + + // Make all prepared jobs into real jobs and run them + // + // This maps each TaskPreparation with its sender and receiver to a JobTask and calls the + // async fn JobTask::run() to run the task. + // + // The JobTask::run implementation handles the rest, we just have to wait for all futures + // to succeed. + let running_jobs = jobs + .into_iter() + .map(|prep| { + trace!("Creating JobTask for = {}", prep.1.uuid); + let root_sender = root_sender.clone(); + JobTask { + uuid: prep.1.uuid, + jobdef: prep.1.jobdef, + + bar: prep.1.bar.clone(), + + config: prep.1.config, + source_cache: prep.1.source_cache, + scheduler: prep.1.scheduler, + merged_stores: prep.1.merged_stores, + database: prep.1.database.clone(), + + receiver: prep.0, + + // the sender is set or we need to use the root sender + sender: prep.3.into_inner().unwrap_or(root_sender), } - } + }) + .map(|task| task.run()) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<()>>(); + + let root_recv = root_receiver.recv(); + let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); + + let (root_recv, _, jobs_result) = tokio::join!(root_recv, multibar_block, running_jobs); + let _ = jobs_result?; + match root_recv { + None => Err(anyhow!("No result received...")), + Some(Ok((_, artifacts))) => Ok((artifacts, vec![])), + Some(Err(errors)) => Ok((vec![], errors)), + } + } +} + +/// Helper type: A task with all things attached, but not sender and receivers +/// +/// This is the preparation of the JobTask, but without the associated sender and receiver, because +/// it is not mapped to the task yet. +/// +/// This simply holds data and does not contain any more functionality +struct TaskPreparation<'a> { + /// The UUID of this job + uuid: Uuid, + jobdef: &'a JobDefinition, + + bar: ProgressBar, + + config: &'a Configuration, + source_cache: &'a SourceCache, + scheduler: &'a EndpointScheduler, + merged_stores: &'a MergedStores, + database: Arc<PgConnection>, +} + +/// Helper type for executing one job task +/// +/// This type represents a task for a job that can immediately be executed (see `JobTask::run()`). +struct JobTask<'a> { + /// The UUID of this job + uuid: Uuid, + jobdef: &'a JobDefinition, + + bar: ProgressBar, + + config: &'a Configuration, + source_cache: &'a SourceCache, + scheduler: &'a EndpointScheduler, + merged_stores: &'a MergedStores, + database: Arc<PgConnection>, + + /// Channel where the dependencies arrive + receiver: Receiver<JobResult>, + + /// Channel to send the own build outputs to + sender: Sender<JobResult>, +} + +impl<'a> JobTask<'a> { - if !errors.is_empty() { - break + /// Run the job + /// + /// This function runs the job from this object on the scheduler as soon as all dependend jobs + /// returned successfully. + async fn run(mut self) -> Result<()> { + trace!("[{}]: Running", self.uuid); + + // A list of job run results from dependencies that were received from the tasks for the + // dependencies + let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![]; + + // A list of errors that were received from the tasks for the dependencies + let mut received_errors: Vec<(Uuid, Error)> = vec![]; + + // Helper function to check whether all UUIDs are in a list of UUIDs + let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &[(Uuid, Vec<_>)]| { + dependency_uuids.iter().all(|dependency_uuid| { + list.iter().map(|tpl| tpl.0).any(|id| id == *dependency_uuid) + }) + }; + + // as long as the job definition lists dependencies that are not in the received_dependencies list... + while !all_dependencies_are_in(&self.jobdef.dependencies, &received_dependencies) { + // Update the status bar message + self.bar.set_message({ + &format!("[{} {} {}]: Waiting ({}/{})...", + self.uuid, + self.jobdef.job.package().name(), + self.jobdef.job.package().version(), + received_dependencies.len(), + self.jobdef.dependencies.len()) + }); + trace!("[{}]: Updated bar", self.uuid); + + trace!("[{}]: receiving...", self.uuid); + // receive from the receiver + let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?; + if !continue_receiving { + break; } - // already_built.sort(); // TODO: optimization for binary search in - // above and below contains() clause + trace!("[{}]: Received errors = {:?}", self.uuid, received_errors); + // if there are any errors from child tasks + if !received_errors.is_empty() { + // send them to the parent,... + self.sender.send(Err(received_errors)).await; - if self.jobtree.inner().iter().all(|(uuid, _)| already_built.contains(uuid)) { - break + // ... and stop operation, because the whole tree will fail anyways. + return Ok(()) } } - Ok((artifacts, errors)) - } + // receive items until the channel is empty. + // + // In the above loop, it could happen that we have all dependencies to run, but there is + // another job that reports artifacts. + // We need to collect them, too. + // + // This is techically not possible, because in a tree, we need all results from all childs. + // It just feels better having this in place as well. + // + // Sorry, not sorry. + while self.perform_receive(&mut received_dependencies, &mut received_errors).await? { + ; + } - async fn run_job(&self, jobdef: &JobDefinition, bar: ProgressBar) -> Result<JobResult> { - let dependency_artifacts = self.get_dependency_artifacts_for_jobs(&jobdef.dependencies).await?; - bar.set_message("Preparing..."); + // Map the list of received dependencies from + // Vec<(Uuid, Vec<Artifact>)> + // to + // Vec<Artifact> + let dependency_artifacts = received_dependencies + .iter() + .map(|tpl| tpl.1.iter()) + .flatten() + .cloned() + .collect(); + trace!("[{}]: Dependency artifacts = {:?}", self.uuid, dependency_artifacts); + self.bar.set_message(&format!("[{} {} {}]: Preparing...", + self.uuid, + self.jobdef.job.package().name(), + self.jobdef.job.package().version() + )); + // Create a RunnableJob object let runnable = RunnableJob::build_from_job( - &jobdef.job, - &self.source_cache, - &self.config, + &self.jobdef.job, + self.source_cache, + self.config, dependency_artifacts) .await?; - bar.set_message("Scheduling..."); - let job_uuid = *jobdef.job.uuid(); - match self.scheduler.schedule_job(runnable, bar).await?.run().await { - Err(e) => return Ok(Err((job_uuid, e))), - Ok(db_artifacts) => { - db_artifacts.into_iter() - .map(|db_artifact| async { - trace!("Getting store Artifact for db Artifact: {:?}", db_artifact); - let art = self.get_store_artifact_for(&db_artifact).await?; - trace!("Store Artifact: {:?}", art); - Ok(Ok((art, db_artifact))) - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<JobResult>>() - .await + self.bar.set_message(&format!("[{} {} {}]: Scheduling...", + self.uuid, + self.jobdef.job.package().name(), + self.jobdef.job.package().version() + )); + let job_uuid = *self.jobdef.job.uuid(); + + // Schedule the job on the scheduler + match self.scheduler.schedule_job(runnable, self.bar).await?.run().await { + // if the scheduler run reports an error, + // that is an error from the actual execution of the job ... + Err(e) => { + trace!("[{}]: Scheduler returned error = {:?}", self.uuid, e); + // ... and we send that to our parent + self.sender.send(Err(vec![(job_uuid, e)])).await?; + }, + + // if the scheduler run reports success, + // it returns the database artifact objects it created! + Ok(mut artifacts) => { + trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts); + artifacts.extend(received_dependencies.into_iter().map(|(_, v)| v.into_iter()).flatten()); + self.sender + .send(Ok((self.uuid, artifacts))) + .await?; }, } + + trace!("[{}]: Finished successfully", self.uuid); + Ok(()) } - /// Get all dependency artifacts for the job from the database + /// Performe a recv() call on the receiving side of the channel /// - /// Use the JobDefinition object and find all dependency outputs in the database - async fn get_dependency_artifacts_for_jobs(&self, uuids: &[Uuid]) -> Result<Vec<Artifact>> { - use crate::schema; - use crate::diesel::ExpressionMethods; - use crate::diesel::QueryDsl; - use crate::diesel::RunQueryDsl; - - // Pseudo code: - // - // * return for uuid in uuids: - // self.database.get(job).get_artifacts() - - schema::artifacts::table - .left_outer_join(schema::jobs::table) - .filter(schema::jobs::uuid.eq_any(uuids)) - .select(schema::artifacts::all_columns) - .load::<dbmodels::Artifact>(&*self.database)? - .iter() - .map(|dbart| self.get_store_artifact_for(dbart)) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect() - .await - } + /// Put the dependencies you received into the `received_dependencies`, the errors in the + /// `received_errors` + /// + /// Return Ok(true) if we should continue operation + /// Return Ok(false) if the channel is empty and we're done receiving + async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> { + match self.receiver.recv().await { + Some(Ok(v)) => { + // The task we depend on succeeded and returned an + // (uuid of the job, [Artifact]) + trace!("[{}]: Received: {:?}", self.uuid, v); + received_dependencies.push(v); + Ok(true) + }, + Some(Err(mut e)) => { + // The task we depend on failed + // we log that error for now + trace!("[{}]: Received: {:?}", self.uuid, e); + received_errors.append(&mut e); + Ok(true) + }, + None => { + // The task we depend on finished... we must check what we have now... + trace!("[{}]: Received nothing, channel seems to be empty", self.uuid); - async fn get_store_artifact_for(&self, db_artifact: &dbmodels::Artifact) -> Result<Artifact> { - let p = PathBuf::from(&db_artifact.path); - self.merged_stores - .get_artifact_by_path(&p) - .await? - .ok_or_else(|| { - anyhow!("Artifact not found in {}", p.display()) - }) + // Find all dependencies that we need but which are not received + let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>(); + let missing_deps: Vec<_> = self.jobdef + .dependencies + .iter() + .filter(|d| !received.contains(d)) + .collect(); + trace!("[{}]: Missing dependencies = {:?}", self.uuid, missing_deps); + + // ... if there are any, error + if !missing_deps.is_empty() { + return Err(anyhow!("Childs finished, but dependencies still missing: {:?}", missing_deps)) + } else { + // all dependencies are received + Ok(false) + } + }, + } } + } + |