summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-08-17 13:39:52 +0200
committerMatthias Beyer <matthias.beyer@atos.net>2021-08-17 13:39:52 +0200
commit6edb7878fb4a490fbb1fd37a09af1fcf907eeecf (patch)
tree2a1273fb7f72e4500a64bd98cf528fe60f42283e
parent9a3a579c09c532220a986abf5880d9b2d6f445ef (diff)
Revert "Limit opened files by limiting recursion"
-rw-r--r--src/repository/repository.rs34
1 files changed, 24 insertions, 10 deletions
diff --git a/src/repository/repository.rs b/src/repository/repository.rs
index 6429770..a955ddb 100644
--- a/src/repository/repository.rs
+++ b/src/repository/repository.rs
@@ -43,21 +43,31 @@ impl Repository {
use futures::StreamExt;
use futures::FutureExt;
- async fn all_subdirs(p: &Path) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
+ async fn all_subdirs(p: &Path, open_files_sema: std::sync::Arc<tokio::sync::Semaphore>) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
let mut read_dir = tokio::fs::read_dir(p).await?;
+ let sema = open_files_sema.clone();
let stream = async_stream::stream! {
+ let sema = sema.clone();
loop {
+ log::trace!("Acquiring Semapore: {} - 1", sema.available_permits());
+ let sema_permit = sema.acquire().await?;
if let Some(item) = read_dir.next_entry().await? {
yield Ok(item) as Result<tokio::fs::DirEntry>;
} else {
break;
}
+ log::trace!("Dropping Semaphore: {} + 1", sema.available_permits());
+ drop(sema_permit);
}
};
let stream = stream
.then(move |entry| {
+ let sem = open_files_sema.clone();
+
async move {
+ log::trace!("[{:?}] Acquiring Semaphore: {} - 1", entry, sem.available_permits());
+ let open_files_permit = sem.acquire().await?;
let de = entry?;
let is_dir = de.file_type().await?.is_dir();
let is_hidden = de
@@ -66,6 +76,8 @@ impl Repository {
.and_then(|s| s.to_str())
.map(|s| s.starts_with('.'))
.unwrap_or(false);
+ log::trace!("[{:?}] Dropping Semaphore: {} + 1", de, sem.available_permits());
+ drop(open_files_permit);
if is_dir && !is_hidden {
Ok(Some(de.path()))
@@ -90,6 +102,7 @@ impl Repository {
path: &'a Path,
mut config: config::Config,
progress: &'a indicatif::ProgressBar,
+ open_files_sema: std::sync::Arc<tokio::sync::Semaphore>,
) -> futures::future::BoxFuture<'a, Result<Vec<Result<Package>>>> {
async move {
let pkg_file = path.join("pkg.toml");
@@ -97,9 +110,13 @@ impl Repository {
if pkg_file.is_file() {
log::trace!("Reading: {}", pkg_file.display());
let buf = {
+ log::trace!("[{}] Acquiring Semaphore: {} - 1", pkg_file.display(), open_files_sema.available_permits());
+ let open_files_permit = open_files_sema.acquire().await?;
let r = tokio::fs::read_to_string(&pkg_file)
.await
.with_context(|| format!("Reading {}", pkg_file.display()))?;
+ log::trace!("[{}] Dropping Semaphore: {} + 1", pkg_file.display(), open_files_sema.available_permits());
+ drop(open_files_permit);
r
};
@@ -217,9 +234,7 @@ impl Repository {
config.set_once("patches", config::Value::from(patches))?;
}
- let recursion_limit_sema = tokio::sync::Semaphore::new(5);
-
- let subdirs = all_subdirs(path)
+ let subdirs = all_subdirs(path, open_files_sema.clone())
.await?
.map(|subdir| {
let config = config.clone();
@@ -229,13 +244,10 @@ impl Repository {
.with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?;
trace!("[{}] Recursing into {}", path.display(), dir.display());
- let permit = recursion_limit_sema.acquire().await?;
- let res = load_recursive(root, &dir, config, progress)
+ load_recursive(root, &dir, config, progress, open_files_sema.clone())
.await
.with_context(|| format!("Reading package from {}", dir.display()))
- .map_err(Error::from);
- drop(permit);
- res
+ .map_err(Error::from)
}
})
.collect::<futures::stream::FuturesUnordered<_>>()
@@ -273,7 +285,9 @@ impl Repository {
}.boxed()
}
- let inner = load_recursive(path, path, config::Config::default(), progress)
+ let open_files_sema = std::sync::Arc::new(tokio::sync::Semaphore::new(100));
+
+ let inner = load_recursive(path, path, config::Config::default(), progress, open_files_sema)
.await
.with_context(|| format!("Recursing for {}", path.display()))?
.into_iter()