diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-08-17 12:50:06 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-08-17 12:53:24 +0200 |
commit | 9a3a579c09c532220a986abf5880d9b2d6f445ef (patch) | |
tree | 291cbadc1b828bbc246ffccaaa7ee210b0f3c06f | |
parent | 1de4fcb9bc1855e3149500e445a51c86905183d2 (diff) |
Limit opened files by limiting recursion
This patch limits the recursion to only parallelize recursion by factor 5 for
now.
This seems to work with an ulimit of 1000 files for a repository with ~1800
pkg.toml files in ~3500 directories.
Unfortunately, this does not _really_ speed up things (20 sec vs 16 sec, not
really scientifically measured).
So this is not a nice solution and we really should re-think our loading
mechanics.
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Tested-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r-- | src/repository/repository.rs | 34 |
1 files changed, 10 insertions, 24 deletions
diff --git a/src/repository/repository.rs b/src/repository/repository.rs index a955ddb..6429770 100644 --- a/src/repository/repository.rs +++ b/src/repository/repository.rs @@ -43,31 +43,21 @@ impl Repository { use futures::StreamExt; use futures::FutureExt; - async fn all_subdirs(p: &Path, open_files_sema: std::sync::Arc<tokio::sync::Semaphore>) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> { + async fn all_subdirs(p: &Path) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> { let mut read_dir = tokio::fs::read_dir(p).await?; - let sema = open_files_sema.clone(); let stream = async_stream::stream! { - let sema = sema.clone(); loop { - log::trace!("Acquiring Semapore: {} - 1", sema.available_permits()); - let sema_permit = sema.acquire().await?; if let Some(item) = read_dir.next_entry().await? { yield Ok(item) as Result<tokio::fs::DirEntry>; } else { break; } - log::trace!("Dropping Semaphore: {} + 1", sema.available_permits()); - drop(sema_permit); } }; let stream = stream .then(move |entry| { - let sem = open_files_sema.clone(); - async move { - log::trace!("[{:?}] Acquiring Semaphore: {} - 1", entry, sem.available_permits()); - let open_files_permit = sem.acquire().await?; let de = entry?; let is_dir = de.file_type().await?.is_dir(); let is_hidden = de @@ -76,8 +66,6 @@ impl Repository { .and_then(|s| s.to_str()) .map(|s| s.starts_with('.')) .unwrap_or(false); - log::trace!("[{:?}] Dropping Semaphore: {} + 1", de, sem.available_permits()); - drop(open_files_permit); if is_dir && !is_hidden { Ok(Some(de.path())) @@ -102,7 +90,6 @@ impl Repository { path: &'a Path, mut config: config::Config, progress: &'a indicatif::ProgressBar, - open_files_sema: std::sync::Arc<tokio::sync::Semaphore>, ) -> futures::future::BoxFuture<'a, Result<Vec<Result<Package>>>> { async move { let pkg_file = path.join("pkg.toml"); @@ -110,13 +97,9 @@ impl Repository { if pkg_file.is_file() { log::trace!("Reading: {}", pkg_file.display()); let buf = { - log::trace!("[{}] Acquiring Semaphore: {} - 1", pkg_file.display(), open_files_sema.available_permits()); - let open_files_permit = open_files_sema.acquire().await?; let r = tokio::fs::read_to_string(&pkg_file) .await .with_context(|| format!("Reading {}", pkg_file.display()))?; - log::trace!("[{}] Dropping Semaphore: {} + 1", pkg_file.display(), open_files_sema.available_permits()); - drop(open_files_permit); r }; @@ -234,7 +217,9 @@ impl Repository { config.set_once("patches", config::Value::from(patches))?; } - let subdirs = all_subdirs(path, open_files_sema.clone()) + let recursion_limit_sema = tokio::sync::Semaphore::new(5); + + let subdirs = all_subdirs(path) .await? .map(|subdir| { let config = config.clone(); @@ -244,10 +229,13 @@ impl Repository { .with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?; trace!("[{}] Recursing into {}", path.display(), dir.display()); - load_recursive(root, &dir, config, progress, open_files_sema.clone()) + let permit = recursion_limit_sema.acquire().await?; + let res = load_recursive(root, &dir, config, progress) .await .with_context(|| format!("Reading package from {}", dir.display())) - .map_err(Error::from) + .map_err(Error::from); + drop(permit); + res } }) .collect::<futures::stream::FuturesUnordered<_>>() @@ -285,9 +273,7 @@ impl Repository { }.boxed() } - let open_files_sema = std::sync::Arc::new(tokio::sync::Semaphore::new(100)); - - let inner = load_recursive(path, path, config::Config::default(), progress, open_files_sema) + let inner = load_recursive(path, path, config::Config::default(), progress) .await .with_context(|| format!("Recursing for {}", path.display()))? .into_iter() |