From 9ad578dc96cdf7b74d5f58f7798d0327523b07f7 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 2 Dec 2021 13:44:08 +0100 Subject: Add progress wrapper When creating only one progress bar for all downloads, we still want to know how many downloads are happening and the remaining number of bytes to be received. This patch implements a ProgressWrapper that synchronizes between the download tasks and the progress bar. Signed-off-by: Matthias Beyer --- src/commands/source.rs | 121 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 99 insertions(+), 22 deletions(-) (limited to 'src') diff --git a/src/commands/source.rs b/src/commands/source.rs index a24aac6..d4e6af5 100644 --- a/src/commands/source.rs +++ b/src/commands/source.rs @@ -10,10 +10,11 @@ //! Implementation of the 'source' subcommand +use std::convert::TryFrom; use std::io::Write; use std::path::PathBuf; -use std::convert::TryFrom; use std::str::FromStr; +use std::sync::Arc; use anyhow::Context; use anyhow::Error; @@ -21,10 +22,9 @@ use anyhow::Result; use anyhow::anyhow; use clap::ArgMatches; use colored::Colorize; -use log::{info, trace}; -use result_inspect::ResultInspect; -use result_inspect::ResultInspectErr; +use log::{debug, info, trace}; use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; use tokio_stream::StreamExt; use crate::config::*; @@ -226,7 +226,68 @@ pub async fn download( repo: Repository, progressbars: ProgressBars, ) -> Result<()> { - async fn perform_download(source: &SourceEntry, timeout: Option) -> Result<()> { + #[derive(Clone)] + struct ProgressWrapper { + download_count: u64, + finished_downloads: u64, + current_bytes: usize, + sum_bytes: u64, + bar: Arc>, + } + + impl ProgressWrapper { + fn new(bar: indicatif::ProgressBar) -> Self { + Self { + download_count: 0, + finished_downloads: 0, + current_bytes: 0, + sum_bytes: 0, + bar: Arc::new(Mutex::new(bar)) + } + } + + async fn inc_download_count(&mut self) { + self.download_count += 1; + self.set_message().await; + } + + async fn inc_download_bytes(&mut self, bytes: u64) { + self.sum_bytes += bytes; + self.set_message().await; + } + + async fn finish_one_download(&mut self) { + self.finished_downloads += 1; + self.bar.lock().await.inc(1); + self.set_message().await; + } + + async fn add_bytes(&mut self, len: usize) { + self.current_bytes += len; + self.set_message().await; + } + + async fn set_message(&self) { + let bar = self.bar.lock().await; + bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished", + current_bytes = self.current_bytes, + sum_bytes = self.sum_bytes, + dlfinished = self.finished_downloads, + dlsum = self.download_count)); + } + + async fn success(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count)); + } + + async fn error(&self) { + let bar = self.bar.lock().await; + bar.finish_with_message(format!("At least one download of {} failed", self.download_count)); + } + } + + async fn perform_download(source: &SourceEntry, progress: Arc>, timeout: Option) -> Result<()> { trace!("Creating: {:?}", source); let file = source.create().await.with_context(|| { anyhow!( @@ -258,11 +319,22 @@ pub async fn download( } }; + progress.lock() + .await + .inc_download_bytes(response.content_length().unwrap_or(0)) + .await; + let mut stream = response.bytes_stream(); while let Some(bytes) = stream.next().await { - file.write_all(bytes?.as_ref()).await?; + let bytes = bytes?; + file.write_all(bytes.as_ref()).await?; + progress.lock() + .await + .add_bytes(bytes.len()) + .await } + progress.lock().await.finish_one_download().await; file.flush() .await .map_err(Error::from) @@ -289,9 +361,9 @@ pub async fn download( .map(crate::commands::util::mk_package_name_regex) .transpose()?; - let progressbar = progressbars.bar(); + let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar()))); - repo.packages() + let r = repo.packages() .filter(|p| { match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) { (None, None, None) => true, @@ -321,19 +393,21 @@ pub async fn download( if source_path_exists && !force { Err(anyhow!("Source exists: {}", source.path().display())) } else { + progressbar.lock() + .await + .inc_download_count() + .await; + if source_path_exists /* && force is implied by 'if' above*/ { if let Err(e) = source.remove_file().await { - progressbar.inc(1); + progressbar.lock().await.finish_one_download().await; return Err(e) } } - - perform_download(&source, timeout) - .await - .inspect(|_| { - progressbar.inc(1); - }) + perform_download(&source, progressbar.clone(), timeout).await?; + progressbar.lock().await.finish_one_download().await; + Ok(()) } } }) @@ -343,13 +417,16 @@ pub async fn download( .collect::>>() .await .into_iter() - .collect::>() - .inspect_err(|_| { - progressbar.finish_with_message("At least one package failed"); - }) - .inspect(|_| { - progressbar.finish_with_message("Succeeded"); - }) + .collect::>(); + + if r.is_err() { + progressbar.lock().await.error().await; + } else { + progressbar.lock().await.success().await; + } + + debug!("r = {:?}", r); + r } async fn of( -- cgit v1.2.3