diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-08-13 17:01:43 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-08-17 12:53:23 +0200 |
commit | 1de4fcb9bc1855e3149500e445a51c86905183d2 (patch) | |
tree | 2a1273fb7f72e4500a64bd98cf528fe60f42283e | |
parent | ff2e2f28a44ee2016850b030383d35b7f4871389 (diff) |
Make Repository-loading stop sync over 1000 open files with a semaphore
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/repository/repository.rs | 96 |
2 files changed, 66 insertions, 31 deletions
@@ -24,6 +24,7 @@ maintenance = { status = "actively-developed" } anyhow = "1" aquamarine = "0.1" ascii_table = ">= 3.0.2" +async-stream = "0.3" atty = "0.2" bytesize = "1" chrono = "0.4" diff --git a/src/repository/repository.rs b/src/repository/repository.rs index b2827ba..a955ddb 100644 --- a/src/repository/repository.rs +++ b/src/repository/repository.rs @@ -43,24 +43,47 @@ impl Repository { use futures::StreamExt; use futures::FutureExt; - async fn all_subdirs(p: &Path) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> { - let read_dir = tokio::fs::read_dir(p).await?; - - let stream = tokio_stream::wrappers::ReadDirStream::new(read_dir) - .then(|entry| async move { - let de = entry?; - let is_dir = de.file_type().await?.is_dir(); - let is_hidden = de - .path() - .file_name() - .and_then(|s| s.to_str()) - .map(|s| s.starts_with('.')) - .unwrap_or(false); - - if is_dir && !is_hidden { - Ok(Some(de.path())) + async fn all_subdirs(p: &Path, open_files_sema: std::sync::Arc<tokio::sync::Semaphore>) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> { + let mut read_dir = tokio::fs::read_dir(p).await?; + let sema = open_files_sema.clone(); + let stream = async_stream::stream! { + let sema = sema.clone(); + loop { + log::trace!("Acquiring Semapore: {} - 1", sema.available_permits()); + let sema_permit = sema.acquire().await?; + if let Some(item) = read_dir.next_entry().await? { + yield Ok(item) as Result<tokio::fs::DirEntry>; } else { - Ok(None) + break; + } + log::trace!("Dropping Semaphore: {} + 1", sema.available_permits()); + drop(sema_permit); + } + }; + + let stream = stream + .then(move |entry| { + let sem = open_files_sema.clone(); + + async move { + log::trace!("[{:?}] Acquiring Semaphore: {} - 1", entry, sem.available_permits()); + let open_files_permit = sem.acquire().await?; + let de = entry?; + let is_dir = de.file_type().await?.is_dir(); + let is_hidden = de + .path() + .file_name() + .and_then(|s| s.to_str()) + .map(|s| s.starts_with('.')) + .unwrap_or(false); + log::trace!("[{:?}] Dropping Semaphore: {} + 1", de, sem.available_permits()); + drop(open_files_permit); + + if is_dir && !is_hidden { + Ok(Some(de.path())) + } else { + Ok(None) + } } }) .filter_map(|t| async { @@ -79,14 +102,23 @@ impl Repository { path: &'a Path, mut config: config::Config, progress: &'a indicatif::ProgressBar, + open_files_sema: std::sync::Arc<tokio::sync::Semaphore>, ) -> futures::future::BoxFuture<'a, Result<Vec<Result<Package>>>> { async move { let pkg_file = path.join("pkg.toml"); if pkg_file.is_file() { - let buf = tokio::fs::read_to_string(&pkg_file) - .await - .with_context(|| format!("Reading {}", pkg_file.display()))?; + log::trace!("Reading: {}", pkg_file.display()); + let buf = { + log::trace!("[{}] Acquiring Semaphore: {} - 1", pkg_file.display(), open_files_sema.available_permits()); + let open_files_permit = open_files_sema.acquire().await?; + let r = tokio::fs::read_to_string(&pkg_file) + .await + .with_context(|| format!("Reading {}", pkg_file.display()))?; + log::trace!("[{}] Dropping Semaphore: {} + 1", pkg_file.display(), open_files_sema.available_permits()); + drop(open_files_permit); + r + }; // This function has an issue: It loads packages recursively, but if there are // patches set for a package, these patches are set _relative_ to the current @@ -137,7 +169,7 @@ impl Repository { Err(config::ConfigError::NotFound(_)) => vec![], Err(e) => return Err(e).map_err(Error::from), }; - trace!("Patches before merging: {:?}", patches_before_merge); + trace!("[{}] Patches before merging: {:?}", path.display(), patches_before_merge); // Merge the new pkg.toml file over the already loaded configuration config @@ -149,7 +181,7 @@ impl Repository { // get the patches that are in the `config` object after the merge let patches = match config.get_array("patches") { Ok(v) => { - trace!("Patches after merging: {:?}", v); + trace!("[{}] Patches after merging: {:?}", path.display(), v); v }, @@ -164,13 +196,13 @@ impl Repository { // the root directory of the repository. .map(|patch| patch.into_str().map_err(Error::from)) .map_ok(|patch| path_relative_to_root.join(patch)) - .inspect(|patch| trace!("Patch relative to root: {:?}", patch.as_ref().map(|p| p.display()))) + .inspect(|patch| trace!("[{}] Patch relative to root: {:?}", path.display(), patch.as_ref().map(|p| p.display()))) // if the patch file exists, use it (as config::Value). // // Otherwise we have an error here, because we're refering to a non-existing file. .and_then_ok(|patch| if patch.exists() { - trace!("Path to patch exists: {}", patch.display()); + trace!("[{}] Path to patch exists: {}", path.display(), patch.display()); Ok(Some(patch)) } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) { // We have a patch already in the array that is named equal to the patch @@ -179,7 +211,7 @@ impl Repository { // because we loaded a deeper pkg.toml file. Ok(None) } else { - trace!("Path to patch does not exist: {}", patch.display()); + trace!("[{}] Path to patch does not exist: {}", path.display(), patch.display()); Err(anyhow!("Patch does not exist: {}", patch.display())) }) .filter_map_ok(|o| o) @@ -193,7 +225,7 @@ impl Repository { patches_before_merge }; - trace!("Patches after postprocessing merge: {:?}", patches); + trace!("[{}] Patches after postprocessing merge: {:?}", path.display(), patches); let patches = patches .into_iter() .map(|p| p.display().to_string()) @@ -202,7 +234,7 @@ impl Repository { config.set_once("patches", config::Value::from(patches))?; } - let subdirs = all_subdirs(path) + let subdirs = all_subdirs(path, open_files_sema.clone()) .await? .map(|subdir| { let config = config.clone(); @@ -211,10 +243,10 @@ impl Repository { let dir = subdir .with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?; - trace!("Recursing into {}", dir.display()); - load_recursive(root, &dir, config, progress) + trace!("[{}] Recursing into {}", path.display(), dir.display()); + load_recursive(root, &dir, config, progress, open_files_sema.clone()) .await - .with_context(|| format!("Reading package from {}", pkg_file.display())) + .with_context(|| format!("Reading package from {}", dir.display())) .map_err(Error::from) } }) @@ -253,7 +285,9 @@ impl Repository { }.boxed() } - let inner = load_recursive(path, path, config::Config::default(), progress) + let open_files_sema = std::sync::Arc::new(tokio::sync::Semaphore::new(100)); + + let inner = load_recursive(path, path, config::Config::default(), progress, open_files_sema) .await .with_context(|| format!("Recursing for {}", path.display()))? .into_iter() |