author     Matthias Beyer <matthias.beyer@atos.net>  2021-08-18 10:01:29 +0200
committer  Matthias Beyer <matthias.beyer@atos.net>  2021-08-18 10:01:29 +0200
commit     a02df5bda0fd490c34b2dd80f2a2e85b59f08cb8 (patch)
tree       0252a3a7d92dc5059b3d491bdd6f5461807e9301
parent     6edb7878fb4a490fbb1fd37a09af1fcf907eeecf (diff)
Introduce proxy object for limiting filesystem access with semaphore (parallel-repository-loading)
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--  src/repository/fs.rs          |  93
-rw-r--r--  src/repository/mod.rs         |   2
-rw-r--r--  src/repository/repository.rs  | 139
3 files changed, 163 insertions(+), 71 deletions(-)
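
The pattern this commit introduces, sketched outside the diff for illustration (the helper name, the limit of 16, and the use of try_join_all are assumptions of the sketch, not part of the commit): a shared tokio::sync::Semaphore caps how many filesystem operations are in flight at once, so massively parallel loading cannot run into error 24 ("too many open files"):

use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical helper: read many files concurrently, but keep at most
// 16 of them open at any given time.
async fn read_all(paths: Vec<PathBuf>) -> anyhow::Result<Vec<String>> {
    let sem = Arc::new(Semaphore::new(16));
    let tasks = paths.into_iter().map(|path| {
        let sem = sem.clone();
        async move {
            // The permit is held only while this file is open.
            let _permit = sem.acquire_owned().await?;
            tokio::fs::read_to_string(&path).await.map_err(anyhow::Error::from)
        }
    });
    futures::future::try_join_all(tasks).await
}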
diff --git a/src/repository/fs.rs b/src/repository/fs.rs
new file mode 100644
index 0000000..435e2be
--- /dev/null
+++ b/src/repository/fs.rs
@@ -0,0 +1,93 @@
+use std::ops::Deref;
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::Error;
+use anyhow::Result;
+use tokio_stream::Stream;
+
+
+/// Helper type to limit access to the filesystem in massively parallel cases where we would run into
+/// system error 24 ("too many open files").
+#[derive(Clone)]
+pub struct FileSystemAccessor {
+ sem: Arc<tokio::sync::Semaphore>,
+}
+
+impl FileSystemAccessor {
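+ /// Create an accessor that allows at most `limit` filesystem operations in flight at once.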
+ pub fn new(limit: usize) -> Self {
+ FileSystemAccessor { sem: Arc::new(tokio::sync::Semaphore::new(limit)) }
+ }
+
+ #[inline]
+ pub fn available_permits(&self) -> usize {
+ self.sem.available_permits()
+ }
+
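+ /// Stream the entries of `path`.
+ ///
+ /// One permit guards the `tokio::fs::read_dir()` call itself; afterwards, a permit is
+ /// acquired per entry and travels inside the yielded `DirEntry`, so it is only released
+ /// when that entry is dropped.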
+ pub async fn read_dir(&self, path: impl AsRef<Path>) -> Result<impl Stream<Item = Result<DirEntry>>> {
+ log::trace!("Accessor::read_dir({}): Acquiring Semaphore: {} - 1", path.as_ref().display(), self.available_permits());
+ let perm = self.sem.acquire().await?;
+ let sema = self.sem.clone();
+ let mut read_dir = tokio::fs::read_dir(&path).await?;
+
+ let stream = async_stream::stream! {
+ loop {
+ log::trace!("Accessor::read_dir(_): Acquiring Semaphore: {} - 1", sema.available_permits());
+ let permit = sema.clone().acquire_owned().await?;
+ if let Some(entry) = read_dir.next_entry().await? {
+ let dentry = DirEntry { permit, entry };
+ yield Ok(dentry) as Result<_>;
+ } else {
+ break;
+ }
+ }
+ };
+
+ log::trace!("Accessor::read_dir({}): Dropping Semaphore: {} + 1", path.as_ref().display(), self.available_permits());
+ drop(perm);
+ Ok(stream)
+ }
+
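+ /// Like `Path::exists()`, but holding a semaphore permit for the duration of the check.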
+ pub async fn exists(&self, p: impl AsRef<Path>) -> Result<bool> {
+ log::trace!("Accessor::exists({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits());
+ let perm = self.sem.acquire().await?;
+ let r = p.as_ref().exists();
+ log::trace!("Accessor::exists({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits());
+ drop(perm);
+ Ok(r)
+ }
+
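+ /// Like `Path::is_file()`, but holding a semaphore permit for the duration of the check.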
+ pub async fn is_file(&self, p: impl AsRef<Path>) -> Result<bool> {
+ log::trace!("Accessor::is_file({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits());
+ let perm = self.sem.acquire().await?;
+ let r = p.as_ref().is_file();
+ log::trace!("Accessor::is_file({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits());
+ drop(perm);
+ Ok(r)
+ }
+
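+ /// Like `tokio::fs::read_to_string()`, but holding a semaphore permit while reading.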
+ pub async fn read_to_string(&self, p: impl AsRef<Path>) -> Result<String> {
+ log::trace!("Accessor::read_to_string({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits());
+ let perm = self.sem.acquire().await?;
+ let r = tokio::fs::read_to_string(&p).await.map_err(Error::from);
+ log::trace!("Accessor::read_to_string({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits());
+ drop(perm);
+ r
+ }
+}
+
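+ /// A directory entry paired with the semaphore permit acquired for it; the permit is
+ /// released when the `DirEntry` is dropped.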
+#[derive(Debug)]
+pub struct DirEntry {
+ permit: tokio::sync::OwnedSemaphorePermit,
+ entry: tokio::fs::DirEntry,
+}
+
+
+impl Deref for DirEntry {
+ type Target = tokio::fs::DirEntry;
+
+ fn deref(&self) -> &Self::Target {
+ &self.entry
+ }
+}
+
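
For illustration only (not part of the commit; the function name is made up): the accessor above could be consumed like this. The stream returned by read_dir() is not Unpin, so it has to be pinned before iterating:

use futures::StreamExt;
use std::path::Path;

async fn print_entries(accessor: &FileSystemAccessor, dir: &Path) -> anyhow::Result<()> {
    // Pin the stream so that `StreamExt::next()` can be called on it.
    let mut entries = Box::pin(accessor.read_dir(dir).await?);
    while let Some(entry) = entries.next().await {
        let entry = entry?;
        // `DirEntry` derefs to `tokio::fs::DirEntry`, so `path()` is available.
        println!("{}", entry.path().display());
        // The permit inside `entry` is released here, when the entry is dropped.
    }
    Ok(())
}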
diff --git a/src/repository/mod.rs b/src/repository/mod.rs
index ca24117..a577f04 100644
--- a/src/repository/mod.rs
+++ b/src/repository/mod.rs
@@ -11,3 +11,5 @@
#![allow(clippy::module_inception)]
mod repository;
pub use repository::*;
+
+mod fs;
diff --git a/src/repository/repository.rs b/src/repository/repository.rs
index a955ddb..5b358f3 100644
--- a/src/repository/repository.rs
+++ b/src/repository/repository.rs
@@ -17,14 +17,13 @@ use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use log::trace;
-use resiter::AndThen;
-use resiter::FilterMap;
use resiter::Map;
use crate::package::Package;
use crate::package::PackageName;
use crate::package::PackageVersion;
use crate::package::PackageVersionConstraint;
+use crate::repository::fs::FileSystemAccessor;
/// A repository represents a collection of packages
pub struct Repository {
@@ -43,31 +42,11 @@ impl Repository {
use futures::StreamExt;
use futures::FutureExt;
- async fn all_subdirs(p: &Path, open_files_sema: std::sync::Arc<tokio::sync::Semaphore>) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
- let mut read_dir = tokio::fs::read_dir(p).await?;
- let sema = open_files_sema.clone();
- let stream = async_stream::stream! {
- let sema = sema.clone();
- loop {
- log::trace!("Acquiring Semapore: {} - 1", sema.available_permits());
- let sema_permit = sema.acquire().await?;
- if let Some(item) = read_dir.next_entry().await? {
- yield Ok(item) as Result<tokio::fs::DirEntry>;
- } else {
- break;
- }
- log::trace!("Dropping Semaphore: {} + 1", sema.available_permits());
- drop(sema_permit);
- }
- };
-
- let stream = stream
+ async fn all_subdirs(p: PathBuf, accessor: FileSystemAccessor) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
+ let stream = accessor.read_dir(p)
+ .await?
.then(move |entry| {
- let sem = open_files_sema.clone();
-
async move {
- log::trace!("[{:?}] Acquiring Semaphore: {} - 1", entry, sem.available_permits());
- let open_files_permit = sem.acquire().await?;
let de = entry?;
let is_dir = de.file_type().await?.is_dir();
let is_hidden = de
@@ -76,8 +55,6 @@ impl Repository {
.and_then(|s| s.to_str())
.map(|s| s.starts_with('.'))
.unwrap_or(false);
- log::trace!("[{:?}] Dropping Semaphore: {} + 1", de, sem.available_permits());
- drop(open_files_permit);
if is_dir && !is_hidden {
Ok(Some(de.path()))
@@ -102,23 +79,16 @@ impl Repository {
path: &'a Path,
mut config: config::Config,
progress: &'a indicatif::ProgressBar,
- open_files_sema: std::sync::Arc<tokio::sync::Semaphore>,
+ accessor: FileSystemAccessor,
) -> futures::future::BoxFuture<'a, Result<Vec<Result<Package>>>> {
async move {
let pkg_file = path.join("pkg.toml");
- if pkg_file.is_file() {
+ if accessor.is_file(&pkg_file).await? {
log::trace!("Reading: {}", pkg_file.display());
- let buf = {
- log::trace!("[{}] Acquiring Semaphore: {} - 1", pkg_file.display(), open_files_sema.available_permits());
- let open_files_permit = open_files_sema.acquire().await?;
- let r = tokio::fs::read_to_string(&pkg_file)
- .await
- .with_context(|| format!("Reading {}", pkg_file.display()))?;
- log::trace!("[{}] Dropping Semaphore: {} + 1", pkg_file.display(), open_files_sema.available_permits());
- drop(open_files_permit);
- r
- };
+ let buf = accessor.read_to_string(&pkg_file)
+ .await
+ .with_context(|| format!("Reading {}", pkg_file.display()))?;
// This function has an issue: It loads packages recursively, but if there are
// patches set for a package, these patches are set _relative_ to the current
@@ -155,16 +125,20 @@ impl Repository {
// first of all, we get the patches array.
// This is either the patches array from the last recursion or the newly set one,
// that doesn't matter here.
+ log::trace!("Fetching patches before merge");
let patches_before_merge = match config.get_array("patches") {
Ok(v) => {
+ log::trace!("Array found");
v.into_iter()
.map(|p| {
+ log::trace!("Patch path to string: {:?}", p);
p.into_str()
.map(PathBuf::from)
.with_context(|| anyhow!("patches must be strings"))
.map_err(Error::from)
})
- .collect::<Result<Vec<_>>>()?
+ .collect::<Result<Vec<_>>>()
+ .context("Collecting patches")?
},
Err(config::ConfigError::NotFound(_)) => vec![],
Err(e) => return Err(e).map_err(Error::from),
@@ -179,6 +153,7 @@ impl Repository {
let path_relative_to_root = path.strip_prefix(root)?;
// get the patches that are in the `config` object after the merge
+ log::trace!("Fetching patches after merge");
let patches = match config.get_array("patches") {
Ok(v) => {
trace!("[{}] Patches after merging: {:?}", path.display(), v);
@@ -196,30 +171,46 @@ impl Repository {
// the root directory of the repository.
.map(|patch| patch.into_str().map_err(Error::from))
.map_ok(|patch| path_relative_to_root.join(patch))
- .inspect(|patch| trace!("[{}] Patch relative to root: {:?}", path.display(), patch.as_ref().map(|p| p.display())))
+ .inspect(|patch| trace!("[{}] Patch relative to root: {:?}", path.display(), patch.as_ref().map(|p| p.display())));
- // if the patch file exists, use it (as config::Value).
+ // now, we need to iterate "manually" here, because we need to
+ // call FileSystemAccessor::exists for each patch, and the method is async.
//
- // Otherwise we have an error here, because we're refering to a non-existing file.
- .and_then_ok(|patch| if patch.exists() {
- trace!("[{}] Path to patch exists: {}", path.display(), patch.display());
- Ok(Some(patch))
- } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) {
- // We have a patch already in the array that is named equal to the patch
- // we have in the current recursion.
- // It seems like this patch was already in the list and we re-found it
- // because we loaded a deeper pkg.toml file.
- Ok(None)
- } else {
- trace!("[{}] Path to patch does not exist: {}", path.display(), patch.display());
- Err(anyhow!("Patch does not exist: {}", patch.display()))
- })
- .filter_map_ok(|o| o)
- .collect::<Result<Vec<_>>>()?;
+ // We collect the patches "manually" here into a preallocated vector of decent
+ // size
+ log::trace!("Allocating collection for found patches with size {}", patches.size_hint().0);
+ let mut collected_patches = Vec::with_capacity(patches.size_hint().0);
+ log::trace!("Collecting patches...");
+ for patch in patches {
+ match patch {
+ Ok(patch) => if accessor.exists(&patch).await? {
+ trace!("[{}] Path to patch exists: {}", path.display(), patch.display());
+
+ collected_patches.push(patch);
+ } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) {
+ // We have a patch already in the array that is named equal to the patch
+ // we have in the current recursion.
+ // It seems like this patch was already in the list and we re-found it
+ // because we loaded a deeper pkg.toml file.
+ /* do nothing */
+ } else {
+ trace!("[{}] Path to patch does not exist: {}", path.display(), patch.display());
+ Err(anyhow!("Patch does not exist: {}", patch.display()))
+ .context("Collecting patches after merge")?
+ },
+
+ Err(e) => Err(e).context("Collecting patches after merge")?,
+ }
+ }
+ log::trace!("Collected patches");
+ let patches = collected_patches; // rebind variable
// If we found any patches, use them. Otherwise use the array from before the merge
// (which already has the correct paths from the previous recursion).
- let patches = if !patches.is_empty() && patches.iter().all(|p| p.exists()) {
+ //
+ // We don't need to check whether the patches all exist, because we checked
+ // that above when collecting them
+ let patches = if !patches.is_empty() /* && patches.iter().all(|p| accessor.exists(p).await?) */ {
patches
} else {
patches_before_merge
@@ -231,11 +222,14 @@ impl Repository {
.map(|p| p.display().to_string())
.map(config::Value::from)
.collect::<Vec<_>>();
- config.set_once("patches", config::Value::from(patches))?;
+
+ config.set_once("patches", config::Value::from(patches))
+ .context("Setting patches after merge")?;
}
- let subdirs = all_subdirs(path, open_files_sema.clone())
- .await?
+ let subdirs = all_subdirs(path.to_path_buf(), accessor.clone())
+ .await
+ .with_context(|| anyhow!("Fetching subdirs for {}", path.display()))?
.map(|subdir| {
let config = config.clone();
@@ -244,25 +238,28 @@ impl Repository {
.with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?;
trace!("[{}] Recursing into {}", path.display(), dir.display());
- load_recursive(root, &dir, config, progress, open_files_sema.clone())
+ let res = load_recursive(root, &dir, config, progress, accessor.clone())
.await
.with_context(|| format!("Reading package from {}", dir.display()))
- .map_err(Error::from)
+ .map_err(Error::from);
+ res
}
})
- .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<futures::stream::FuturesOrdered<_>>()
.await
.collect::<Vec<Result<Vec<Result<Package>>>>>()
.await
.into_iter()
- .collect::<Result<Vec<Vec<Result<Package>>>>>()?
+ .collect::<Result<Vec<Vec<Result<Package>>>>>()
+ .context("Collecting recursively loaded packages")?
.into_iter()
.flatten()
.collect::<Vec<Result<Package>>>();
+ progress.tick();
+
if subdirs.is_empty() {
- progress.tick();
- if pkg_file.is_file() {
+ if accessor.is_file(&pkg_file).await? {
let package = config.try_into()
.with_context(|| format!("Failed to parse {} into package", path.display()))
.and_then(|package: Package| {
@@ -285,9 +282,9 @@ impl Repository {
}.boxed()
}
- let open_files_sema = std::sync::Arc::new(tokio::sync::Semaphore::new(100));
+ let fs_accessor = FileSystemAccessor::new(1);
- let inner = load_recursive(path, path, config::Config::default(), progress, open_files_sema)
+ let inner = load_recursive(path, path, config::Config::default(), progress, fs_accessor)
.await
.with_context(|| format!("Recursing for {}", path.display()))?
.into_iter()
@@ -298,7 +295,7 @@ impl Repository {
Ok(Repository { inner })
}
- pub fn find_by_name<'a>(&'a self, name: &PackageName) -> Vec<&'a Package> {
+ pub fn find_by_name<'a>(&'a self, name: &PackageName) -> Vec<&'a Package> {
trace!("Searching for '{}' in repository", name);
self.inner
.iter()