summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-08-17 12:50:06 +0200
committerMatthias Beyer <matthias.beyer@atos.net>2021-08-17 12:53:24 +0200
commit9a3a579c09c532220a986abf5880d9b2d6f445ef (patch)
tree291cbadc1b828bbc246ffccaaa7ee210b0f3c06f
parent1de4fcb9bc1855e3149500e445a51c86905183d2 (diff)
Limit opened files by limiting recursion
This patch limits the recursion so that it is parallelized only by a factor of 5 for now. This seems to work with a ulimit of 1000 open files for a repository with ~1800 pkg.toml files in ~3500 directories. Unfortunately, this does not _really_ speed things up (20 sec vs 16 sec, not really scientifically measured). So this is not a nice solution and we really should rethink our loading mechanics. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> Tested-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--src/repository/repository.rs34
1 files changed, 10 insertions, 24 deletions
diff --git a/src/repository/repository.rs b/src/repository/repository.rs
index a955ddb..6429770 100644
--- a/src/repository/repository.rs
+++ b/src/repository/repository.rs
@@ -43,31 +43,21 @@ impl Repository {
use futures::StreamExt;
use futures::FutureExt;
- async fn all_subdirs(p: &Path, open_files_sema: std::sync::Arc<tokio::sync::Semaphore>) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
+ async fn all_subdirs(p: &Path) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
let mut read_dir = tokio::fs::read_dir(p).await?;
- let sema = open_files_sema.clone();
let stream = async_stream::stream! {
- let sema = sema.clone();
loop {
- log::trace!("Acquiring Semapore: {} - 1", sema.available_permits());
- let sema_permit = sema.acquire().await?;
if let Some(item) = read_dir.next_entry().await? {
yield Ok(item) as Result<tokio::fs::DirEntry>;
} else {
break;
}
- log::trace!("Dropping Semaphore: {} + 1", sema.available_permits());
- drop(sema_permit);
}
};
let stream = stream
.then(move |entry| {
- let sem = open_files_sema.clone();
-
async move {
- log::trace!("[{:?}] Acquiring Semaphore: {} - 1", entry, sem.available_permits());
- let open_files_permit = sem.acquire().await?;
let de = entry?;
let is_dir = de.file_type().await?.is_dir();
let is_hidden = de
@@ -76,8 +66,6 @@ impl Repository {
.and_then(|s| s.to_str())
.map(|s| s.starts_with('.'))
.unwrap_or(false);
- log::trace!("[{:?}] Dropping Semaphore: {} + 1", de, sem.available_permits());
- drop(open_files_permit);
if is_dir && !is_hidden {
Ok(Some(de.path()))
@@ -102,7 +90,6 @@ impl Repository {
path: &'a Path,
mut config: config::Config,
progress: &'a indicatif::ProgressBar,
- open_files_sema: std::sync::Arc<tokio::sync::Semaphore>,
) -> futures::future::BoxFuture<'a, Result<Vec<Result<Package>>>> {
async move {
let pkg_file = path.join("pkg.toml");
@@ -110,13 +97,9 @@ impl Repository {
if pkg_file.is_file() {
log::trace!("Reading: {}", pkg_file.display());
let buf = {
- log::trace!("[{}] Acquiring Semaphore: {} - 1", pkg_file.display(), open_files_sema.available_permits());
- let open_files_permit = open_files_sema.acquire().await?;
let r = tokio::fs::read_to_string(&pkg_file)
.await
.with_context(|| format!("Reading {}", pkg_file.display()))?;
- log::trace!("[{}] Dropping Semaphore: {} + 1", pkg_file.display(), open_files_sema.available_permits());
- drop(open_files_permit);
r
};
@@ -234,7 +217,9 @@ impl Repository {
config.set_once("patches", config::Value::from(patches))?;
}
- let subdirs = all_subdirs(path, open_files_sema.clone())
+ let recursion_limit_sema = tokio::sync::Semaphore::new(5);
+
+ let subdirs = all_subdirs(path)
.await?
.map(|subdir| {
let config = config.clone();
@@ -244,10 +229,13 @@ impl Repository {
.with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?;
trace!("[{}] Recursing into {}", path.display(), dir.display());
- load_recursive(root, &dir, config, progress, open_files_sema.clone())
+ let permit = recursion_limit_sema.acquire().await?;
+ let res = load_recursive(root, &dir, config, progress)
.await
.with_context(|| format!("Reading package from {}", dir.display()))
- .map_err(Error::from)
+ .map_err(Error::from);
+ drop(permit);
+ res
}
})
.collect::<futures::stream::FuturesUnordered<_>>()
@@ -285,9 +273,7 @@ impl Repository {
}.boxed()
}
- let open_files_sema = std::sync::Arc::new(tokio::sync::Semaphore::new(100));
-
- let inner = load_recursive(path, path, config::Config::default(), progress, open_files_sema)
+ let inner = load_recursive(path, path, config::Config::default(), progress)
.await
.with_context(|| format!("Recursing for {}", path.display()))?
.into_iter()