summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml6
-rw-r--r--src/cli.rs4
-rw-r--r--src/commands/build.rs6
-rw-r--r--src/commands/source.rs86
-rw-r--r--src/commands/tree_of.rs2
-rw-r--r--src/endpoint/configured.rs35
-rw-r--r--src/main.rs1
-rw-r--r--src/orchestrator/orchestrator.rs16
8 files changed, 96 insertions, 60 deletions
diff --git a/Cargo.toml b/Cargo.toml
index b982d11..6c29ecc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,7 +36,7 @@ daggy = { version = "0.7", features = [ "serde" ] }
dialoguer = "0.8"
diesel = { version = ">=1.4.6", features = ["postgres", "chrono", "uuid", "serde_json"] }
diesel_migrations = "*"
-env_logger = "0.8"
+env_logger = "0.9"
filters = "0.4.0"
futures = "0.3"
getset = "0.1"
@@ -94,3 +94,7 @@ rand = "=0.4.3"
# See https://github.com/bitvecto-rs/bitvec/issues/105#issuecomment-778570981
funty = "=1.1.0"
+# Pin, because dialoguer pulls it in, but 1.4.x and newer has MSRV 1.51.0. With
+# the pin here, we enforce the build to not use 1.4.0 or newer.
+zeroize = ">=1.3.0, <1.4.0"
+
diff --git a/src/cli.rs b/src/cli.rs
index a8b0cc1..512e35a 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -27,11 +27,11 @@ pub fn cli<'a>() -> App<'a> {
.version(crate_version!())
.about("Generic Build Orchestration System for building linux packages with docker")
- .after_help(r#"
+ .after_help(indoc::indoc!(r#"
The following environment variables can be passed to butido:
RUST_LOG - to enable logging, for exact usage see the rust cookbook
- "#)
+ "#))
.arg(Arg::new("hide_bars")
.required(false)
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 8825750..bb0c2d9 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -33,22 +33,22 @@ use tokio_stream::StreamExt;
use uuid::Uuid;
use crate::config::*;
-use crate::filestore::path::StoreRoot;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
+use crate::filestore::path::StoreRoot;
use crate::job::JobResource;
use crate::log::LogItem;
use crate::orchestrator::OrchestratorSetup;
+use crate::package::Dag;
use crate::package::PackageName;
use crate::package::PackageVersion;
use crate::package::Shebang;
-use crate::package::Dag;
use crate::repository::Repository;
use crate::schema;
use crate::source::SourceCache;
+use crate::util::EnvironmentVariableName;
use crate::util::docker::ImageName;
use crate::util::progress::ProgressBars;
-use crate::util::EnvironmentVariableName;
/// Implementation of the "build" subcommand
#[allow(clippy::too_many_arguments)]
diff --git a/src/commands/source.rs b/src/commands/source.rs
index 3e0167d..36cf352 100644
--- a/src/commands/source.rs
+++ b/src/commands/source.rs
@@ -214,6 +214,49 @@ pub async fn download(
repo: Repository,
progressbars: ProgressBars,
) -> Result<()> {
+ async fn perform_download(source: &SourceEntry, bar: &indicatif::ProgressBar) -> Result<()> {
+ trace!("Creating: {:?}", source);
+ let file = source.create().await.with_context(|| {
+ anyhow!(
+ "Creating source file destination: {}",
+ source.path().display()
+ )
+ })?;
+
+ let mut file = tokio::io::BufWriter::new(file);
+ let response = match reqwest::get(source.url().as_ref()).await {
+ Ok(resp) => resp,
+ Err(e) => {
+ bar.finish_with_message(format!("Failed: {}", source.url()));
+ return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url()))
+ }
+ };
+
+ if let Some(len) = response.content_length() {
+ bar.set_length(len);
+ }
+
+ let mut stream = reqwest::get(source.url().as_ref()).await?.bytes_stream();
+ let mut bytes_written = 0;
+ while let Some(bytes) = stream.next().await {
+ let bytes = bytes?;
+ file.write_all(bytes.as_ref()).await?;
+ bytes_written += bytes.len();
+
+ bar.inc(bytes.len() as u64);
+ if let Some(len) = response.content_length() {
+ bar.set_message(format!("Downloading {} ({}/{} bytes)", source.url(), bytes_written, len));
+ } else {
+ bar.set_message(format!("Downloading {} ({} bytes)", source.url(), bytes_written));
+ }
+ }
+
+ file.flush()
+ .await
+ .map_err(Error::from)
+ .map(|_| ())
+ }
+
let force = matches.is_present("force");
let cache = PathBuf::from(config.source_cache_root());
let sc = SourceCache::new(cache);
@@ -260,49 +303,6 @@ pub async fn download(
if source_path_exists && !force {
Err(anyhow!("Source exists: {}", source.path().display()))
} else {
- async fn perform_download(source: &SourceEntry, bar: &indicatif::ProgressBar) -> Result<()> {
- trace!("Creating: {:?}", source);
- let file = source.create().await.with_context(|| {
- anyhow!(
- "Creating source file destination: {}",
- source.path().display()
- )
- })?;
-
- let mut file = tokio::io::BufWriter::new(file);
- let response = match reqwest::get(source.url().as_ref()).await {
- Ok(resp) => resp,
- Err(e) => {
- bar.finish_with_message(format!("Failed: {}", source.url()));
- return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url()))
- }
- };
-
- if let Some(len) = response.content_length() {
- bar.set_length(len);
- }
-
- let mut stream = reqwest::get(source.url().as_ref()).await?.bytes_stream();
- let mut bytes_written = 0;
- while let Some(bytes) = stream.next().await {
- let bytes = bytes?;
- file.write_all(bytes.as_ref()).await?;
- bytes_written += bytes.len();
-
- bar.inc(bytes.len() as u64);
- if let Some(len) = response.content_length() {
- bar.set_message(format!("Downloading {} ({}/{} bytes)", source.url(), bytes_written, len));
- } else {
- bar.set_message(format!("Downloading {} ({} bytes)", source.url(), bytes_written));
- }
- }
-
- file.flush()
- .await
- .map_err(Error::from)
- .map(|_| ())
- }
-
if source_path_exists /* && force is implied by 'if' above*/ {
if let Err(e) = source.remove_file().await {
bar.finish_with_message(format!("Failed to remove existing file: {}", source.path().display()));
diff --git a/src/commands/tree_of.rs b/src/commands/tree_of.rs
index e632ed5..4f78c61 100644
--- a/src/commands/tree_of.rs
+++ b/src/commands/tree_of.rs
@@ -17,9 +17,9 @@ use anyhow::Result;
use clap::ArgMatches;
use resiter::AndThen;
+use crate::package::Dag;
use crate::package::PackageName;
use crate::package::PackageVersionConstraint;
-use crate::package::Dag;
use crate::repository::Repository;
use crate::util::progress::ProgressBars;
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index aecb291..6a1e3be 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -13,18 +13,19 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
-use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
+use anyhow::anyhow;
use futures::FutureExt;
use getset::{CopyGetters, Getters};
use log::trace;
+use result_inspect::ResultInspect;
use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
-use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::RwLock;
+use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
@@ -478,6 +479,7 @@ impl<'a> PreparedContainer<'a> {
.containers()
.create(&builder_opts)
.await
+ .with_context(|| anyhow!("Creating container with builder options = {:?}", builder_opts))
.with_context(|| anyhow!("Creating container on '{}'", endpoint.name))?;
trace!("Create info = {:?}", create_info);
Ok(create_info)
@@ -522,12 +524,16 @@ impl<'a> PreparedContainer<'a> {
.with_context(|| anyhow!("Reading file {}", source_path.display()))?;
drop(entry);
- let _ = container.copy_file_into(destination, &buf).await?;
- Ok(())
+ container.copy_file_into(destination, &buf)
+ .await
+ .inspect(|_| trace!("Successfully copied source {} to container {}", source_path.display(), container.id()))
+ .with_context(|| anyhow!("Failed to copy source {} to container {}", source_path.display(), container.id()))
+ .map_err(Error::from)
})
.collect::<futures::stream::FuturesUnordered<_>>()
.collect::<Result<()>>()
.await
+ .inspect(|_| trace!("Successfully copied sources to container {}", container.id()))
.with_context(|| anyhow!("Copying sources to container {}", container.id()))
.map_err(Error::from)
}
@@ -544,7 +550,7 @@ impl<'a> PreparedContainer<'a> {
.iter()
.map(|patch| async move {
let destination = PathBuf::from(crate::consts::PATCH_DIR_PATH).join(patch);
- trace!("Copying patch {} to container at {}/{}", crate::consts::PATCH_DIR_PATH, patch.display(), destination.display());
+ trace!("Copying patch {} to container at {}", patch.display(), destination.display());
let mut buf = vec![];
tokio::fs::OpenOptions::new()
@@ -560,12 +566,18 @@ impl<'a> PreparedContainer<'a> {
.await
.with_context(|| anyhow!("Reading file {}", patch.display()))?;
- let _ = container.copy_file_into(destination, &buf).await?;
- Ok(())
+ container.copy_file_into(destination, &buf)
+ .await
+ .map_err(Error::from)
+ .inspect(|_| trace!("Copying patch {} successfull", patch.display()))
+ .with_context(|| anyhow!("Copying patch {} to container {}", patch.display(), container.id()))
+ .map_err(Error::from)
})
.collect::<futures::stream::FuturesUnordered<_>>()
.collect::<Result<()>>()
.await
+ .map_err(Error::from)
+ .inspect(|_| trace!("Copied all patches"))
.with_context(|| anyhow!("Copying patches to container {}", container.id()))
.map_err(Error::from)
}
@@ -611,7 +623,10 @@ impl<'a> PreparedContainer<'a> {
found = Some(path);
break;
},
- Err(e) => return Err(e),
+ Err(e) => {
+ trace!("Failed to join '{:?}' + '{:?}'", release_store.root_path(), art.display());
+ return Err(e)
+ },
Ok(None) => continue,
}
}
@@ -626,10 +641,12 @@ impl<'a> PreparedContainer<'a> {
art.display()
)
})?;
+ trace!("Successfully read {} into buffer", art.display());
let r = container
.copy_file_into(&destination, &buf)
.await
+ .inspect(|_| trace!("Successfully copied {} to container", art.display()))
.with_context(|| {
anyhow!(
"Copying artifact {} to container {} at {}",
@@ -645,6 +662,7 @@ impl<'a> PreparedContainer<'a> {
.collect::<futures::stream::FuturesUnordered<_>>()
.collect::<Result<Vec<_>>>()
.await
+ .inspect(|_| trace!("Successfully copied all artifacts to the container {}", container.id()))
.with_context(|| anyhow!("Copying artifacts to container {}", container.id()))
.map_err(Error::from)
.map(|_| ())
@@ -658,6 +676,7 @@ impl<'a> PreparedContainer<'a> {
container
.copy_file_into(script_path, script.as_ref().as_bytes())
.await
+ .inspect(|_| trace!("Successfully copied script to container {}", container.id()))
.with_context(|| anyhow!("Copying the script into container {}", container.id()))
.map_err(Error::from)
}
diff --git a/src/main.rs b/src/main.rs
index 59dd559..f32b3a5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -62,6 +62,7 @@ use logcrate::error;
use rand as _; // Required to make lints happy
use aquamarine as _; // doc-helper crate
use funty as _; // doc-helper crate
+use zeroize as _; // Required to make lints happy
mod cli;
mod commands;
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 4a6cf7d..7e2799d 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -415,6 +415,7 @@ impl<'a> Orchestrator<'a> {
let sender = prep.3.into_inner().unwrap_or_else(|| vec![root_sender.clone()]);
JobTask::new(prep.0, prep.1, sender)
})
+ .inspect(|task| trace!("Running: {}", task.jobdef.job.uuid()))
.map(|task| task.run())
.collect::<futures::stream::FuturesUnordered<_>>();
debug!("Built {} jobs", running_jobs.len());
@@ -422,6 +423,7 @@ impl<'a> Orchestrator<'a> {
let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
let (_, jobs_result) = tokio::join!(multibar_block, running_jobs.collect::<Result<()>>());
let _ = jobs_result?;
+ trace!("All jobs finished");
match root_receiver.recv().await {
None => Err(anyhow!("No result received...")),
Some(Ok(results)) => {
@@ -503,10 +505,19 @@ struct JobTask<'a> {
impl<'a> Drop for JobTask<'a> {
fn drop(&mut self) {
if !self.bar.is_finished() {
- self.bar.finish_with_message(format!("[{} {} {}] Stopped, error on other task",
+ // If there are dependencies, the error is probably from another task
+ // If there are no dependencies, the error was caused by something else
+ let errmsg = if self.jobdef.dependencies.is_empty() {
+ "error occured"
+ } else {
+ "error on other task"
+ };
+
+ self.bar.finish_with_message(format!("[{} {} {}] Stopped, {msg}",
self.jobdef.job.uuid(),
self.jobdef.job.package().name(),
- self.jobdef.job.package().version()));
+ self.jobdef.job.package().version(),
+ msg = errmsg));
}
}
}
@@ -586,6 +597,7 @@ impl<'a> JobTask<'a> {
//
// We only send to one parent, because it doesn't matter
// And we know that we have at least one sender
+ log::error!("[{}]: Received errors = {:?}", self.jobdef.job.uuid(), received_errors);
self.sender[0].send(Err(received_errors)).await;
// ... and stop operation, because the whole tree will fail anyways.