summary | refs | log | tree | commit | diff | stats
path: root/src/repository/fs.rs
diff options
context:
space:
mode:
author Matthias Beyer <matthias.beyer@atos.net> 2021-08-18 10:01:29 +0200
committer Matthias Beyer <matthias.beyer@atos.net> 2021-08-18 10:01:29 +0200
commit a02df5bda0fd490c34b2dd80f2a2e85b59f08cb8 (patch)
tree 0252a3a7d92dc5059b3d491bdd6f5461807e9301 /src/repository/fs.rs
parent 6edb7878fb4a490fbb1fd37a09af1fcf907eeecf (diff)
Introduce proxy object for limiting filesystem access with semaphore [parallel-repository-loading]
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/repository/fs.rs')
-rw-r--r-- src/repository/fs.rs 93
1 files changed, 93 insertions, 0 deletions
diff --git a/src/repository/fs.rs b/src/repository/fs.rs
new file mode 100644
index 0000000..435e2be
--- /dev/null
+++ b/src/repository/fs.rs
@@ -0,0 +1,93 @@
+use std::ops::Deref;
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::Error;
+use anyhow::Result;
+use tokio_stream::Stream;
+
+
+/// Helper type to limit access to the filesystem in massively parallel cases where we would
+/// otherwise run into system error 24 ("too many open files").
+#[derive(Clone)] // a clone copies the `Arc`, so all clones draw from the same semaphore
+pub struct FileSystemAccessor {
+ sem: Arc<tokio::sync::Semaphore>, // permit pool used by `read_dir`, `exists`, `is_file` and `read_to_string`
+}
+
+impl FileSystemAccessor {
+    /// Creates an accessor whose semaphore starts out with `limit` permits.
+    ///
+    /// Clones of the returned value share that semaphore, so the limit applies to all of them
+    /// together.
+    pub fn new(limit: usize) -> Self {
+        FileSystemAccessor { sem: Arc::new(tokio::sync::Semaphore::new(limit)) }
+    }
+
+    /// Returns the number of permits that are currently available.
+    #[inline]
+    pub fn available_permits(&self) -> usize {
+        self.sem.available_permits()
+    }
+
+    /// Opens the directory at `path` and returns a stream over its entries.
+    ///
+    /// One permit is held while the directory is opened and is released again before this
+    /// function returns. The returned stream then acquires one owned permit *per entry* before
+    /// fetching it and stores that permit inside the yielded [`DirEntry`], so the permit stays
+    /// taken until the caller drops the entry.
+    ///
+    /// NOTE(review): because every live `DirEntry` keeps a permit, a caller that holds on to as
+    /// many entries as there are permits will make the next poll of the stream wait until one of
+    /// them is dropped. Consume and drop entries as you go.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the semaphore has been closed or if the directory cannot be opened.
+    /// Failures while acquiring a per-entry permit or while reading an entry are propagated
+    /// with `?` inside the `stream!` block and therefore surface through the stream.
+    pub async fn read_dir(&self, path: impl AsRef<Path>) -> Result<impl Stream<Item = Result<DirEntry>>> {
+        log::trace!("Accessor::read_dir({}): Acquiring Semaphore: {} - 1", path.as_ref().display(), self.available_permits());
+        let perm = self.sem.acquire().await?;
+        // The stream outlives `&self`, so it needs its own handle to the semaphore.
+        let sema = Arc::clone(&self.sem);
+        let mut read_dir = tokio::fs::read_dir(&path).await?;
+
+        let stream = async_stream::stream! {
+            loop {
+                log::trace!("Accessor::read_dir(_): Acquiring Semaphore: {} - 1", sema.available_permits());
+                // `acquire_owned` consumes the `Arc`, hence the clone on every iteration.
+                let permit = sema.clone().acquire_owned().await?;
+                if let Some(entry) = read_dir.next_entry().await? {
+                    let dentry = DirEntry { permit, entry };
+                    yield Ok(dentry) as Result<_>;
+                } else {
+                    // End of directory: the permit acquired above is dropped unused here.
+                    break;
+                }
+            }
+        };
+
+        log::trace!("Accessor::read_dir({}): Dropping Semaphore: {} + 1", path.as_ref().display(), self.available_permits());
+        drop(perm);
+        Ok(stream)
+    }
+
+    /// Reports whether `p` points at an existing filesystem entity.
+    ///
+    /// A permit is held for the duration of the check. The check asks for the path's metadata
+    /// asynchronously and reports `true` only if that succeeds, which is the same rule
+    /// `std::path::Path::exists` applies. Consequently any metadata error (for example missing
+    /// permissions) is reported as `false`, not as an `Err`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error only if the semaphore has been closed.
+    pub async fn exists(&self, p: impl AsRef<Path>) -> Result<bool> {
+        log::trace!("Accessor::exists({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits());
+        let perm = self.sem.acquire().await?;
+        // Use the async metadata call so the executor thread is not blocked on a syscall while
+        // the permit is held.
+        let r = tokio::fs::metadata(&p).await.is_ok();
+        log::trace!("Accessor::exists({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits());
+        drop(perm);
+        Ok(r)
+    }
+
+    /// Reports whether `p` exists and is a regular file.
+    ///
+    /// A permit is held for the duration of the check. Metadata errors are mapped to `false`,
+    /// matching `std::path::Path::is_file`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error only if the semaphore has been closed.
+    pub async fn is_file(&self, p: impl AsRef<Path>) -> Result<bool> {
+        log::trace!("Accessor::is_file({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits());
+        let perm = self.sem.acquire().await?;
+        // Async metadata lookup instead of the blocking `Path::is_file`.
+        let r = tokio::fs::metadata(&p).await.map(|m| m.is_file()).unwrap_or(false);
+        log::trace!("Accessor::is_file({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits());
+        drop(perm);
+        Ok(r)
+    }
+
+    /// Reads the whole file at `p` into a `String` while holding one permit.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the semaphore has been closed or if reading the file fails; the I/O
+    /// error is converted into an `anyhow::Error`.
+    pub async fn read_to_string(&self, p: impl AsRef<Path>) -> Result<String> {
+        log::trace!("Accessor::read_to_string({}): Acquiring Semaphore: {} - 1", p.as_ref().display(), self.available_permits());
+        let perm = self.sem.acquire().await?;
+        let r = tokio::fs::read_to_string(&p).await.map_err(Error::from);
+        log::trace!("Accessor::read_to_string({}): Dropping Semaphore: {} + 1", p.as_ref().display(), self.available_permits());
+        drop(perm);
+        r
+    }
+}
+
+#[derive(Debug)]
+pub struct DirEntry { // a `tokio::fs::DirEntry` bundled with the semaphore permit acquired for it
+ permit: tokio::sync::OwnedSemaphorePermit, // never read; stored so the permit is only dropped together with the entry
+ entry: tokio::fs::DirEntry, // the wrapped entry, reachable through the `Deref` impl
+}
+
+
+/// Gives read access to the wrapped `tokio::fs::DirEntry`, so its methods can be called
+/// directly on a [`DirEntry`].
+impl Deref for DirEntry {
+    type Target = tokio::fs::DirEntry;
+
+    /// Borrows the inner directory entry; the stored permit is left untouched.
+    fn deref(&self) -> &Self::Target {
+        let DirEntry { entry, .. } = self;
+        entry
+    }
+}
+