diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-08-18 10:01:29 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-08-18 10:01:29 +0200 |
commit | a02df5bda0fd490c34b2dd80f2a2e85b59f08cb8 (patch) | |
tree | 0252a3a7d92dc5059b3d491bdd6f5461807e9301 /src/repository/fs.rs | |
parent | 6edb7878fb4a490fbb1fd37a09af1fcf907eeecf (diff) |
Introduce proxy object for limiting filesystem access with semaphore [parallel-repository-loading]
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/repository/fs.rs')
-rw-r--r-- | src/repository/fs.rs | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/src/repository/fs.rs b/src/repository/fs.rs new file mode 100644 index 0000000..435e2be --- /dev/null +++ b/src/repository/fs.rs @@ -0,0 +1,93 @@ +use std::ops::Deref; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Error; +use anyhow::Result; +use tokio_stream::Stream; + + +/// Helper type to limit access to the filesystem in massive-parallel cases where we would run into +/// system error 24 ("too many open files"). +#[derive(Clone)] +pub struct FileSystemAccessor { + sem: Arc<tokio::sync::Semaphore>, +} + +impl FileSystemAccessor { + pub fn new(limit: usize) -> Self { + FileSystemAccessor { sem: Arc::new(tokio::sync::Semaphore::new(limit)) } + } + + #[inline] + pub fn available_permits(&self) -> usize { + self.sem.available_permits() + } + + pub async fn read_dir(&self, path: impl AsRef<Path>) -> Result<impl Stream<Item = Result<DirEntry>>> { + log::trace!("Accessor::read_dir({}): Acquiring Semaphore: {} - 1", path.as_ref().display(), self.available_permits()); + let perm = self.sem.acquire().await?; + let sema = self.sem.clone(); + let mut read_dir = tokio::fs::read_dir(&path).await?; + + let stream = async_stream::stream! { + loop { + log::trace!("Accessor::read_dir(_): Acquiring Semaphore: {} - 1", sema.available_permits()); + let permit = sema.clone().acquire_owned().await?; + if let Some(entry) = read_dir.next_entry().await? 
{ + let dentry = DirEntry { permit, entry }; + yield Ok(dentry) as Result<_>; + } else { + break; + } + } + }; + + log::trace!("Accessor::read_dir({}): Dropping Semaphore: {} + 1", path.as_ref().display(), self.available_permits()); + drop(perm); + Ok(stream) + } + + pub async fn exists(&self, p: impl AsRef<Path>) -> Result<bool> { + log::trace!("Accessor::exists({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits()); + let perm = self.sem.acquire().await?; + let r = p.as_ref().exists(); + log::trace!("Accessor::exists({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits()); + drop(perm); + Ok(r) + } + + pub async fn is_file(&self, p: impl AsRef<Path>) -> Result<bool> { + log::trace!("Accessor::is_file({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits()); + let perm = self.sem.acquire().await?; + let r = p.as_ref().is_file(); + log::trace!("Accessor::is_file({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits()); + drop(perm); + Ok(r) + } + + pub async fn read_to_string(&self, p: impl AsRef<Path>) -> Result<String> { + log::trace!("Accessor::read_to_string({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits()); + let perm = self.sem.acquire().await?; + let r = tokio::fs::read_to_string(&p).await.map_err(Error::from); + log::trace!("Accessor::read_to_string({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits()); + drop(perm); + r + } +} + +#[derive(Debug)] +pub struct DirEntry { + permit: tokio::sync::OwnedSemaphorePermit, + entry: tokio::fs::DirEntry, +} + + +impl Deref for DirEntry { + type Target = tokio::fs::DirEntry; + + fn deref(&self) -> &Self::Target { + &self.entry + } +} + |