diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/commands/source.rs | 121 |
1 files changed, 99 insertions, 22 deletions
diff --git a/src/commands/source.rs b/src/commands/source.rs index a24aac6..d4e6af5 100644 --- a/src/commands/source.rs +++ b/src/commands/source.rs @@ -10,10 +10,11 @@ //! Implementation of the 'source' subcommand +use std::convert::TryFrom; use std::io::Write; use std::path::PathBuf; -use std::convert::TryFrom; use std::str::FromStr; +use std::sync::Arc; use anyhow::Context; use anyhow::Error; @@ -21,10 +22,9 @@ use anyhow::Result; use anyhow::anyhow; use clap::ArgMatches; use colored::Colorize; -use log::{info, trace}; -use result_inspect::ResultInspect; -use result_inspect::ResultInspectErr; +use log::{debug, info, trace}; use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; use tokio_stream::StreamExt; use crate::config::*; @@ -226,7 +226,68 @@ pub async fn download( repo: Repository, progressbars: ProgressBars, ) -> Result<()> { - async fn perform_download(source: &SourceEntry, timeout: Option<u64>) -> Result<()> { + #[derive(Clone)] + struct ProgressWrapper { + download_count: u64, + finished_downloads: u64, + current_bytes: usize, + sum_bytes: u64, + bar: Arc<Mutex<indicatif::ProgressBar>>, + } + + impl ProgressWrapper { + fn new(bar: indicatif::ProgressBar) -> Self { + Self { + download_count: 0, + finished_downloads: 0, + current_bytes: 0, + sum_bytes: 0, + bar: Arc::new(Mutex::new(bar)) + } + } + + async fn inc_download_count(&mut self) { + self.download_count += 1; + self.set_message().await; + } + + async fn inc_download_bytes(&mut self, bytes: u64) { + self.sum_bytes += bytes; + self.set_message().await; + } + + async fn finish_one_download(&mut self) { + self.finished_downloads += 1; + self.bar.lock().await.inc(1); + self.set_message().await; + } + + async fn add_bytes(&mut self, len: usize) { + self.current_bytes += len; + self.set_message().await; + } + + async fn set_message(&self) { + let bar = self.bar.lock().await; + bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished", + current_bytes = self.current_bytes, + sum_bytes = self.sum_bytes, + dlfinished = self.finished_downloads, + dlsum = self.download_count)); + } + + async fn success(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count)); + } + + async fn error(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("At least one download of {} failed", self.download_count)); + } + } + + async fn perform_download(source: &SourceEntry, progress: Arc<Mutex<ProgressWrapper>>, timeout: Option<u64>) -> Result<()> { trace!("Creating: {:?}", source); let file = source.create().await.with_context(|| { anyhow!( @@ -258,11 +319,22 @@ pub async fn download( } }; + progress.lock() + .await + .inc_download_bytes(response.content_length().unwrap_or(0)) + .await; + let mut stream = response.bytes_stream(); while let Some(bytes) = stream.next().await { - file.write_all(bytes?.as_ref()).await?; + let bytes = bytes?; + file.write_all(bytes.as_ref()).await?; + progress.lock() + .await + .add_bytes(bytes.len()) + .await } + progress.lock().await.finish_one_download().await; file.flush() .await .map_err(Error::from) @@ -289,9 +361,9 @@ pub async fn download( .map(crate::commands::util::mk_package_name_regex) .transpose()?; - let progressbar = progressbars.bar(); + let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar()))); - repo.packages() + let r = repo.packages() .filter(|p| { match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { (None, None, None) => true, @@ -321,19 +393,21 @@ pub async fn download( if source_path_exists && !force { Err(anyhow!("Source exists: {}", source.path().display())) } else { + progressbar.lock() + .await + .inc_download_count() + .await; + if source_path_exists /* && force is implied by 'if' above*/ { if let Err(e) = source.remove_file().await { - progressbar.inc(1); + progressbar.lock().await.finish_one_download().await; return Err(e) } } - - perform_download(&source, timeout) - .await - .inspect(|_| { - progressbar.inc(1); - }) + perform_download(&source, progressbar.clone(), timeout).await?; + progressbar.lock().await.finish_one_download().await; + Ok(()) } } }) @@ -343,13 +417,16 @@ pub async fn download( .collect::<Vec<Result<()>>>() .await .into_iter() - .collect::<Result<()>>() - .inspect_err(|_| { - progressbar.finish_with_message("At least one package failed"); - }) - .inspect(|_| { - progressbar.finish_with_message("Succeeded"); - }) + .collect::<Result<()>>(); + + if r.is_err() { + progressbar.lock().await.error().await; + } else { + progressbar.lock().await.success().await; + } + + debug!("r = {:?}", r); + r } async fn of( |