summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-12-02 13:44:08 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-12-02 14:23:49 +0100
commit9ad578dc96cdf7b74d5f58f7798d0327523b07f7 (patch)
tree88d10b36d919ea567fa7c8461caf948e1f1a2a07 /src
parent5d98b4b6210c0ef5635cc5a97be61ae7d93ac139 (diff)
Add progress wrapper
When creating only one progress bar for all downloads, we still want to know how many downloads are happening and the remaining number of bytes to be received. This patch implements a ProgressWrapper that synchronizes between the download tasks and the progress bar. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src')
-rw-r--r--src/commands/source.rs121
1 files changed, 99 insertions, 22 deletions
diff --git a/src/commands/source.rs b/src/commands/source.rs
index a24aac6..d4e6af5 100644
--- a/src/commands/source.rs
+++ b/src/commands/source.rs
@@ -10,10 +10,11 @@
//! Implementation of the 'source' subcommand
+use std::convert::TryFrom;
use std::io::Write;
use std::path::PathBuf;
-use std::convert::TryFrom;
use std::str::FromStr;
+use std::sync::Arc;
use anyhow::Context;
use anyhow::Error;
@@ -21,10 +22,9 @@ use anyhow::Result;
use anyhow::anyhow;
use clap::ArgMatches;
use colored::Colorize;
-use log::{info, trace};
-use result_inspect::ResultInspect;
-use result_inspect::ResultInspectErr;
+use log::{debug, info, trace};
use tokio::io::AsyncWriteExt;
+use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use crate::config::*;
@@ -226,7 +226,68 @@ pub async fn download(
repo: Repository,
progressbars: ProgressBars,
) -> Result<()> {
- async fn perform_download(source: &SourceEntry, timeout: Option<u64>) -> Result<()> {
+ #[derive(Clone)]
+ struct ProgressWrapper {
+ download_count: u64,
+ finished_downloads: u64,
+ current_bytes: usize,
+ sum_bytes: u64,
+ bar: Arc<Mutex<indicatif::ProgressBar>>,
+ }
+
+ impl ProgressWrapper {
+ fn new(bar: indicatif::ProgressBar) -> Self {
+ Self {
+ download_count: 0,
+ finished_downloads: 0,
+ current_bytes: 0,
+ sum_bytes: 0,
+ bar: Arc::new(Mutex::new(bar))
+ }
+ }
+
+ async fn inc_download_count(&mut self) {
+ self.download_count += 1;
+ self.set_message().await;
+ }
+
+ async fn inc_download_bytes(&mut self, bytes: u64) {
+ self.sum_bytes += bytes;
+ self.set_message().await;
+ }
+
+ async fn finish_one_download(&mut self) {
+ self.finished_downloads += 1;
+ self.bar.lock().await.inc(1);
+ self.set_message().await;
+ }
+
+ async fn add_bytes(&mut self, len: usize) {
+ self.current_bytes += len;
+ self.set_message().await;
+ }
+
+ async fn set_message(&self) {
+ let bar = self.bar.lock().await;
+ bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished",
+ current_bytes = self.current_bytes,
+ sum_bytes = self.sum_bytes,
+ dlfinished = self.finished_downloads,
+ dlsum = self.download_count));
+ }
+
+ async fn success(&self) {
+ let bar = self.bar.lock().await;
+ bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count));
+ }
+
+ async fn error(&self) {
+ let bar = self.bar.lock().await;
+ bar.finish_with_message(format!("At least one download of {} failed", self.download_count));
+ }
+ }
+
+ async fn perform_download(source: &SourceEntry, progress: Arc<Mutex<ProgressWrapper>>, timeout: Option<u64>) -> Result<()> {
trace!("Creating: {:?}", source);
let file = source.create().await.with_context(|| {
anyhow!(
@@ -258,11 +319,22 @@ pub async fn download(
}
};
+ progress.lock()
+ .await
+ .inc_download_bytes(response.content_length().unwrap_or(0))
+ .await;
+
let mut stream = response.bytes_stream();
while let Some(bytes) = stream.next().await {
- file.write_all(bytes?.as_ref()).await?;
+ let bytes = bytes?;
+ file.write_all(bytes.as_ref()).await?;
+ progress.lock()
+ .await
+ .add_bytes(bytes.len())
+ .await
}
+ progress.lock().await.finish_one_download().await;
file.flush()
.await
.map_err(Error::from)
@@ -289,9 +361,9 @@ pub async fn download(
.map(crate::commands::util::mk_package_name_regex)
.transpose()?;
- let progressbar = progressbars.bar();
+ let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar())));
- repo.packages()
+ let r = repo.packages()
.filter(|p| {
match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) {
(None, None, None) => true,
@@ -321,19 +393,21 @@ pub async fn download(
if source_path_exists && !force {
Err(anyhow!("Source exists: {}", source.path().display()))
} else {
+ progressbar.lock()
+ .await
+ .inc_download_count()
+ .await;
+
if source_path_exists /* && force is implied by 'if' above*/ {
if let Err(e) = source.remove_file().await {
- progressbar.inc(1);
+ progressbar.lock().await.finish_one_download().await;
return Err(e)
}
}
-
- perform_download(&source, timeout)
- .await
- .inspect(|_| {
- progressbar.inc(1);
- })
+ perform_download(&source, progressbar.clone(), timeout).await?;
+ progressbar.lock().await.finish_one_download().await;
+ Ok(())
}
}
})
@@ -343,13 +417,16 @@ pub async fn download(
.collect::<Vec<Result<()>>>()
.await
.into_iter()
- .collect::<Result<()>>()
- .inspect_err(|_| {
- progressbar.finish_with_message("At least one package failed");
- })
- .inspect(|_| {
- progressbar.finish_with_message("Succeeded");
- })
+ .collect::<Result<()>>();
+
+ if r.is_err() {
+ progressbar.lock().await.error().await;
+ } else {
+ progressbar.lock().await.success().await;
+ }
+
+ debug!("r = {:?}", r);
+ r
}
async fn of(