summaryrefslogtreecommitdiffstats
path: root/src/commands/source/mod.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-12-02 14:04:23 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-12-02 14:23:50 +0100
commit2b804658727c2c78d6ffdc2836924b18d2efe792 (patch)
tree5ea70ab46da564ad837a7dd57206d6fd25915bfe /src/commands/source/mod.rs
parente1b39ee4873da5a8b488419764fa1ae0d8220056 (diff)
Outsource "source download" subcommand impl
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/commands/source/mod.rs')
-rw-r--r--src/commands/source/mod.rs219
1 files changed, 4 insertions, 215 deletions
diff --git a/src/commands/source/mod.rs b/src/commands/source/mod.rs
index d4e6af5..6d775dc 100644
--- a/src/commands/source/mod.rs
+++ b/src/commands/source/mod.rs
@@ -13,8 +13,6 @@
use std::convert::TryFrom;
use std::io::Write;
use std::path::PathBuf;
-use std::str::FromStr;
-use std::sync::Arc;
use anyhow::Context;
use anyhow::Error;
@@ -22,9 +20,7 @@ use anyhow::Result;
use anyhow::anyhow;
use clap::ArgMatches;
use colored::Colorize;
-use log::{debug, info, trace};
-use tokio::io::AsyncWriteExt;
-use tokio::sync::Mutex;
+use log::{info, trace};
use tokio_stream::StreamExt;
use crate::config::*;
@@ -35,6 +31,8 @@ use crate::repository::Repository;
use crate::source::*;
use crate::util::progress::ProgressBars;
+mod download;
+
/// Implementation of the "source" subcommand
pub async fn source(
matches: &ArgMatches,
@@ -46,7 +44,7 @@ pub async fn source(
Some(("verify", matches)) => verify(matches, config, repo, progressbars).await,
Some(("list-missing", matches)) => list_missing(matches, config, repo).await,
Some(("url", matches)) => url(matches, repo).await,
- Some(("download", matches)) => download(matches, config, repo, progressbars).await,
+ Some(("download", matches)) => crate::commands::source::download::download(matches, config, repo, progressbars).await,
Some(("of", matches)) => of(matches, config, repo).await,
Some((other, _)) => return Err(anyhow!("Unknown subcommand: {}", other)),
None => Err(anyhow!("No subcommand")),
@@ -220,215 +218,6 @@ pub async fn url(matches: &ArgMatches, repo: Repository) -> Result<()> {
})
}
-pub async fn download(
- matches: &ArgMatches,
- config: &Configuration,
- repo: Repository,
- progressbars: ProgressBars,
-) -> Result<()> {
- #[derive(Clone)]
- struct ProgressWrapper {
- download_count: u64,
- finished_downloads: u64,
- current_bytes: usize,
- sum_bytes: u64,
- bar: Arc<Mutex<indicatif::ProgressBar>>,
- }
-
- impl ProgressWrapper {
- fn new(bar: indicatif::ProgressBar) -> Self {
- Self {
- download_count: 0,
- finished_downloads: 0,
- current_bytes: 0,
- sum_bytes: 0,
- bar: Arc::new(Mutex::new(bar))
- }
- }
-
- async fn inc_download_count(&mut self) {
- self.download_count += 1;
- self.set_message().await;
- }
-
- async fn inc_download_bytes(&mut self, bytes: u64) {
- self.sum_bytes += bytes;
- self.set_message().await;
- }
-
- async fn finish_one_download(&mut self) {
- self.finished_downloads += 1;
- self.bar.lock().await.inc(1);
- self.set_message().await;
- }
-
- async fn add_bytes(&mut self, len: usize) {
- self.current_bytes += len;
- self.set_message().await;
- }
-
- async fn set_message(&self) {
- let bar = self.bar.lock().await;
- bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished",
- current_bytes = self.current_bytes,
- sum_bytes = self.sum_bytes,
- dlfinished = self.finished_downloads,
- dlsum = self.download_count));
- }
-
- async fn success(&self) {
- let bar = self.bar.lock().await;
- bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count));
- }
-
- async fn error(&self) {
- let bar = self.bar.lock().await;
- bar.finish_with_message(format!("At least one download of {} failed", self.download_count));
- }
- }
-
- async fn perform_download(source: &SourceEntry, progress: Arc<Mutex<ProgressWrapper>>, timeout: Option<u64>) -> Result<()> {
- trace!("Creating: {:?}", source);
- let file = source.create().await.with_context(|| {
- anyhow!(
- "Creating source file destination: {}",
- source.path().display()
- )
- })?;
-
- let mut file = tokio::io::BufWriter::new(file);
- let client_builder = reqwest::Client::builder()
- .redirect(reqwest::redirect::Policy::limited(10));
-
- let client_builder = if let Some(to) = timeout {
- client_builder.timeout(std::time::Duration::from_secs(to))
- } else {
- client_builder
- };
-
- let client = client_builder.build().context("Building HTTP client failed")?;
-
- let request = client.get(source.url().as_ref())
- .build()
- .with_context(|| anyhow!("Building request for {} failed", source.url().as_ref()))?;
-
- let response = match client.execute(request).await {
- Ok(resp) => resp,
- Err(e) => {
- return Err(e).with_context(|| anyhow!("Downloading '{}'", source.url()))
- }
- };
-
- progress.lock()
- .await
- .inc_download_bytes(response.content_length().unwrap_or(0))
- .await;
-
- let mut stream = response.bytes_stream();
- while let Some(bytes) = stream.next().await {
- let bytes = bytes?;
- file.write_all(bytes.as_ref()).await?;
- progress.lock()
- .await
- .add_bytes(bytes.len())
- .await
- }
-
- progress.lock().await.finish_one_download().await;
- file.flush()
- .await
- .map_err(Error::from)
- .map(|_| ())
- }
-
- let force = matches.is_present("force");
- let timeout = matches.value_of("timeout")
- .map(u64::from_str)
- .transpose()
- .context("Parsing timeout argument to integer")?;
- let cache = PathBuf::from(config.source_cache_root());
- let sc = SourceCache::new(cache);
- let pname = matches
- .value_of("package_name")
- .map(String::from)
- .map(PackageName::from);
- let pvers = matches
- .value_of("package_version")
- .map(PackageVersionConstraint::try_from)
- .transpose()?;
-
- let matching_regexp = matches.value_of("matching")
- .map(crate::commands::util::mk_package_name_regex)
- .transpose()?;
-
- let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar())));
-
- let r = repo.packages()
- .filter(|p| {
- match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) {
- (None, None, None) => true,
- (Some(pname), None, None) => p.name() == pname,
- (Some(pname), Some(vers), None) => p.name() == pname && vers.matches(p.version()),
- (None, None, Some(regex)) => regex.is_match(p.name()),
-
- (_, _, _) => {
- panic!("This should not be possible, either we select packages by name and (optionally) version, or by regex.")
- },
- }
- })
- .map(|p| {
- sc.sources_for(p).into_iter().map(|source| {
- let progressbar = progressbar.clone();
- async move {
- let source_path_exists = source.path().exists();
- if !source_path_exists && source.download_manually() {
- return Err(anyhow!(
- "Cannot download source that is marked for manual download"
- ))
- .context(anyhow!("Creating source: {}", source.path().display()))
- .context(anyhow!("Downloading source: {}", source.url()))
- .map_err(Error::from);
- }
-
- if source_path_exists && !force {
- Err(anyhow!("Source exists: {}", source.path().display()))
- } else {
- progressbar.lock()
- .await
- .inc_download_count()
- .await;
-
- if source_path_exists /* && force is implied by 'if' above*/ {
- if let Err(e) = source.remove_file().await {
- progressbar.lock().await.finish_one_download().await;
- return Err(e)
- }
- }
-
- perform_download(&source, progressbar.clone(), timeout).await?;
- progressbar.lock().await.finish_one_download().await;
- Ok(())
- }
- }
- })
- })
- .flatten()
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Vec<Result<()>>>()
- .await
- .into_iter()
- .collect::<Result<()>>();
-
- if r.is_err() {
- progressbar.lock().await.error().await;
- } else {
- progressbar.lock().await.success().await;
- }
-
- debug!("r = {:?}", r);
- r
-}
-
async fn of(
matches: &ArgMatches,
config: &Configuration,