diff options
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | src/cli.rs | 4 | ||||
-rw-r--r-- | src/commands/build.rs | 6 | ||||
-rw-r--r-- | src/commands/source.rs | 86 | ||||
-rw-r--r-- | src/commands/tree_of.rs | 2 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 35 | ||||
-rw-r--r-- | src/main.rs | 1 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 16 |
8 files changed, 96 insertions, 60 deletions
@@ -36,7 +36,7 @@ daggy = { version = "0.7", features = [ "serde" ] } dialoguer = "0.8" diesel = { version = ">=1.4.6", features = ["postgres", "chrono", "uuid", "serde_json"] } diesel_migrations = "*" -env_logger = "0.8" +env_logger = "0.9" filters = "0.4.0" futures = "0.3" getset = "0.1" @@ -94,3 +94,7 @@ rand = "=0.4.3" # See https://github.com/bitvecto-rs/bitvec/issues/105#issuecomment-778570981 funty = "=1.1.0" +# Pin, because dialoguer pulls it in, but 1.4.x and newer has MSRV 1.51.0. With +# the pin here, we enforce the build to not use 1.4.0 or newer. +zeroize = ">=1.3.0, <1.4.0" + @@ -27,11 +27,11 @@ pub fn cli<'a>() -> App<'a> { .version(crate_version!()) .about("Generic Build Orchestration System for building linux packages with docker") - .after_help(r#" + .after_help(indoc::indoc!(r#" The following environment variables can be passed to butido: RUST_LOG - to enable logging, for exact usage see the rust cookbook - "#) + "#)) .arg(Arg::new("hide_bars") .required(false) diff --git a/src/commands/build.rs b/src/commands/build.rs index 8825750..bb0c2d9 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -33,22 +33,22 @@ use tokio_stream::StreamExt; use uuid::Uuid; use crate::config::*; -use crate::filestore::path::StoreRoot; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; +use crate::filestore::path::StoreRoot; use crate::job::JobResource; use crate::log::LogItem; use crate::orchestrator::OrchestratorSetup; +use crate::package::Dag; use crate::package::PackageName; use crate::package::PackageVersion; use crate::package::Shebang; -use crate::package::Dag; use crate::repository::Repository; use crate::schema; use crate::source::SourceCache; +use crate::util::EnvironmentVariableName; use crate::util::docker::ImageName; use crate::util::progress::ProgressBars; -use crate::util::EnvironmentVariableName; /// Implementation of the "build" subcommand #[allow(clippy::too_many_arguments)] diff --git a/src/commands/source.rs b/src/commands/source.rs index 3e0167d..36cf352 100644 --- a/src/commands/source.rs +++ b/src/commands/source.rs @@ -214,6 +214,49 @@ pub async fn download( repo: Repository, progressbars: ProgressBars, ) -> Result<()> { + async fn perform_download(source: &SourceEntry, bar: &indicatif::ProgressBar) -> Result<()> { + trace!("Creating: {:?}", source); + let file = source.create().await.with_context(|| { + anyhow!( + "Creating source file destination: {}", + source.path().display() + ) + })?; + + let mut file = tokio::io::BufWriter::new(file); + let response = match reqwest::get(source.url().as_ref()).await { + Ok(resp) => resp, + Err(e) => { + bar.finish_with_message(format!("Failed: {}", source.url())); + return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url())) + } + }; + + if let Some(len) = response.content_length() { + bar.set_length(len); + } + + let mut stream = reqwest::get(source.url().as_ref()).await?.bytes_stream(); + let mut bytes_written = 0; + while let Some(bytes) = stream.next().await { + let bytes = bytes?; + file.write_all(bytes.as_ref()).await?; + bytes_written += bytes.len(); + + bar.inc(bytes.len() as u64); + if let Some(len) = response.content_length() { + bar.set_message(format!("Downloading {} ({}/{} bytes)", source.url(), bytes_written, len)); + } else { + bar.set_message(format!("Downloading {} ({} bytes)", source.url(), bytes_written)); + } + } + + file.flush() + .await + .map_err(Error::from) + .map(|_| ()) + } + let force = matches.is_present("force"); let cache = PathBuf::from(config.source_cache_root()); let sc = SourceCache::new(cache); @@ -260,49 +303,6 @@ pub async fn download( if source_path_exists && !force { Err(anyhow!("Source exists: {}", source.path().display())) } else { - async fn perform_download(source: &SourceEntry, bar: &indicatif::ProgressBar) -> Result<()> { - trace!("Creating: {:?}", source); - let file = source.create().await.with_context(|| { - anyhow!( - "Creating source file destination: {}", - source.path().display() - ) - })?; - - let mut file = tokio::io::BufWriter::new(file); - let response = match reqwest::get(source.url().as_ref()).await { - Ok(resp) => resp, - Err(e) => { - bar.finish_with_message(format!("Failed: {}", source.url())); - return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url())) - } - }; - - if let Some(len) = response.content_length() { - bar.set_length(len); - } - - let mut stream = reqwest::get(source.url().as_ref()).await?.bytes_stream(); - let mut bytes_written = 0; - while let Some(bytes) = stream.next().await { - let bytes = bytes?; - file.write_all(bytes.as_ref()).await?; - bytes_written += bytes.len(); - - bar.inc(bytes.len() as u64); - if let Some(len) = response.content_length() { - bar.set_message(format!("Downloading {} ({}/{} bytes)", source.url(), bytes_written, len)); - } else { - bar.set_message(format!("Downloading {} ({} bytes)", source.url(), bytes_written)); - } - } - - file.flush() - .await - .map_err(Error::from) - .map(|_| ()) - } - if source_path_exists /* && force is implied by 'if' above*/ { if let Err(e) = source.remove_file().await { bar.finish_with_message(format!("Failed to remove existing file: {}", source.path().display())); diff --git a/src/commands/tree_of.rs b/src/commands/tree_of.rs index e632ed5..4f78c61 100644 --- a/src/commands/tree_of.rs +++ b/src/commands/tree_of.rs @@ -17,9 +17,9 @@ use anyhow::Result; use clap::ArgMatches; use resiter::AndThen; +use crate::package::Dag; use crate::package::PackageName; use crate::package::PackageVersionConstraint; -use crate::package::Dag; use crate::repository::Repository; use crate::util::progress::ProgressBars; diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index aecb291..6a1e3be 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -13,18 +13,19 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; -use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; +use anyhow::anyhow; use futures::FutureExt; use getset::{CopyGetters, Getters}; use log::trace; +use result_inspect::ResultInspect; use shiplift::Container; use shiplift::Docker; use shiplift::ExecContainerOptions; -use tokio::sync::mpsc::UnboundedSender; use tokio::sync::RwLock; +use tokio::sync::mpsc::UnboundedSender; use tokio_stream::StreamExt; use typed_builder::TypedBuilder; @@ -478,6 +479,7 @@ impl<'a> PreparedContainer<'a> { .containers() .create(&builder_opts) .await + .with_context(|| anyhow!("Creating container with builder options = {:?}", builder_opts)) .with_context(|| anyhow!("Creating container on '{}'", endpoint.name))?; trace!("Create info = {:?}", create_info); Ok(create_info) @@ -522,12 +524,16 @@ impl<'a> PreparedContainer<'a> { .with_context(|| anyhow!("Reading file {}", source_path.display()))?; drop(entry); - let _ = container.copy_file_into(destination, &buf).await?; - Ok(()) + container.copy_file_into(destination, &buf) + .await + .inspect(|_| trace!("Successfully copied source {} to container {}", source_path.display(), container.id())) + .with_context(|| anyhow!("Failed to copy source {} to container {}", source_path.display(), container.id())) + .map_err(Error::from) }) .collect::<futures::stream::FuturesUnordered<_>>() .collect::<Result<()>>() .await + .inspect(|_| trace!("Successfully copied sources to container {}", container.id())) .with_context(|| anyhow!("Copying sources to container {}", container.id())) .map_err(Error::from) } @@ -544,7 +550,7 @@ impl<'a> PreparedContainer<'a> { .iter() .map(|patch| async move { let destination = PathBuf::from(crate::consts::PATCH_DIR_PATH).join(patch); - trace!("Copying patch {} to container at {}/{}", crate::consts::PATCH_DIR_PATH, patch.display(), destination.display()); + trace!("Copying patch {} to container at {}", patch.display(), destination.display()); let mut buf = vec![]; tokio::fs::OpenOptions::new() @@ -560,12 +566,18 @@ impl<'a> PreparedContainer<'a> { .await .with_context(|| anyhow!("Reading file {}", patch.display()))?; - let _ = container.copy_file_into(destination, &buf).await?; - Ok(()) + container.copy_file_into(destination, &buf) + .await + .map_err(Error::from) + .inspect(|_| trace!("Copying patch {} successfull", patch.display())) + .with_context(|| anyhow!("Copying patch {} to container {}", patch.display(), container.id())) + .map_err(Error::from) }) .collect::<futures::stream::FuturesUnordered<_>>() .collect::<Result<()>>() .await + .map_err(Error::from) + .inspect(|_| trace!("Copied all patches")) .with_context(|| anyhow!("Copying patches to container {}", container.id())) .map_err(Error::from) } @@ -611,7 +623,10 @@ impl<'a> PreparedContainer<'a> { found = Some(path); break; }, - Err(e) => return Err(e), + Err(e) => { + trace!("Failed to join '{:?}' + '{:?}'", release_store.root_path(), art.display()); + return Err(e) + }, Ok(None) => continue, } } @@ -626,10 +641,12 @@ impl<'a> PreparedContainer<'a> { art.display() ) })?; + trace!("Successfully read {} into buffer", art.display()); let r = container .copy_file_into(&destination, &buf) .await + .inspect(|_| trace!("Successfully copied {} to container", art.display())) .with_context(|| { anyhow!( "Copying artifact {} to container {} at {}", @@ -645,6 +662,7 @@ impl<'a> PreparedContainer<'a> { .collect::<futures::stream::FuturesUnordered<_>>() .collect::<Result<Vec<_>>>() .await + .inspect(|_| trace!("Successfully copied all artifacts to the container {}", container.id())) .with_context(|| anyhow!("Copying artifacts to container {}", container.id())) .map_err(Error::from) .map(|_| ()) @@ -658,6 +676,7 @@ impl<'a> PreparedContainer<'a> { container .copy_file_into(script_path, script.as_ref().as_bytes()) .await + .inspect(|_| trace!("Successfully copied script to container {}", container.id())) .with_context(|| anyhow!("Copying the script into container {}", container.id())) .map_err(Error::from) } diff --git a/src/main.rs b/src/main.rs index 59dd559..f32b3a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -62,6 +62,7 @@ use logcrate::error; use rand as _; // Required to make lints happy use aquamarine as _; // doc-helper crate use funty as _; // doc-helper crate +use zeroize as _; // Required to make lints happy mod cli; mod commands; diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 4a6cf7d..7e2799d 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -415,6 +415,7 @@ impl<'a> Orchestrator<'a> { let sender = prep.3.into_inner().unwrap_or_else(|| vec![root_sender.clone()]); JobTask::new(prep.0, prep.1, sender) }) + .inspect(|task| trace!("Running: {}", task.jobdef.job.uuid())) .map(|task| task.run()) .collect::<futures::stream::FuturesUnordered<_>>(); debug!("Built {} jobs", running_jobs.len()); @@ -422,6 +423,7 @@ impl<'a> Orchestrator<'a> { let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); let (_, jobs_result) = tokio::join!(multibar_block, running_jobs.collect::<Result<()>>()); let _ = jobs_result?; + trace!("All jobs finished"); match root_receiver.recv().await { None => Err(anyhow!("No result received...")), Some(Ok(results)) => { @@ -503,10 +505,19 @@ struct JobTask<'a> { impl<'a> Drop for JobTask<'a> { fn drop(&mut self) { if !self.bar.is_finished() { - self.bar.finish_with_message(format!("[{} {} {}] Stopped, error on other task", + // If there are dependencies, the error is probably from another task + // If there are no dependencies, the error was caused by something else + let errmsg = if self.jobdef.dependencies.is_empty() { + "error occured" + } else { + "error on other task" + }; + + self.bar.finish_with_message(format!("[{} {} {}] Stopped, {msg}", self.jobdef.job.uuid(), self.jobdef.job.package().name(), - self.jobdef.job.package().version())); + self.jobdef.job.package().version(), + msg = errmsg)); } } } @@ -586,6 +597,7 @@ impl<'a> JobTask<'a> { // // We only send to one parent, because it doesn't matter // And we know that we have at least one sender + log::error!("[{}]: Received errors = {:?}", self.jobdef.job.uuid(), received_errors); self.sender[0].send(Err(received_errors)).await; // ... and stop operation, because the whole tree will fail anyways. |