summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/commands/source.rs121
1 files changed, 99 insertions, 22 deletions
diff --git a/src/commands/source.rs b/src/commands/source.rs
index a24aac6..d4e6af5 100644
--- a/src/commands/source.rs
+++ b/src/commands/source.rs
@@ -10,10 +10,11 @@
//! Implementation of the 'source' subcommand
+use std::convert::TryFrom;
use std::io::Write;
use std::path::PathBuf;
-use std::convert::TryFrom;
use std::str::FromStr;
+use std::sync::Arc;
use anyhow::Context;
use anyhow::Error;
@@ -21,10 +22,9 @@ use anyhow::Result;
use anyhow::anyhow;
use clap::ArgMatches;
use colored::Colorize;
-use log::{info, trace};
-use result_inspect::ResultInspect;
-use result_inspect::ResultInspectErr;
+use log::{debug, info, trace};
use tokio::io::AsyncWriteExt;
+use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use crate::config::*;
@@ -226,7 +226,68 @@ pub async fn download(
repo: Repository,
progressbars: ProgressBars,
) -> Result<()> {
- async fn perform_download(source: &SourceEntry, timeout: Option<u64>) -> Result<()> {
+ #[derive(Clone)]
+ struct ProgressWrapper {
+ download_count: u64,
+ finished_downloads: u64,
+ current_bytes: usize,
+ sum_bytes: u64,
+ bar: Arc<Mutex<indicatif::ProgressBar>>,
+ }
+
+ impl ProgressWrapper {
+ fn new(bar: indicatif::ProgressBar) -> Self {
+ Self {
+ download_count: 0,
+ finished_downloads: 0,
+ current_bytes: 0,
+ sum_bytes: 0,
+ bar: Arc::new(Mutex::new(bar))
+ }
+ }
+
+ async fn inc_download_count(&mut self) {
+ self.download_count += 1;
+ self.set_message().await;
+ }
+
+ async fn inc_download_bytes(&mut self, bytes: u64) {
+ self.sum_bytes += bytes;
+ self.set_message().await;
+ }
+
+ async fn finish_one_download(&mut self) {
+ self.finished_downloads += 1;
+ self.bar.lock().await.inc(1);
+ self.set_message().await;
+ }
+
+ async fn add_bytes(&mut self, len: usize) {
+ self.current_bytes += len;
+ self.set_message().await;
+ }
+
+ async fn set_message(&self) {
+ let bar = self.bar.lock().await;
+ bar.set_message(format!("Downloading ({current_bytes}/{sum_bytes} bytes, {dlfinished}/{dlsum} downloads finished",
+ current_bytes = self.current_bytes,
+ sum_bytes = self.sum_bytes,
+ dlfinished = self.finished_downloads,
+ dlsum = self.download_count));
+ }
+
+ async fn success(&self) {
+ let bar = self.bar.lock().await;
+ bar.finish_with_message(format!("Succeeded {}/{} downloads", self.finished_downloads, self.download_count));
+ }
+
+ async fn error(&self) {
+ let bar = self.bar.lock().await;
+ bar.finish_with_message(format!("At least one download of {} failed", self.download_count));
+ }
+ }
+
+ async fn perform_download(source: &SourceEntry, progress: Arc<Mutex<ProgressWrapper>>, timeout: Option<u64>) -> Result<()> {
trace!("Creating: {:?}", source);
let file = source.create().await.with_context(|| {
anyhow!(
@@ -258,11 +319,22 @@ pub async fn download(
}
};
+ progress.lock()
+ .await
+ .inc_download_bytes(response.content_length().unwrap_or(0))
+ .await;
+
let mut stream = response.bytes_stream();
while let Some(bytes) = stream.next().await {
- file.write_all(bytes?.as_ref()).await?;
+ let bytes = bytes?;
+ file.write_all(bytes.as_ref()).await?;
+ progress.lock()
+ .await
+ .add_bytes(bytes.len())
+ .await
}
+ progress.lock().await.finish_one_download().await;
file.flush()
.await
.map_err(Error::from)
@@ -289,9 +361,9 @@ pub async fn download(
.map(crate::commands::util::mk_package_name_regex)
.transpose()?;
- let progressbar = progressbars.bar();
+ let progressbar = Arc::new(Mutex::new(ProgressWrapper::new(progressbars.bar())));
- repo.packages()
+ let r = repo.packages()
.filter(|p| {
match (pname.as_ref(), pvers.as_ref(), matching_regexp.as_ref()) {
(None, None, None) => true,
@@ -321,19 +393,21 @@ pub async fn download(
if source_path_exists && !force {
Err(anyhow!("Source exists: {}", source.path().display()))
} else {
+ progressbar.lock()
+ .await
+ .inc_download_count()
+ .await;
+
if source_path_exists /* && force is implied by 'if' above*/ {
if let Err(e) = source.remove_file().await {
- progressbar.inc(1);
+ progressbar.lock().await.finish_one_download().await;
return Err(e)
}
}
-
- perform_download(&source, timeout)
- .await
- .inspect(|_| {
- progressbar.inc(1);
- })
+ perform_download(&source, progressbar.clone(), timeout).await?;
+ progressbar.lock().await.finish_one_download().await;
+ Ok(())
}
}
})
@@ -343,13 +417,16 @@ pub async fn download(
.collect::<Vec<Result<()>>>()
.await
.into_iter()
- .collect::<Result<()>>()
- .inspect_err(|_| {
- progressbar.finish_with_message("At least one package failed");
- })
- .inspect(|_| {
- progressbar.finish_with_message("Succeeded");
- })
+ .collect::<Result<()>>();
+
+ if r.is_err() {
+ progressbar.lock().await.error().await;
+ } else {
+ progressbar.lock().await.success().await;
+ }
+
+ debug!("r = {:?}", r);
+ r
}
async fn of(