From a02df5bda0fd490c34b2dd80f2a2e85b59f08cb8 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Wed, 18 Aug 2021 10:01:29 +0200 Subject: Introduce proxy object for limiting filesystem access with semaphore Signed-off-by: Matthias Beyer --- src/repository/fs.rs | 93 +++++++++++++++++++++++++++++ src/repository/mod.rs | 2 + src/repository/repository.rs | 139 +++++++++++++++++++++---------------------- 3 files changed, 163 insertions(+), 71 deletions(-) create mode 100644 src/repository/fs.rs diff --git a/src/repository/fs.rs b/src/repository/fs.rs new file mode 100644 index 0000000..435e2be --- /dev/null +++ b/src/repository/fs.rs @@ -0,0 +1,93 @@ +use std::ops::Deref; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Error; +use anyhow::Result; +use tokio_stream::Stream; + + +/// Helper type to limit access to the filesystem in massive-parallel cases where we would run into +/// system error 24 ("too many open files"). +#[derive(Clone)] +pub struct FileSystemAccessor { + sem: Arc, +} + +impl FileSystemAccessor { + pub fn new(limit: usize) -> Self { + FileSystemAccessor { sem: Arc::new(tokio::sync::Semaphore::new(limit)) } + } + + #[inline] + pub fn available_permits(&self) -> usize { + self.sem.available_permits() + } + + pub async fn read_dir(&self, path: impl AsRef) -> Result>> { + log::trace!("Accessor::read_dir({}): Acquiring Semaphore: {} - 1", path.as_ref().display(), self.available_permits()); + let perm = self.sem.acquire().await?; + let sema = self.sem.clone(); + let mut read_dir = tokio::fs::read_dir(&path).await?; + + let stream = async_stream::stream! { + loop { + log::trace!("Accessor::read_dir(_): Acquiring Semaphore: {} - 1", sema.available_permits()); + let permit = sema.clone().acquire_owned().await?; + if let Some(entry) = read_dir.next_entry().await? { + let dentry = DirEntry { permit, entry }; + yield Ok(dentry) as Result<_>; + } else { + break; + } + } + }; + + log::trace!("Accessor::read_dir({}): Dropping Semaphore: {} + 1", path.as_ref().display(), self.available_permits()); + drop(perm); + Ok(stream) + } + + pub async fn exists(&self, p: impl AsRef) -> Result { + log::trace!("Accessor::exists({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits()); + let perm = self.sem.acquire().await?; + let r = p.as_ref().exists(); + log::trace!("Accessor::exists({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits()); + drop(perm); + Ok(r) + } + + pub async fn is_file(&self, p: impl AsRef) -> Result { + log::trace!("Accessor::is_file({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits()); + let perm = self.sem.acquire().await?; + let r = p.as_ref().is_file(); + log::trace!("Accessor::is_file({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits()); + drop(perm); + Ok(r) + } + + pub async fn read_to_string(&self, p: impl AsRef) -> Result { + log::trace!("Accessor::read_to_string({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits()); + let perm = self.sem.acquire().await?; + let r = tokio::fs::read_to_string(&p).await.map_err(Error::from); + log::trace!("Accessor::read_to_string({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits()); + drop(perm); + r + } +} + +#[derive(Debug)] +pub struct DirEntry { + permit: tokio::sync::OwnedSemaphorePermit, + entry: tokio::fs::DirEntry, +} + + +impl Deref for DirEntry { + type Target = tokio::fs::DirEntry; + + fn deref(&self) -> &Self::Target { + &self.entry + } +} + diff --git a/src/repository/mod.rs b/src/repository/mod.rs index ca24117..a577f04 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -11,3 +11,5 @@ #![allow(clippy::module_inception)] mod repository; pub use repository::*; + +mod fs; diff --git a/src/repository/repository.rs b/src/repository/repository.rs index a955ddb..5b358f3 100644 --- a/src/repository/repository.rs +++ b/src/repository/repository.rs @@ -17,14 +17,13 @@ use anyhow::Context; use anyhow::Error; use anyhow::Result; use log::trace; -use resiter::AndThen; -use resiter::FilterMap; use resiter::Map; use crate::package::Package; use crate::package::PackageName; use crate::package::PackageVersion; use crate::package::PackageVersionConstraint; +use crate::repository::fs::FileSystemAccessor; /// A repository represents a collection of packages pub struct Repository { @@ -43,31 +42,11 @@ impl Repository { use futures::StreamExt; use futures::FutureExt; - async fn all_subdirs(p: &Path, open_files_sema: std::sync::Arc) -> Result>> { - let mut read_dir = tokio::fs::read_dir(p).await?; - let sema = open_files_sema.clone(); - let stream = async_stream::stream! { - let sema = sema.clone(); - loop { - log::trace!("Acquiring Semapore: {} - 1", sema.available_permits()); - let sema_permit = sema.acquire().await?; - if let Some(item) = read_dir.next_entry().await? { - yield Ok(item) as Result; - } else { - break; - } - log::trace!("Dropping Semaphore: {} + 1", sema.available_permits()); - drop(sema_permit); - } - }; - - let stream = stream + async fn all_subdirs(p: PathBuf, accessor: FileSystemAccessor) -> Result>> { + let stream = accessor.read_dir(p) + .await? .then(move |entry| { - let sem = open_files_sema.clone(); - async move { - log::trace!("[{:?}] Acquiring Semaphore: {} - 1", entry, sem.available_permits()); - let open_files_permit = sem.acquire().await?; let de = entry?; let is_dir = de.file_type().await?.is_dir(); let is_hidden = de @@ -76,8 +55,6 @@ impl Repository { .and_then(|s| s.to_str()) .map(|s| s.starts_with('.')) .unwrap_or(false); - log::trace!("[{:?}] Dropping Semaphore: {} + 1", de, sem.available_permits()); - drop(open_files_permit); if is_dir && !is_hidden { Ok(Some(de.path())) @@ -102,23 +79,16 @@ impl Repository { path: &'a Path, mut config: config::Config, progress: &'a indicatif::ProgressBar, - open_files_sema: std::sync::Arc, + accessor: FileSystemAccessor, ) -> futures::future::BoxFuture<'a, Result>>> { async move { let pkg_file = path.join("pkg.toml"); - if pkg_file.is_file() { + if accessor.is_file(&pkg_file).await? { log::trace!("Reading: {}", pkg_file.display()); - let buf = { - log::trace!("[{}] Acquiring Semaphore: {} - 1", pkg_file.display(), open_files_sema.available_permits()); - let open_files_permit = open_files_sema.acquire().await?; - let r = tokio::fs::read_to_string(&pkg_file) - .await - .with_context(|| format!("Reading {}", pkg_file.display()))?; - log::trace!("[{}] Dropping Semaphore: {} + 1", pkg_file.display(), open_files_sema.available_permits()); - drop(open_files_permit); - r - }; + let buf = accessor.read_to_string(&pkg_file) + .await + .with_context(|| format!("Reading {}", pkg_file.display()))?; // This function has an issue: It loads packages recursively, but if there are // patches set for a package, these patches are set _relative_ to the current @@ -155,16 +125,20 @@ impl Repository { // first of all, we get the patches array. // This is either the patches array from the last recursion or the newly set one, // that doesn't matter here. + log::trace!("Fetching patches before merge"); let patches_before_merge = match config.get_array("patches") { Ok(v) => { + log::trace!("Array found"); v.into_iter() .map(|p| { + log::trace!("Patch path to string: {:?}", p); p.into_str() .map(PathBuf::from) .with_context(|| anyhow!("patches must be strings")) .map_err(Error::from) }) - .collect::>>()? + .collect::>>() + .context("Collecting patches")? }, Err(config::ConfigError::NotFound(_)) => vec![], Err(e) => return Err(e).map_err(Error::from), @@ -179,6 +153,7 @@ impl Repository { let path_relative_to_root = path.strip_prefix(root)?; // get the patches that are in the `config` object after the merge + log::trace!("Fetching patches after merge"); let patches = match config.get_array("patches") { Ok(v) => { trace!("[{}] Patches after merging: {:?}", path.display(), v); @@ -196,30 +171,46 @@ impl Repository { // the root directory of the repository. .map(|patch| patch.into_str().map_err(Error::from)) .map_ok(|patch| path_relative_to_root.join(patch)) - .inspect(|patch| trace!("[{}] Patch relative to root: {:?}", path.display(), patch.as_ref().map(|p| p.display()))) + .inspect(|patch| trace!("[{}] Patch relative to root: {:?}", path.display(), patch.as_ref().map(|p| p.display()))); - // if the patch file exists, use it (as config::Value). + // now, we need to "manually" iter over the iterator here, because we need to + // call FileSystemAccessor::exists for each patch, and the method is async. // - // Otherwise we have an error here, because we're refering to a non-existing file. - .and_then_ok(|patch| if patch.exists() { - trace!("[{}] Path to patch exists: {}", path.display(), patch.display()); - Ok(Some(patch)) - } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) { - // We have a patch already in the array that is named equal to the patch - // we have in the current recursion. - // It seems like this patch was already in the list and we re-found it - // because we loaded a deeper pkg.toml file. - Ok(None) - } else { - trace!("[{}] Path to patch does not exist: {}", path.display(), patch.display()); - Err(anyhow!("Patch does not exist: {}", patch.display())) - }) - .filter_map_ok(|o| o) - .collect::>>()?; + // We collect the patches "manually" here into a preallocated vector of decent + // size + log::trace!("Allocating collection for found patches with size {}", patches.size_hint().0); + let mut collected_patches = Vec::with_capacity(patches.size_hint().0); + log::trace!("Collecting patches..."); + for patch in patches { + match patch { + Ok(patch) => if accessor.exists(&patch).await? { + trace!("[{}] Path to patch exists: {}", path.display(), patch.display()); + + collected_patches.push(patch); + } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) { + // We have a patch already in the array that is named equal to the patch + // we have in the current recursion. + // It seems like this patch was already in the list and we re-found it + // because we loaded a deeper pkg.toml file. + /* do nothing */ + } else { + trace!("[{}] Path to patch does not exist: {}", path.display(), patch.display()); + Err(anyhow!("Patch does not exist: {}", patch.display())) + .context("Collecting patches after merge")? + }, + + Err(e) => Err(e).context("Collecting patches after merge")?, + } + } + log::trace!("Collected patches"); + let patches = collected_patches; // rebind variable // If we found any patches, use them. Otherwise use the array from before the merge // (which already has the correct pathes from the previous recursion). - let patches = if !patches.is_empty() && patches.iter().all(|p| p.exists()) { + // + // We don't need to check whether the patches all exist, because we checked + // that above when collecting them + let patches = if !patches.is_empty() /* && patches.iter().all(|p| accessor.exists(p).await?) */ { patches } else { patches_before_merge @@ -231,11 +222,14 @@ impl Repository { .map(|p| p.display().to_string()) .map(config::Value::from) .collect::>(); - config.set_once("patches", config::Value::from(patches))?; + + config.set_once("patches", config::Value::from(patches)) + .context("Setting patches after merge")?; } - let subdirs = all_subdirs(path, open_files_sema.clone()) - .await? + let subdirs = all_subdirs(path.to_path_buf(), accessor.clone()) + .await + .with_context(|| anyhow!("Fetching subdirs for {}", path.display()))? .map(|subdir| { let config = config.clone(); @@ -244,25 +238,28 @@ impl Repository { .with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?; trace!("[{}] Recursing into {}", path.display(), dir.display()); - load_recursive(root, &dir, config, progress, open_files_sema.clone()) + let res = load_recursive(root, &dir, config, progress, accessor.clone()) .await .with_context(|| format!("Reading package from {}", dir.display())) - .map_err(Error::from) + .map_err(Error::from); + res } }) - .collect::>() + .collect::>() .await .collect::>>>>() .await .into_iter() - .collect::>>>>()? + .collect::>>>>() + .context("Collecting recursively loaded packages")? .into_iter() .flatten() .collect::>>(); + progress.tick(); + if subdirs.is_empty() { - progress.tick(); - if pkg_file.is_file() { + if accessor.is_file(&pkg_file).await? { let package = config.try_into() .with_context(|| format!("Failed to parse {} into package", path.display())) .and_then(|package: Package| { @@ -285,9 +282,9 @@ impl Repository { }.boxed() } - let open_files_sema = std::sync::Arc::new(tokio::sync::Semaphore::new(100)); + let fs_accessor = FileSystemAccessor::new(1); - let inner = load_recursive(path, path, config::Config::default(), progress, open_files_sema) + let inner = load_recursive(path, path, config::Config::default(), progress, fs_accessor) .await .with_context(|| format!("Recursing for {}", path.display()))? .into_iter() @@ -298,7 +295,7 @@ impl Repository { Ok(Repository { inner }) } - pub fn find_by_name<'a>(&'a self, name: &PackageName) -> Vec<&'a Package> { +pub fn find_by_name<'a>(&'a self, name: &PackageName) -> Vec<&'a Package> { trace!("Searching for '{}' in repository", name); self.inner .iter() -- cgit v1.2.3