summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-08-13 17:01:43 +0200
committerMatthias Beyer <matthias.beyer@atos.net>2021-08-17 12:53:23 +0200
commit1de4fcb9bc1855e3149500e445a51c86905183d2 (patch)
tree2a1273fb7f72e4500a64bd98cf528fe60f42283e
parentff2e2f28a44ee2016850b030383d35b7f4871389 (diff)
Make Repository-loading stop sync over 1000 open files with a semaphore
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--Cargo.toml1
-rw-r--r--src/repository/repository.rs96
2 files changed, 66 insertions, 31 deletions
diff --git a/Cargo.toml b/Cargo.toml
index eeecfe2..dee49f6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,7 @@ maintenance = { status = "actively-developed" }
anyhow = "1"
aquamarine = "0.1"
ascii_table = ">= 3.0.2"
+async-stream = "0.3"
atty = "0.2"
bytesize = "1"
chrono = "0.4"
diff --git a/src/repository/repository.rs b/src/repository/repository.rs
index b2827ba..a955ddb 100644
--- a/src/repository/repository.rs
+++ b/src/repository/repository.rs
@@ -43,24 +43,47 @@ impl Repository {
use futures::StreamExt;
use futures::FutureExt;
- async fn all_subdirs(p: &Path) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
- let read_dir = tokio::fs::read_dir(p).await?;
-
- let stream = tokio_stream::wrappers::ReadDirStream::new(read_dir)
- .then(|entry| async move {
- let de = entry?;
- let is_dir = de.file_type().await?.is_dir();
- let is_hidden = de
- .path()
- .file_name()
- .and_then(|s| s.to_str())
- .map(|s| s.starts_with('.'))
- .unwrap_or(false);
-
- if is_dir && !is_hidden {
- Ok(Some(de.path()))
+ async fn all_subdirs(p: &Path, open_files_sema: std::sync::Arc<tokio::sync::Semaphore>) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
+ let mut read_dir = tokio::fs::read_dir(p).await?;
+ let sema = open_files_sema.clone();
+ let stream = async_stream::stream! {
+ let sema = sema.clone();
+ loop {
+ log::trace!("Acquiring Semapore: {} - 1", sema.available_permits());
+ let sema_permit = sema.acquire().await?;
+ if let Some(item) = read_dir.next_entry().await? {
+ yield Ok(item) as Result<tokio::fs::DirEntry>;
} else {
- Ok(None)
+ break;
+ }
+ log::trace!("Dropping Semaphore: {} + 1", sema.available_permits());
+ drop(sema_permit);
+ }
+ };
+
+ let stream = stream
+ .then(move |entry| {
+ let sem = open_files_sema.clone();
+
+ async move {
+ log::trace!("[{:?}] Acquiring Semaphore: {} - 1", entry, sem.available_permits());
+ let open_files_permit = sem.acquire().await?;
+ let de = entry?;
+ let is_dir = de.file_type().await?.is_dir();
+ let is_hidden = de
+ .path()
+ .file_name()
+ .and_then(|s| s.to_str())
+ .map(|s| s.starts_with('.'))
+ .unwrap_or(false);
+ log::trace!("[{:?}] Dropping Semaphore: {} + 1", de, sem.available_permits());
+ drop(open_files_permit);
+
+ if is_dir && !is_hidden {
+ Ok(Some(de.path()))
+ } else {
+ Ok(None)
+ }
}
})
.filter_map(|t| async {
@@ -79,14 +102,23 @@ impl Repository {
path: &'a Path,
mut config: config::Config,
progress: &'a indicatif::ProgressBar,
+ open_files_sema: std::sync::Arc<tokio::sync::Semaphore>,
) -> futures::future::BoxFuture<'a, Result<Vec<Result<Package>>>> {
async move {
let pkg_file = path.join("pkg.toml");
if pkg_file.is_file() {
- let buf = tokio::fs::read_to_string(&pkg_file)
- .await
- .with_context(|| format!("Reading {}", pkg_file.display()))?;
+ log::trace!("Reading: {}", pkg_file.display());
+ let buf = {
+ log::trace!("[{}] Acquiring Semaphore: {} - 1", pkg_file.display(), open_files_sema.available_permits());
+ let open_files_permit = open_files_sema.acquire().await?;
+ let r = tokio::fs::read_to_string(&pkg_file)
+ .await
+ .with_context(|| format!("Reading {}", pkg_file.display()))?;
+ log::trace!("[{}] Dropping Semaphore: {} + 1", pkg_file.display(), open_files_sema.available_permits());
+ drop(open_files_permit);
+ r
+ };
// This function has an issue: It loads packages recursively, but if there are
// patches set for a package, these patches are set _relative_ to the current
@@ -137,7 +169,7 @@ impl Repository {
Err(config::ConfigError::NotFound(_)) => vec![],
Err(e) => return Err(e).map_err(Error::from),
};
- trace!("Patches before merging: {:?}", patches_before_merge);
+ trace!("[{}] Patches before merging: {:?}", path.display(), patches_before_merge);
// Merge the new pkg.toml file over the already loaded configuration
config
@@ -149,7 +181,7 @@ impl Repository {
// get the patches that are in the `config` object after the merge
let patches = match config.get_array("patches") {
Ok(v) => {
- trace!("Patches after merging: {:?}", v);
+ trace!("[{}] Patches after merging: {:?}", path.display(), v);
v
},
@@ -164,13 +196,13 @@ impl Repository {
// the root directory of the repository.
.map(|patch| patch.into_str().map_err(Error::from))
.map_ok(|patch| path_relative_to_root.join(patch))
- .inspect(|patch| trace!("Patch relative to root: {:?}", patch.as_ref().map(|p| p.display())))
+ .inspect(|patch| trace!("[{}] Patch relative to root: {:?}", path.display(), patch.as_ref().map(|p| p.display())))
// if the patch file exists, use it (as config::Value).
//
// Otherwise we have an error here, because we're refering to a non-existing file.
.and_then_ok(|patch| if patch.exists() {
- trace!("Path to patch exists: {}", patch.display());
+ trace!("[{}] Path to patch exists: {}", path.display(), patch.display());
Ok(Some(patch))
} else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) {
// We have a patch already in the array that is named equal to the patch
@@ -179,7 +211,7 @@ impl Repository {
// because we loaded a deeper pkg.toml file.
Ok(None)
} else {
- trace!("Path to patch does not exist: {}", patch.display());
+ trace!("[{}] Path to patch does not exist: {}", path.display(), patch.display());
Err(anyhow!("Patch does not exist: {}", patch.display()))
})
.filter_map_ok(|o| o)
@@ -193,7 +225,7 @@ impl Repository {
patches_before_merge
};
- trace!("Patches after postprocessing merge: {:?}", patches);
+ trace!("[{}] Patches after postprocessing merge: {:?}", path.display(), patches);
let patches = patches
.into_iter()
.map(|p| p.display().to_string())
@@ -202,7 +234,7 @@ impl Repository {
config.set_once("patches", config::Value::from(patches))?;
}
- let subdirs = all_subdirs(path)
+ let subdirs = all_subdirs(path, open_files_sema.clone())
.await?
.map(|subdir| {
let config = config.clone();
@@ -211,10 +243,10 @@ impl Repository {
let dir = subdir
.with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?;
- trace!("Recursing into {}", dir.display());
- load_recursive(root, &dir, config, progress)
+ trace!("[{}] Recursing into {}", path.display(), dir.display());
+ load_recursive(root, &dir, config, progress, open_files_sema.clone())
.await
- .with_context(|| format!("Reading package from {}", pkg_file.display()))
+ .with_context(|| format!("Reading package from {}", dir.display()))
.map_err(Error::from)
}
})
@@ -253,7 +285,9 @@ impl Repository {
}.boxed()
}
- let inner = load_recursive(path, path, config::Config::default(), progress)
+ let open_files_sema = std::sync::Arc::new(tokio::sync::Semaphore::new(100));
+
+ let inner = load_recursive(path, path, config::Config::default(), progress, open_files_sema)
.await
.with_context(|| format!("Recursing for {}", path.display()))?
.into_iter()