From 1438fc5ee803cf78ed29bf0e030b3cf1fd72f33e Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 2 Dec 2021 13:09:10 +0100 Subject: Fix: Use already existing response type to get byte stream, instead of starting new GET request Signed-off-by: Matthias Beyer --- src/commands/source.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/commands/source.rs b/src/commands/source.rs index 68a8926..2709df8 100644 --- a/src/commands/source.rs +++ b/src/commands/source.rs @@ -261,7 +261,8 @@ pub async fn download( bar.set_length(len); } - let mut stream = reqwest::get(source.url().as_ref()).await?.bytes_stream(); + let content_length = response.content_length(); + let mut stream = response.bytes_stream(); let mut bytes_written = 0; while let Some(bytes) = stream.next().await { let bytes = bytes?; @@ -269,7 +270,7 @@ pub async fn download( bytes_written += bytes.len(); bar.inc(bytes.len() as u64); - if let Some(len) = response.content_length() { + if let Some(len) = content_length { bar.set_message(format!("Downloading {} ({}/{} bytes)", source.url(), bytes_written, len)); } else { bar.set_message(format!("Downloading {} ({} bytes)", source.url(), bytes_written)); -- cgit v1.2.3 From 366c26f7f47d55f99b8684e95d78c9740dda1e34 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 2 Dec 2021 13:15:38 +0100 Subject: Remove spinner support in progress bar helper Signed-off-by: Matthias Beyer --- src/main.rs | 1 - src/util/progress.rs | 14 +------------- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index c0ee15a..a120523 100644 --- a/src/main.rs +++ b/src/main.rs @@ -139,7 +139,6 @@ async fn main() -> Result<()> { let hide_bars = cli.is_present("hide_bars") || crate::util::stdout_is_pipe(); let progressbars = ProgressBars::setup( config.progress_format().clone(), - config.spinner_format().clone(), hide_bars, ); diff --git a/src/util/progress.rs b/src/util/progress.rs index b2edc08..3a42b52 100644 --- a/src/util/progress.rs +++ b/src/util/progress.rs @@ -14,17 +14,15 @@ use getset::CopyGetters; #[derive(Clone, Debug, CopyGetters)] pub struct ProgressBars { bar_template: String, - spinner_template: String, #[getset(get_copy = "pub")] hide: bool, } impl ProgressBars { - pub fn setup(bar_template: String, spinner_template: String, hide: bool) -> Self { + pub fn setup(bar_template: String, hide: bool) -> Self { ProgressBars { bar_template, - spinner_template, hide, } } @@ -38,14 +36,4 @@ impl ProgressBars { b } } - - pub fn spinner(&self) -> ProgressBar { - if self.hide { - ProgressBar::hidden() - } else { - let bar = ProgressBar::new_spinner(); - bar.set_style(ProgressStyle::default_spinner().template(&self.spinner_template)); - bar - } - } } -- cgit v1.2.3 From 5d98b4b6210c0ef5635cc5a97be61ae7d93ac139 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 2 Dec 2021 13:17:10 +0100 Subject: Remove multiple progress when downloading This patch removes the multiple progress bars when downloading packages, and instead moves to a single progress bar, because downloading (for example) 50 packages at once resulted in an unusable progress bar list. Signed-off-by: Matthias Beyer --- src/commands/source.rs | 68 +++++++++++++++++++------------------------------- 1 file changed, 25 insertions(+), 43 deletions(-) diff --git a/src/commands/source.rs b/src/commands/source.rs index 2709df8..a24aac6 100644 --- a/src/commands/source.rs +++ b/src/commands/source.rs @@ -15,13 +15,15 @@ use std::path::PathBuf; use std::convert::TryFrom; use std::str::FromStr; -use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; +use anyhow::anyhow; use clap::ArgMatches; use colored::Colorize; use log::{info, trace}; +use result_inspect::ResultInspect; +use result_inspect::ResultInspectErr; use tokio::io::AsyncWriteExt; use tokio_stream::StreamExt; @@ -224,7 +226,7 @@ pub async fn download( repo: Repository, progressbars: ProgressBars, ) -> Result<()> { - async fn perform_download(source: &SourceEntry, bar: &indicatif::ProgressBar, timeout: Option) -> Result<()> { + async fn perform_download(source: &SourceEntry, timeout: Option) -> Result<()> { trace!("Creating: {:?}", source); let file = source.create().await.with_context(|| { anyhow!( @@ -252,29 +254,13 @@ pub async fn download( let response = match client.execute(request).await { Ok(resp) => resp, Err(e) => { - bar.finish_with_message(format!("Failed: {}", source.url())); return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url())) } }; - if let Some(len) = response.content_length() { - bar.set_length(len); - } - - let content_length = response.content_length(); let mut stream = response.bytes_stream(); - let mut bytes_written = 0; while let Some(bytes) = stream.next().await { - let bytes = bytes?; - file.write_all(bytes.as_ref()).await?; - bytes_written += bytes.len(); - - bar.inc(bytes.len() as u64); - if let Some(len) = content_length { - bar.set_message(format!("Downloading {} ({}/{} bytes)", source.url(), bytes_written, len)); - } else { - bar.set_message(format!("Downloading {} ({} bytes)", source.url(), bytes_written)); - } + file.write_all(bytes?.as_ref()).await?; } file.flush() @@ -298,20 +284,14 @@ pub async fn download( .value_of("package_version") .map(PackageVersionConstraint::try_from) .transpose()?; - let multi = { - let mp = indicatif::MultiProgress::new(); - if progressbars.hide() { - mp.set_draw_target(indicatif::ProgressDrawTarget::hidden()); - } - mp - }; let matching_regexp = matches.value_of("matching") .map(crate::commands::util::mk_package_name_regex) .transpose()?; - let r = repo - .packages() + let progressbar = progressbars.bar(); + + repo.packages() .filter(|p| { match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { (None, None, None) => true, @@ -326,8 +306,7 @@ pub async fn download( }) .map(|p| { sc.sources_for(p).into_iter().map(|source| { - let bar = multi.add(progressbars.spinner()); - bar.set_message(format!("Downloading {}", source.url())); + let progressbar = progressbar.clone(); async move { let source_path_exists = source.path().exists(); if !source_path_exists && source.download_manually() { @@ -344,30 +323,33 @@ pub async fn download( } else { if source_path_exists /* && force is implied by 'if' above*/ { if let Err(e) = source.remove_file().await { - bar.finish_with_message(format!("Failed to remove existing file: {}", source.path().display())); + progressbar.inc(1); return Err(e) } } - if let Err(e) = perform_download(&source, &bar, timeout).await { - bar.finish_with_message(format!("Failed: {}", source.url())); - Err(e) - } else { - bar.finish_with_message(format!("Finished: {}", source.url())); - Ok(()) - } + perform_download(&source, timeout) + .await + .inspect(|_| { + progressbar.inc(1); + }) } } }) }) .flatten() .collect::>() - .collect::>>(); - - let multibar_block = tokio::task::spawn_blocking(move || multi.join()); - let (r, _) = tokio::join!(r, multibar_block); - r.into_iter().collect() + .collect::>>() + .await + .into_iter() + .collect::>() + .inspect_err(|_| { + progressbar.finish_with_message("At least one package failed"); + }) + .inspect(|_| { + progressbar.finish_with_message("Succeeded"); + }) } async fn of( -- cgit v1.2.3 From 9ad578dc96cdf7b74d5f58f7798d0327523b07f7 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 2 Dec 2021 13:44:08 +0100 Subject: Add progress wrapper When creating only one progress bar for all downloads, we still want to know how many downloads are happening and the remaining number of bytes to be received. This patch implements a ProgressWrapper that synchronizes between the download tasks and the progress bar. Signed-off-by: Matthias Beyer --- src/commands/source.rs | 121 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 99 insertions(+), 22 deletions(-) diff --git a/src/commands/source.rs b/src/commands/source.rs index a24aac6..d4e6af5 100644 --- a/src/commands/source.rs +++ b/src/commands/source.rs @@ -10,10 +10,11 @@ //! Implementation of the 'source' subcommand +use std::convert::TryFrom; use std::io::Write; use std::path::PathBuf; -use std::convert::TryFrom; use std::str::FromStr; +use std::sync::Arc; use anyhow::Context; use anyhow::Error; @@ -21,10 +22,9 @@ use anyhow::Result; use anyhow::anyhow; use clap::ArgMatches; use colored::Colorize; -use log::{info, trace}; -use result_inspect::ResultInspect; -use result_inspect::ResultInspectErr; +use log::{debug, info, trace}; use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; use tokio_stream::StreamExt; use crate::config::*; @@ -226,7 +226,68 @@ pub async fn download( repo: Repository, progressbars: ProgressBars, ) -> Result<()> { - async fn perform_download(source: &SourceEntry, timeout: Option) -> Result<()> { + #[derive(Clone)] + struct ProgressWrapper { + download_count: u64, + finished_downloads: u64, + current_bytes: usize, + sum_bytes: u64, + bar: Arc>, + } + + impl ProgressWrapper { + fn new(bar: indicatif::ProgressBar) -> Self { + Self { + download_count: 0, + finished_downloads: 0, + current_bytes: 0, + sum_bytes: 0, + bar: Arc::new(Mutex::new(bar)) + } + } + + async fn inc_download_count(&mut self) { + self.download_count += 1; + self.set_message().await; + } + + async fn inc_download_bytes(&mut self, bytes: u64) { + self.sum_bytes += bytes; + self.set_message().await; + } + + async fn finish_one_download(&mut self) { + self.finished_downloads += 1; + self.bar.lock().await.inc(1); + self.set_message().await; + } + + async fn add_bytes(&mut self, len: usize) { + self.current_bytes += len; + self.set_message().await; + } + + async fn set_message(&self) { + let bar = self.bar.lock().await; + bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished", + current_bytes = self.current_bytes, + sum_bytes = self.sum_bytes, + dlfinished = self.finished_downloads, + dlsum = self.download_count)); + } + + async fn success(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count)); + } + + async fn error(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("At least one download of {} failed", self.download_count)); + } + } + + async fn perform_download(source: &SourceEntry, progress: Arc>, timeout: Option) -> Result<()> { trace!("Creating: {:?}", source); let file = source.create().await.with_context(|| { anyhow!( @@ -258,11 +319,22 @@ pub async fn download( } }; + progress.lock() + .await + .inc_download_bytes(response.content_length().unwrap_or(0)) + .await; + let mut stream = response.bytes_stream(); while let Some(bytes) = stream.next().await { - file.write_all(bytes?.as_ref()).await?; + let bytes = bytes?; + file.write_all(bytes.as_ref()).await?; + progress.lock() + .await + .add_bytes(bytes.len()) + .await } + progress.lock().await.finish_one_download().await; file.flush() .await .map_err(Error::from) @@ -289,9 +361,9 @@ pub async fn download( .map(crate::commands::util::mk_package_name_regex) .transpose()?; - let progressbar = progressbars.bar(); + let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar()))); - repo.packages() + let r = repo.packages() .filter(|p| { match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { (None, None, None) => true, @@ -321,19 +393,21 @@ pub async fn download( if source_path_exists && !force { Err(anyhow!("Source exists: {}", source.path().display())) } else { + progressbar.lock() + .await + .inc_download_count() + .await; + if source_path_exists /* && force is implied by 'if' above*/ { if let Err(e) = source.remove_file().await { - progressbar.inc(1); + progressbar.lock().await.finish_one_download().await; return Err(e) } } - - perform_download(&source, timeout) - .await - .inspect(|_| { - progressbar.inc(1); - }) + perform_download(&source, progressbar.clone(), timeout).await?; + progressbar.lock().await.finish_one_download().await; + Ok(()) } } }) @@ -343,13 +417,16 @@ pub async fn download( .collect::>>() .await .into_iter() - .collect::>() - .inspect_err(|_| { - progressbar.finish_with_message("At least one package failed"); - }) - .inspect(|_| { - progressbar.finish_with_message("Succeeded"); - }) + .collect::>(); + + if r.is_err() { + progressbar.lock().await.error().await; + } else { + progressbar.lock().await.success().await; + } + + debug!("r = {:?}", r); + r } async fn of( -- cgit v1.2.3 From e1b39ee4873da5a8b488419764fa1ae0d8220056 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 2 Dec 2021 14:01:06 +0100 Subject: Move source command implementation to submodule Signed-off-by: Matthias Beyer --- src/commands/source.rs | 475 --------------------------------------------- src/commands/source/mod.rs | 475 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 475 insertions(+), 475 deletions(-) delete mode 100644 src/commands/source.rs create mode 100644 src/commands/source/mod.rs diff --git a/src/commands/source.rs b/src/commands/source.rs deleted file mode 100644 index d4e6af5..0000000 --- a/src/commands/source.rs +++ /dev/null @@ -1,475 +0,0 @@ -// -// Copyright (c) 2020-2021 science+computing ag and other contributors -// -// This program and the accompanying materials are made -// available under the terms of the Eclipse Public License 2.0 -// which is available at https://www.eclipse.org/legal/epl-2.0/ -// -// SPDX-License-Identifier: EPL-2.0 -// - -//! Implementation of the 'source' subcommand - -use std::convert::TryFrom; -use std::io::Write; -use std::path::PathBuf; -use std::str::FromStr; -use std::sync::Arc; - -use anyhow::Context; -use anyhow::Error; -use anyhow::Result; -use anyhow::anyhow; -use clap::ArgMatches; -use colored::Colorize; -use log::{debug, info, trace}; -use tokio::io::AsyncWriteExt; -use tokio::sync::Mutex; -use tokio_stream::StreamExt; - -use crate::config::*; -use crate::package::Package; -use crate::package::PackageName; -use crate::package::PackageVersionConstraint; -use crate::repository::Repository; -use crate::source::*; -use crate::util::progress::ProgressBars; - -/// Implementation of the "source" subcommand -pub async fn source( - matches: &ArgMatches, - config: &Configuration, - repo: Repository, - progressbars: ProgressBars, -) -> Result<()> { - match matches.subcommand() { - Some(("verify", matches)) => verify(matches, config, repo, progressbars).await, - Some(("list-missing", matches)) => list_missing(matches, config, repo).await, - Some(("url", matches)) => url(matches, repo).await, - Some(("download", matches)) => download(matches, config, repo, progressbars).await, - Some(("of", matches)) => of(matches, config, repo).await, - Some((other, _)) => return Err(anyhow!("Unknown subcommand: {}", other)), - None => Err(anyhow!("No subcommand")), - } -} - -pub async fn verify( - matches: &ArgMatches, - config: &Configuration, - repo: Repository, - progressbars: ProgressBars, -) -> Result<()> { - let sc = SourceCache::new(config.source_cache_root().clone()); - let pname = matches - .value_of("package_name") - .map(String::from) - .map(PackageName::from); - let pvers = matches - .value_of("package_version") - .map(PackageVersionConstraint::try_from) - .transpose()?; - - let matching_regexp = matches.value_of("matching") - .map(crate::commands::util::mk_package_name_regex) - .transpose()?; - - let packages = repo - .packages() - .filter(|p| { - match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { - (None, None, None) => true, - (Some(pname), None, None) => p.name() == pname, - (Some(pname), Some(vers), None) => p.name() == pname && vers.matches(p.version()), - (None, None, Some(regex)) => regex.is_match(p.name()), - - (_, _, _) => { - panic!("This should not be possible, either we select packages by name and (optionally) version, or by regex.") - }, - } - }) - .inspect(|p| trace!("Found for verification: {} {}", p.name(), p.version())); - - verify_impl(packages, &sc, &progressbars).await -} - -pub(in crate::commands) async fn verify_impl<'a, I>( - packages: I, - sc: &SourceCache, - progressbars: &ProgressBars, -) -> Result<()> -where - I: Iterator + 'a, -{ - let sources = packages - .map(|p| sc.sources_for(p).into_iter()) - .flatten() - .collect::>(); - - let bar = progressbars.bar(); - bar.set_message("Verifying sources"); - bar.set_length(sources.len() as u64); - - let results = sources.into_iter() - .map(|src| (bar.clone(), src)) - .map(|(bar, source)| async move { - trace!("Verifying: {}", source.path().display()); - if source.path().exists() { - trace!("Exists: {}", source.path().display()); - source.verify_hash().await.with_context(|| { - anyhow!("Hash verification failed for: {}", source.path().display()) - })?; - - trace!("Success verifying: {}", source.path().display()); - bar.inc(1); - Ok(()) - } else { - trace!("Failed verifying: {}", source.path().display()); - bar.inc(1); - Err(anyhow!("Source missing: {}", source.path().display())) - } - }) - .collect::>() - .collect::>>() - .await; - - info!("Verification processes finished"); - - if results.iter().any(Result::is_err) { - bar.finish_with_message("Source verification failed"); - } else { - bar.finish_with_message("Source verification successfull"); - } - - let out = std::io::stdout(); - let mut any_error = false; - for result in results { - if let Err(e) = result { - let mut outlock = out.lock(); - any_error = true; - for cause in e.chain() { - let _ = writeln!(outlock, "Error: {}", cause.to_string().red()); - } - let _ = writeln!(outlock); - } - } - - if any_error { - Err(anyhow!( - "At least one package failed with source verification" - )) - } else { - Ok(()) - } -} - -pub async fn list_missing(_: &ArgMatches, config: &Configuration, repo: Repository) -> Result<()> { - let sc = SourceCache::new(config.source_cache_root().clone()); - let out = std::io::stdout(); - let mut outlock = out.lock(); - - repo.packages().try_for_each(|p| { - for source in sc.sources_for(p) { - if !source.path().exists() { - writeln!( - outlock, - "{} {} -> {}", - p.name(), - p.version(), - source.path().display() - )?; - } - } - - Ok(()) - }) -} - -pub async fn url(matches: &ArgMatches, repo: Repository) -> Result<()> { - let out = std::io::stdout(); - let mut outlock = out.lock(); - - let pname = matches - .value_of("package_name") - .map(String::from) - .map(PackageName::from); - let pvers = matches - .value_of("package_version") - .map(PackageVersionConstraint::try_from) - .transpose()?; - - repo.packages() - .filter(|p| pname.as_ref().map(|n| p.name() == n).unwrap_or(true)) - .filter(|p| { - pvers - .as_ref() - .map(|v| v.matches(p.version())) - .unwrap_or(true) - }) - .try_for_each(|p| { - p.sources().iter().try_for_each(|(source_name, source)| { - writeln!( - outlock, - "{} {} -> {} = {}", - p.name(), - p.version(), - source_name, - source.url() - ) - .map_err(Error::from) - }) - }) -} - -pub async fn download( - matches: &ArgMatches, - config: &Configuration, - repo: Repository, - progressbars: ProgressBars, -) -> Result<()> { - #[derive(Clone)] - struct ProgressWrapper { - download_count: u64, - finished_downloads: u64, - current_bytes: usize, - sum_bytes: u64, - bar: Arc>, - } - - impl ProgressWrapper { - fn new(bar: indicatif::ProgressBar) -> Self { - Self { - download_count: 0, - finished_downloads: 0, - current_bytes: 0, - sum_bytes: 0, - bar: Arc::new(Mutex::new(bar)) - } - } - - async fn inc_download_count(&mut self) { - self.download_count += 1; - self.set_message().await; - } - - async fn inc_download_bytes(&mut self, bytes: u64) { - self.sum_bytes += bytes; - self.set_message().await; - } - - async fn finish_one_download(&mut self) { - self.finished_downloads += 1; - self.bar.lock().await.inc(1); - self.set_message().await; - } - - async fn add_bytes(&mut self, len: usize) { - self.current_bytes += len; - self.set_message().await; - } - - async fn set_message(&self) { - let bar = self.bar.lock().await; - bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished", - current_bytes = self.current_bytes, - sum_bytes = self.sum_bytes, - dlfinished = self.finished_downloads, - dlsum = self.download_count)); - } - - async fn success(&self) { - let bar = self.bar.lock().await; - bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count)); - } - - async fn error(&self) { - let bar = self.bar.lock().await; - bar.finish_with_message(format!("At least one download of {} failed", self.download_count)); - } - } - - async fn perform_download(source: &SourceEntry, progress: Arc>, timeout: Option) -> Result<()> { - trace!("Creating: {:?}", source); - let file = source.create().await.with_context(|| { - anyhow!( - "Creating source file destination: {}", - source.path().display() - ) - })?; - - let mut file = tokio::io::BufWriter::new(file); - let client_builder = reqwest::Client::builder() - .redirect(reqwest::redirect::Policy::limited(10)); - - let client_builder = if let Some(to) = timeout { - client_builder.timeout(std::time::Duration::from_secs(to)) - } else { - client_builder - }; - - let client = client_builder.build().context("Building HTTP client failed")?; - - let request = client.get(source.url().as_ref()) - .build() - .with_context(|| anyhow!("Building request for {} failed", source.url().as_ref()))?; - - let response = match client.execute(request).await { - Ok(resp) => resp, - Err(e) => { - return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url())) - } - }; - - progress.lock() - .await - .inc_download_bytes(response.content_length().unwrap_or(0)) - .await; - - let mut stream = response.bytes_stream(); - while let Some(bytes) = stream.next().await { - let bytes = bytes?; - file.write_all(bytes.as_ref()).await?; - progress.lock() - .await - .add_bytes(bytes.len()) - .await - } - - progress.lock().await.finish_one_download().await; - file.flush() - .await - .map_err(Error::from) - .map(|_| ()) - } - - let force = matches.is_present("force"); - let timeout = matches.value_of("timeout") - .map(u64::from_str) - .transpose() - .context("Parsing timeout argument to integer")?; - let cache = PathBuf::from(config.source_cache_root()); - let sc = SourceCache::new(cache); - let pname = matches - .value_of("package_name") - .map(String::from) - .map(PackageName::from); - let pvers = matches - .value_of("package_version") - .map(PackageVersionConstraint::try_from) - .transpose()?; - - let matching_regexp = matches.value_of("matching") - .map(crate::commands::util::mk_package_name_regex) - .transpose()?; - - let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar()))); - - let r = repo.packages() - .filter(|p| { - match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { - (None, None, None) => true, - (Some(pname), None, None) => p.name() == pname, - (Some(pname), Some(vers), None) => p.name() == pname && vers.matches(p.version()), - (None, None, Some(regex)) => regex.is_match(p.name()), - - (_, _, _) => { - panic!("This should not be possible, either we select packages by name and (optionally) version, or by regex.") - }, - } - }) - .map(|p| { - sc.sources_for(p).into_iter().map(|source| { - let progressbar = progressbar.clone(); - async move { - let source_path_exists = source.path().exists(); - if !source_path_exists && source.download_manually() { - return Err(anyhow!( - "Cannot download source that is marked for manual download" - )) - .context(anyhow!("Creating source: {}", source.path().display())) - .context(anyhow!("Downloading source: {}", source.url())) - .map_err(Error::from); - } - - if source_path_exists && !force { - Err(anyhow!("Source exists: {}", source.path().display())) - } else { - progressbar.lock() - .await - .inc_download_count() - .await; - - if source_path_exists /* && force is implied by 'if' above*/ { - if let Err(e) = source.remove_file().await { - progressbar.lock().await.finish_one_download().await; - return Err(e) - } - } - - perform_download(&source, progressbar.clone(), timeout).await?; - progressbar.lock().await.finish_one_download().await; - Ok(()) - } - } - }) - }) - .flatten() - .collect::>() - .collect::>>() - .await - .into_iter() - .collect::>(); - - if r.is_err() { - progressbar.lock().await.error().await; - } else { - progressbar.lock().await.success().await; - } - - debug!("r = {:?}", r); - r -} - -async fn of( - matches: &ArgMatches, - config: &Configuration, - repo: Repository, -) -> Result<()> { - let cache = PathBuf::from(config.source_cache_root()); - let sc = SourceCache::new(cache); - let pname = matches - .value_of("package_name") - .map(String::from) - .map(PackageName::from); - let pvers = matches - .value_of("package_version") - .map(PackageVersionConstraint::try_from) - .transpose()?; - - repo.packages() - .filter(|p| pname.as_ref().map(|n| p.name() == n).unwrap_or(true)) - .filter(|p| { - pvers - .as_ref() - .map(|v| v.matches(p.version())) - .unwrap_or(true) - }) - .map(|p| { - let pathes = sc.sources_for(p) - .into_iter() - .map(|source| source.path()) - .collect::>(); - - (p, pathes) - }) - .fold(Ok(std::io::stdout()), |out, (package, pathes)| { - out.and_then(|mut out| { - writeln!(out, "{} {}", package.name(), package.version())?; - for path in pathes { - writeln!(out, "\t{}", path.display())?; - } - - Ok(out) - }) - }) - .map(|_| ()) -} diff --git a/src/commands/source/mod.rs b/src/commands/source/mod.rs new file mode 100644 index 0000000..d4e6af5 --- /dev/null +++ b/src/commands/source/mod.rs @@ -0,0 +1,475 @@ +// +// Copyright (c) 2020-2021 science+computing ag and other contributors +// +// This program and the accompanying materials are made +// available under the terms of the Eclipse Public License 2.0 +// which is available at https://www.eclipse.org/legal/epl-2.0/ +// +// SPDX-License-Identifier: EPL-2.0 +// + +//! Implementation of the 'source' subcommand + +use std::convert::TryFrom; +use std::io::Write; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; + +use anyhow::Context; +use anyhow::Error; +use anyhow::Result; +use anyhow::anyhow; +use clap::ArgMatches; +use colored::Colorize; +use log::{debug, info, trace}; +use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; +use tokio_stream::StreamExt; + +use crate::config::*; +use crate::package::Package; +use crate::package::PackageName; +use crate::package::PackageVersionConstraint; +use crate::repository::Repository; +use crate::source::*; +use crate::util::progress::ProgressBars; + +/// Implementation of the "source" subcommand +pub async fn source( + matches: &ArgMatches, + config: &Configuration, + repo: Repository, + progressbars: ProgressBars, +) -> Result<()> { + match matches.subcommand() { + Some(("verify", matches)) => verify(matches, config, repo, progressbars).await, + Some(("list-missing", matches)) => list_missing(matches, config, repo).await, + Some(("url", matches)) => url(matches, repo).await, + Some(("download", matches)) => download(matches, config, repo, progressbars).await, + Some(("of", matches)) => of(matches, config, repo).await, + Some((other, _)) => return Err(anyhow!("Unknown subcommand: {}", other)), + None => Err(anyhow!("No subcommand")), + } +} + +pub async fn verify( + matches: &ArgMatches, + config: &Configuration, + repo: Repository, + progressbars: ProgressBars, +) -> Result<()> { + let sc = SourceCache::new(config.source_cache_root().clone()); + let pname = matches + .value_of("package_name") + .map(String::from) + .map(PackageName::from); + let pvers = matches + .value_of("package_version") + .map(PackageVersionConstraint::try_from) + .transpose()?; + + let matching_regexp = matches.value_of("matching") + .map(crate::commands::util::mk_package_name_regex) + .transpose()?; + + let packages = repo + .packages() + .filter(|p| { + match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { + (None, None, None) => true, + (Some(pname), None, None) => p.name() == pname, + (Some(pname), Some(vers), None) => p.name() == pname && vers.matches(p.version()), + (None, None, Some(regex)) => regex.is_match(p.name()), + + (_, _, _) => { + panic!("This should not be possible, either we select packages by name and (optionally) version, or by regex.") + }, + } + }) + .inspect(|p| trace!("Found for verification: {} {}", p.name(), p.version())); + + verify_impl(packages, &sc, &progressbars).await +} + +pub(in crate::commands) async fn verify_impl<'a, I>( + packages: I, + sc: &SourceCache, + progressbars: &ProgressBars, +) -> Result<()> +where + I: Iterator + 'a, +{ + let sources = packages + .map(|p| sc.sources_for(p).into_iter()) + .flatten() + .collect::>(); + + let bar = progressbars.bar(); + bar.set_message("Verifying sources"); + bar.set_length(sources.len() as u64); + + let results = sources.into_iter() + .map(|src| (bar.clone(), src)) + .map(|(bar, source)| async move { + trace!("Verifying: {}", source.path().display()); + if source.path().exists() { + trace!("Exists: {}", source.path().display()); + source.verify_hash().await.with_context(|| { + anyhow!("Hash verification failed for: {}", source.path().display()) + })?; + + trace!("Success verifying: {}", source.path().display()); + bar.inc(1); + Ok(()) + } else { + trace!("Failed verifying: {}", source.path().display()); + bar.inc(1); + Err(anyhow!("Source missing: {}", source.path().display())) + } + }) + .collect::>() + .collect::>>() + .await; + + info!("Verification processes finished"); + + if results.iter().any(Result::is_err) { + bar.finish_with_message("Source verification failed"); + } else { + bar.finish_with_message("Source verification successfull"); + } + + let out = std::io::stdout(); + let mut any_error = false; + for result in results { + if let Err(e) = result { + let mut outlock = out.lock(); + any_error = true; + for cause in e.chain() { + let _ = writeln!(outlock, "Error: {}", cause.to_string().red()); + } + let _ = writeln!(outlock); + } + } + + if any_error { + Err(anyhow!( + "At least one package failed with source verification" + )) + } else { + Ok(()) + } +} + +pub async fn list_missing(_: &ArgMatches, config: &Configuration, repo: Repository) -> Result<()> { + let sc = SourceCache::new(config.source_cache_root().clone()); + let out = std::io::stdout(); + let mut outlock = out.lock(); + + repo.packages().try_for_each(|p| { + for source in sc.sources_for(p) { + if !source.path().exists() { + writeln!( + outlock, + "{} {} -> {}", + p.name(), + p.version(), + source.path().display() + )?; + } + } + + Ok(()) + }) +} + +pub async fn url(matches: &ArgMatches, repo: Repository) -> Result<()> { + let out = std::io::stdout(); + let mut outlock = out.lock(); + + let pname = matches + .value_of("package_name") + .map(String::from) + .map(PackageName::from); + let pvers = matches + .value_of("package_version") + .map(PackageVersionConstraint::try_from) + .transpose()?; + + repo.packages() + .filter(|p| pname.as_ref().map(|n| p.name() == n).unwrap_or(true)) + .filter(|p| { + pvers + .as_ref() + .map(|v| v.matches(p.version())) + .unwrap_or(true) + }) + .try_for_each(|p| { + p.sources().iter().try_for_each(|(source_name, source)| { + writeln!( + outlock, + "{} {} -> {} = {}", + p.name(), + p.version(), + source_name, + source.url() + ) + .map_err(Error::from) + }) + }) +} + +pub async fn download( + matches: &ArgMatches, + config: &Configuration, + repo: Repository, + progressbars: ProgressBars, +) -> Result<()> { + #[derive(Clone)] + struct ProgressWrapper { + download_count: u64, + finished_downloads: u64, + current_bytes: usize, + sum_bytes: u64, + bar: Arc>, + } + + impl ProgressWrapper { + fn new(bar: indicatif::ProgressBar) -> Self { + Self { + download_count: 0, + finished_downloads: 0, + current_bytes: 0, + sum_bytes: 0, + bar: Arc::new(Mutex::new(bar)) + } + } + + async fn inc_download_count(&mut self) { + self.download_count += 1; + self.set_message().await; + } + + async fn inc_download_bytes(&mut self, bytes: u64) { + self.sum_bytes += bytes; + self.set_message().await; + } + + async fn finish_one_download(&mut self) { + self.finished_downloads += 1; + self.bar.lock().await.inc(1); + self.set_message().await; + } + + async fn add_bytes(&mut self, len: usize) { + self.current_bytes += len; + self.set_message().await; + } + + async fn set_message(&self) { + let bar = self.bar.lock().await; + bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished", + current_bytes = self.current_bytes, + sum_bytes = self.sum_bytes, + dlfinished = self.finished_downloads, + dlsum = self.download_count)); + } + + async fn success(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count)); + } + + async fn error(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("At least one download of {} failed", self.download_count)); + } + } + + async fn perform_download(source: &SourceEntry, progress: Arc>, timeout: Option) -> Result<()> { + trace!("Creating: {:?}", source); + let file = source.create().await.with_context(|| { + anyhow!( + "Creating source file destination: {}", + source.path().display() + ) + })?; + + let mut file = tokio::io::BufWriter::new(file); + let client_builder = reqwest::Client::builder() + .redirect(reqwest::redirect::Policy::limited(10)); + + let client_builder = if let Some(to) = timeout { + client_builder.timeout(std::time::Duration::from_secs(to)) + } else { + client_builder + }; + + let client = client_builder.build().context("Building HTTP client failed")?; + + let request = client.get(source.url().as_ref()) + .build() + .with_context(|| anyhow!("Building request for {} failed", source.url().as_ref()))?; + + let response = match client.execute(request).await { + Ok(resp) => resp, + Err(e) => { + return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url())) + } + }; + + progress.lock() + .await + .inc_download_bytes(response.content_length().unwrap_or(0)) + .await; + + let mut stream = response.bytes_stream(); + while let Some(bytes) = stream.next().await { + let bytes = bytes?; + file.write_all(bytes.as_ref()).await?; + progress.lock() + .await + .add_bytes(bytes.len()) + .await + } + + progress.lock().await.finish_one_download().await; + file.flush() + .await + .map_err(Error::from) + .map(|_| ()) + } + + let force = matches.is_present("force"); + let timeout = matches.value_of("timeout") + .map(u64::from_str) + .transpose() + .context("Parsing timeout argument to integer")?; + let cache = PathBuf::from(config.source_cache_root()); + let sc = SourceCache::new(cache); + let pname = matches + .value_of("package_name") + .map(String::from) + .map(PackageName::from); + let pvers = matches + .value_of("package_version") + .map(PackageVersionConstraint::try_from) + .transpose()?; + + let matching_regexp = matches.value_of("matching") + .map(crate::commands::util::mk_package_name_regex) + .transpose()?; + + let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar()))); + + let r = repo.packages() + .filter(|p| { + match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { + (None, None, None) => true, + (Some(pname), None, None) => p.name() == pname, + (Some(pname), Some(vers), None) => p.name() == pname && vers.matches(p.version()), + (None, None, Some(regex)) => regex.is_match(p.name()), + + (_, _, _) => { + panic!("This should not be possible, either we select packages by name and (optionally) version, or by regex.") + }, + } + }) + .map(|p| { + sc.sources_for(p).into_iter().map(|source| { + let progressbar = progressbar.clone(); + async move { + let source_path_exists = source.path().exists(); + if !source_path_exists && source.download_manually() { + return Err(anyhow!( + "Cannot download source that is marked for manual download" + )) + .context(anyhow!("Creating source: {}", source.path().display())) + .context(anyhow!("Downloading source: {}", source.url())) + .map_err(Error::from); + } + + if source_path_exists && !force { + Err(anyhow!("Source exists: {}", source.path().display())) + } else { + progressbar.lock() + .await + .inc_download_count() + .await; + + if source_path_exists /* && force is implied by 'if' above*/ { + if let Err(e) = source.remove_file().await { + progressbar.lock().await.finish_one_download().await; + return Err(e) + } + } + + perform_download(&source, progressbar.clone(), timeout).await?; + progressbar.lock().await.finish_one_download().await; + Ok(()) + } + } + }) + }) + .flatten() + .collect::>() + .collect::>>() + .await + .into_iter() + .collect::>(); + + if r.is_err() { + progressbar.lock().await.error().await; + } else { + progressbar.lock().await.success().await; + } + + debug!("r = {:?}", r); + r +} + +async fn of( + matches: &ArgMatches, + config: &Configuration, + repo: Repository, +) -> Result<()> { + let cache = PathBuf::from(config.source_cache_root()); + let sc = SourceCache::new(cache); + let pname = matches + .value_of("package_name") + .map(String::from) + .map(PackageName::from); + let pvers = matches + .value_of("package_version") + .map(PackageVersionConstraint::try_from) + .transpose()?; + + repo.packages() + .filter(|p| pname.as_ref().map(|n| p.name() == n).unwrap_or(true)) + .filter(|p| { + pvers + .as_ref() + .map(|v| v.matches(p.version())) + .unwrap_or(true) + }) + .map(|p| { + let pathes = sc.sources_for(p) + .into_iter() + .map(|source| source.path()) + .collect::>(); + + (p, pathes) + }) + .fold(Ok(std::io::stdout()), |out, (package, pathes)| { + out.and_then(|mut out| { + writeln!(out, "{} {}", package.name(), package.version())?; + for path in pathes { + writeln!(out, "\t{}", path.display())?; + } + + Ok(out) + }) + }) + .map(|_| ()) +} -- cgit v1.2.3 From 2b804658727c2c78d6ffdc2836924b18d2efe792 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 2 Dec 2021 14:04:23 +0100 Subject: Outsource "source download" subcommand impl Signed-off-by: Matthias Beyer --- src/commands/source/download.rs | 244 ++++++++++++++++++++++++++++++++++++++++ src/commands/source/mod.rs | 219 +----------------------------------- 2 files changed, 248 insertions(+), 215 deletions(-) create mode 100644 src/commands/source/download.rs diff --git a/src/commands/source/download.rs b/src/commands/source/download.rs new file mode 100644 index 0000000..f8d5a95 --- /dev/null +++ b/src/commands/source/download.rs @@ -0,0 +1,244 @@ +// +// Copyright (c) 2020-2021 science+computing ag and other contributors +// +// This program and the accompanying materials are made +// available under the terms of the Eclipse Public License 2.0 +// which is available at https://www.eclipse.org/legal/epl-2.0/ +// +// SPDX-License-Identifier: EPL-2.0 +// + +use std::convert::TryFrom; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; + +use anyhow::Context; +use anyhow::Error; +use anyhow::Result; +use anyhow::anyhow; +use clap::ArgMatches; +use log::{debug, trace}; +use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; +use tokio_stream::StreamExt; + +use crate::config::*; +use crate::package::PackageName; +use crate::package::PackageVersionConstraint; +use crate::repository::Repository; +use crate::source::*; +use crate::util::progress::ProgressBars; + + +#[derive(Clone)] +struct ProgressWrapper { + download_count: u64, + finished_downloads: u64, + current_bytes: usize, + sum_bytes: u64, + bar: Arc>, +} + +impl ProgressWrapper { + fn new(bar: indicatif::ProgressBar) -> Self { + Self { + download_count: 0, + finished_downloads: 0, + current_bytes: 0, + sum_bytes: 0, + bar: Arc::new(Mutex::new(bar)) + } + } + + async fn inc_download_count(&mut self) { + self.download_count += 1; + self.set_message().await; + } + + async fn inc_download_bytes(&mut self, bytes: u64) { + self.sum_bytes += bytes; + self.set_message().await; + } + + async fn finish_one_download(&mut self) { + self.finished_downloads += 1; + self.bar.lock().await.inc(1); + self.set_message().await; + } + + async fn add_bytes(&mut self, len: usize) { + self.current_bytes += len; + self.set_message().await; + } + + async fn set_message(&self) { + let bar = self.bar.lock().await; + bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished", + current_bytes = self.current_bytes, + sum_bytes = self.sum_bytes, + dlfinished = self.finished_downloads, + dlsum = self.download_count)); + } + + async fn success(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count)); + } + + async fn error(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("At least one download of {} failed", self.download_count)); + } +} + +async fn perform_download(source: &SourceEntry, progress: Arc>, timeout: Option) -> Result<()> { + trace!("Creating: {:?}", source); + let file = source.create().await.with_context(|| { + anyhow!( + "Creating source file destination: {}", + source.path().display() + ) + })?; + + let mut file = tokio::io::BufWriter::new(file); + let client_builder = reqwest::Client::builder() + .redirect(reqwest::redirect::Policy::limited(10)); + + let client_builder = if let Some(to) = timeout { + client_builder.timeout(std::time::Duration::from_secs(to)) + } else { + client_builder + }; + + let client = client_builder.build().context("Building HTTP client failed")?; + + let request = client.get(source.url().as_ref()) + .build() + .with_context(|| anyhow!("Building request for {} failed", source.url().as_ref()))?; + + let response = match client.execute(request).await { + Ok(resp) => resp, + Err(e) => { + return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url())) + } + }; + + progress.lock() + .await + .inc_download_bytes(response.content_length().unwrap_or(0)) + .await; + + let mut stream = response.bytes_stream(); + while let Some(bytes) = stream.next().await { + let bytes = bytes?; + file.write_all(bytes.as_ref()).await?; + progress.lock() + .await + .add_bytes(bytes.len()) + .await + } + + progress.lock().await.finish_one_download().await; + file.flush() + .await + .map_err(Error::from) + .map(|_| ()) +} + + +// Implementation of the 'source download' subcommand +pub async fn download( + matches: &ArgMatches, + config: &Configuration, + repo: Repository, + progressbars: ProgressBars, +) -> Result<()> { + let force = matches.is_present("force"); + let timeout = matches.value_of("timeout") + .map(u64::from_str) + .transpose() + .context("Parsing timeout argument to integer")?; + let cache = PathBuf::from(config.source_cache_root()); + let sc = SourceCache::new(cache); + let pname = matches + .value_of("package_name") + .map(String::from) + .map(PackageName::from); + let pvers = matches + .value_of("package_version") + .map(PackageVersionConstraint::try_from) + .transpose()?; + + let matching_regexp = matches.value_of("matching") + .map(crate::commands::util::mk_package_name_regex) + .transpose()?; + + let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar()))); + + let r = repo.packages() + .filter(|p| { + match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { + (None, None, None) => true, + (Some(pname), None, None) => p.name() == pname, + (Some(pname), Some(vers), None) => p.name() == pname && vers.matches(p.version()), + (None, None, Some(regex)) => regex.is_match(p.name()), + + (_, _, _) => { + panic!("This should not be possible, either we select packages by name and (optionally) version, or by regex.") + }, + } + }) + .map(|p| { + sc.sources_for(p).into_iter().map(|source| { + let progressbar = progressbar.clone(); + async move { + let source_path_exists = source.path().exists(); + if !source_path_exists && source.download_manually() { + return Err(anyhow!( + "Cannot download source that is marked for manual download" + )) + .context(anyhow!("Creating source: {}", source.path().display())) + .context(anyhow!("Downloading source: {}", source.url())) + .map_err(Error::from); + } + + if source_path_exists && !force { + Err(anyhow!("Source exists: {}", source.path().display())) + } else { + progressbar.lock() + .await + .inc_download_count() + .await; + + if source_path_exists /* && force is implied by 'if' above*/ { + if let Err(e) = source.remove_file().await { + progressbar.lock().await.finish_one_download().await; + return Err(e) + } + } + + perform_download(&source, progressbar.clone(), timeout).await?; + progressbar.lock().await.finish_one_download().await; + Ok(()) + } + } + }) + }) + .flatten() + .collect::>() + .collect::>>() + .await + .into_iter() + .collect::>(); + + if r.is_err() { + progressbar.lock().await.error().await; + } else { + progressbar.lock().await.success().await; + } + + debug!("r = {:?}", r); + r +} + diff --git a/src/commands/source/mod.rs b/src/commands/source/mod.rs index d4e6af5..6d775dc 100644 --- a/src/commands/source/mod.rs +++ b/src/commands/source/mod.rs @@ -13,8 +13,6 @@ use std::convert::TryFrom; use std::io::Write; use std::path::PathBuf; -use std::str::FromStr; -use std::sync::Arc; use anyhow::Context; use anyhow::Error; @@ -22,9 +20,7 @@ use anyhow::Result; use anyhow::anyhow; use clap::ArgMatches; use colored::Colorize; -use log::{debug, info, trace}; -use tokio::io::AsyncWriteExt; -use tokio::sync::Mutex; +use log::{info, trace}; use tokio_stream::StreamExt; use crate::config::*; @@ -35,6 +31,8 @@ use crate::repository::Repository; use crate::source::*; use crate::util::progress::ProgressBars; +mod download; + /// Implementation of the "source" subcommand pub async fn source( matches: &ArgMatches, @@ -46,7 +44,7 @@ pub async fn source( Some(("verify", matches)) => verify(matches, config, repo, progressbars).await, Some(("list-missing", matches)) => list_missing(matches, config, repo).await, Some(("url", matches)) => url(matches, repo).await, - Some(("download", matches)) => download(matches, config, repo, progressbars).await, + Some(("download", matches)) => crate::commands::source::download::download(matches, config, repo, progressbars).await, Some(("of", matches)) => of(matches, config, repo).await, Some((other, _)) => return Err(anyhow!("Unknown subcommand: {}", other)), None => Err(anyhow!("No subcommand")), @@ -220,215 +218,6 @@ pub async fn url(matches: &ArgMatches, repo: Repository) -> Result<()> { }) } -pub async fn download( - matches: &ArgMatches, - config: &Configuration, - repo: Repository, - progressbars: ProgressBars, -) -> Result<()> { - #[derive(Clone)] - struct ProgressWrapper { - download_count: u64, - finished_downloads: u64, - current_bytes: usize, - sum_bytes: u64, - bar: Arc>, - } - - impl ProgressWrapper { - fn new(bar: indicatif::ProgressBar) -> Self { - Self { - download_count: 0, - finished_downloads: 0, - current_bytes: 0, - sum_bytes: 0, - bar: Arc::new(Mutex::new(bar)) - } - } - - async fn inc_download_count(&mut self) { - self.download_count += 1; - self.set_message().await; - } - - async fn inc_download_bytes(&mut self, bytes: u64) { - self.sum_bytes += bytes; - self.set_message().await; - } - - async fn finish_one_download(&mut self) { - self.finished_downloads += 1; - self.bar.lock().await.inc(1); - self.set_message().await; - } - - async fn add_bytes(&mut self, len: usize) { - self.current_bytes += len; - self.set_message().await; - } - - async fn set_message(&self) { - let bar = self.bar.lock().await; - bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished", - current_bytes = self.current_bytes, - sum_bytes = self.sum_bytes, - dlfinished = self.finished_downloads, - dlsum = self.download_count)); - } - - async fn success(&self) { - let bar = self.bar.lock().await; - bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count)); - } - - async fn error(&self) { - let bar = self.bar.lock().await; - bar.finish_with_message(format!("At least one download of {} failed", self.download_count)); - } - } - - async fn perform_download(source: &SourceEntry, progress: Arc>, timeout: Option) -> Result<()> { - trace!("Creating: {:?}", source); - let file = source.create().await.with_context(|| { - anyhow!( - "Creating source file destination: {}", - source.path().display() - ) - })?; - - let mut file = tokio::io::BufWriter::new(file); - let client_builder = reqwest::Client::builder() - .redirect(reqwest::redirect::Policy::limited(10)); - - let client_builder = if let Some(to) = timeout { - client_builder.timeout(std::time::Duration::from_secs(to)) - } else { - client_builder - }; - - let client = client_builder.build().context("Building HTTP client failed")?; - - let request = client.get(source.url().as_ref()) - .build() - .with_context(|| anyhow!("Building request for {} failed", source.url().as_ref()))?; - - let response = match client.execute(request).await { - Ok(resp) => resp, - Err(e) => { - return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url())) - } - }; - - progress.lock() - .await - .inc_download_bytes(response.content_length().unwrap_or(0)) - .await; - - let mut stream = response.bytes_stream(); - while let Some(bytes) = stream.next().await { - let bytes = bytes?; - file.write_all(bytes.as_ref()).await?; - progress.lock() - .await - .add_bytes(bytes.len()) - .await - } - - progress.lock().await.finish_one_download().await; - file.flush() - .await - .map_err(Error::from) - .map(|_| ()) - } - - let force = matches.is_present("force"); - let timeout = matches.value_of("timeout") - .map(u64::from_str) - .transpose() - .context("Parsing timeout argument to integer")?; - let cache = PathBuf::from(config.source_cache_root()); - let sc = SourceCache::new(cache); - let pname = matches - .value_of("package_name") - .map(String::from) - .map(PackageName::from); - let pvers = matches - .value_of("package_version") - .map(PackageVersionConstraint::try_from) - .transpose()?; - - let matching_regexp = matches.value_of("matching") - .map(crate::commands::util::mk_package_name_regex) - .transpose()?; - - let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar()))); - - let r = repo.packages() - .filter(|p| { - match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { - (None, None, None) => true, - (Some(pname), None, None) => p.name() == pname, - (Some(pname), Some(vers), None) => p.name() == pname && vers.matches(p.version()), - (None, None, Some(regex)) => regex.is_match(p.name()), - - (_, _,