From 6edb7878fb4a490fbb1fd37a09af1fcf907eeecf Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 17 Aug 2021 13:39:52 +0200 Subject: Revert "Limit opened files by limiting recursion" This reverts commit 9a3a579c09c532220a986abf5880d9b2d6f445ef. --- src/repository/repository.rs | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/src/repository/repository.rs b/src/repository/repository.rs index 6429770..a955ddb 100644 --- a/src/repository/repository.rs +++ b/src/repository/repository.rs @@ -43,21 +43,31 @@ impl Repository { use futures::StreamExt; use futures::FutureExt; - async fn all_subdirs(p: &Path) -> Result>> { + async fn all_subdirs(p: &Path, open_files_sema: std::sync::Arc) -> Result>> { let mut read_dir = tokio::fs::read_dir(p).await?; + let sema = open_files_sema.clone(); let stream = async_stream::stream! { + let sema = sema.clone(); loop { + log::trace!("Acquiring Semapore: {} - 1", sema.available_permits()); + let sema_permit = sema.acquire().await?; if let Some(item) = read_dir.next_entry().await? { yield Ok(item) as Result; } else { break; } + log::trace!("Dropping Semaphore: {} + 1", sema.available_permits()); + drop(sema_permit); } }; let stream = stream .then(move |entry| { + let sem = open_files_sema.clone(); + async move { + log::trace!("[{:?}] Acquiring Semaphore: {} - 1", entry, sem.available_permits()); + let open_files_permit = sem.acquire().await?; let de = entry?; let is_dir = de.file_type().await?.is_dir(); let is_hidden = de @@ -66,6 +76,8 @@ impl Repository { .and_then(|s| s.to_str()) .map(|s| s.starts_with('.')) .unwrap_or(false); + log::trace!("[{:?}] Dropping Semaphore: {} + 1", de, sem.available_permits()); + drop(open_files_permit); if is_dir && !is_hidden { Ok(Some(de.path())) @@ -90,6 +102,7 @@ impl Repository { path: &'a Path, mut config: config::Config, progress: &'a indicatif::ProgressBar, + open_files_sema: std::sync::Arc, ) -> futures::future::BoxFuture<'a, Result>>> { async move { let pkg_file = path.join("pkg.toml"); @@ -97,9 +110,13 @@ impl Repository { if pkg_file.is_file() { log::trace!("Reading: {}", pkg_file.display()); let buf = { + log::trace!("[{}] Acquiring Semaphore: {} - 1", pkg_file.display(), open_files_sema.available_permits()); + let open_files_permit = open_files_sema.acquire().await?; let r = tokio::fs::read_to_string(&pkg_file) .await .with_context(|| format!("Reading {}", pkg_file.display()))?; + log::trace!("[{}] Dropping Semaphore: {} + 1", pkg_file.display(), open_files_sema.available_permits()); + drop(open_files_permit); r }; @@ -217,9 +234,7 @@ impl Repository { config.set_once("patches", config::Value::from(patches))?; } - let recursion_limit_sema = tokio::sync::Semaphore::new(5); - - let subdirs = all_subdirs(path) + let subdirs = all_subdirs(path, open_files_sema.clone()) .await? .map(|subdir| { let config = config.clone(); @@ -229,13 +244,10 @@ impl Repository { .with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?; trace!("[{}] Recursing into {}", path.display(), dir.display()); - let permit = recursion_limit_sema.acquire().await?; - let res = load_recursive(root, &dir, config, progress) + load_recursive(root, &dir, config, progress, open_files_sema.clone()) .await .with_context(|| format!("Reading package from {}", dir.display())) - .map_err(Error::from); - drop(permit); - res + .map_err(Error::from) } }) .collect::>() @@ -273,7 +285,9 @@ impl Repository { }.boxed() } - let inner = load_recursive(path, path, config::Config::default(), progress) + let open_files_sema = std::sync::Arc::new(tokio::sync::Semaphore::new(100)); + + let inner = load_recursive(path, path, config::Config::default(), progress, open_files_sema) .await .with_context(|| format!("Recursing for {}", path.display()))? .into_iter() -- cgit v1.2.3