summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-08-13 17:00:11 +0200
committerMatthias Beyer <matthias.beyer@atos.net>2021-08-17 12:53:23 +0200
commitff2e2f28a44ee2016850b030383d35b7f4871389 (patch)
tree9c5ba91fbdbca381ccbb6ea9a92137df54788a8f
parent6317ccb3c5759fd7be6007ba7c08251f422cb526 (diff)
Make repository loading async
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--src/repository/repository.rs374
1 files changed, 203 insertions, 171 deletions
diff --git a/src/repository/repository.rs b/src/repository/repository.rs
index 7a3b4eb..b2827ba 100644
--- a/src/repository/repository.rs
+++ b/src/repository/repository.rs
@@ -40,189 +40,221 @@ impl From<BTreeMap<(PackageName, PackageVersion), Package>> for Repository {
impl Repository {
pub async fn load(path: &Path, progress: &indicatif::ProgressBar) -> Result<Self> {
- fn all_subdirs(p: &Path) -> Result<Vec<PathBuf>> {
- let mut v = Vec::new();
- for de in p.read_dir()? {
- let de = de?;
- let is_dir = de.file_type()?.is_dir();
- let is_hidden = de
- .path()
- .file_name()
- .and_then(|s| s.to_str())
- .map(|s| s.starts_with('.'))
- .unwrap_or(false);
-
- if is_dir && !is_hidden {
- v.push(de.path());
- }
- }
-
- Ok(v)
+ use futures::StreamExt;
+ use futures::FutureExt;
+
+ async fn all_subdirs(p: &Path) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> {
+ let read_dir = tokio::fs::read_dir(p).await?;
+
+ let stream = tokio_stream::wrappers::ReadDirStream::new(read_dir)
+ .then(|entry| async move {
+ let de = entry?;
+ let is_dir = de.file_type().await?.is_dir();
+ let is_hidden = de
+ .path()
+ .file_name()
+ .and_then(|s| s.to_str())
+ .map(|s| s.starts_with('.'))
+ .unwrap_or(false);
+
+ if is_dir && !is_hidden {
+ Ok(Some(de.path()))
+ } else {
+ Ok(None)
+ }
+ })
+ .filter_map(|t| async {
+ match t {
+ Err(e) => Some(Err(e)),
+ Ok(None) => None,
+ Ok(Some(o)) => Some(Ok(o)),
+ }
+ });
+
+ Ok(stream)
}
- fn load_recursive(
- root: &Path,
- path: &Path,
+ fn load_recursive<'a>(
+ root: &'a Path,
+ path: &'a Path,
mut config: config::Config,
- progress: &indicatif::ProgressBar,
- ) -> Result<Vec<Result<Package>>> {
- let pkg_file = path.join("pkg.toml");
-
- if pkg_file.is_file() {
- let buf = std::fs::read_to_string(&pkg_file)
- .with_context(|| format!("Reading {}", pkg_file.display()))?;
-
- // This function has an issue: It loads packages recursively, but if there are
- // patches set for a package, these patches are set _relative_ to the current
- // pkg.toml file.
- //
- // E.G.:
- // (1) /pkg.toml
- // (2) /a/pkg.toml
- // (3) /a/1.0/pkg.toml
- // (4) /a/2.0/pkg.toml
- //
- // If (2) defines a patches = ["./1.patch"], the patch exists at /a/1.patch.
- // We can fix that by modifying the Config object after loading (2) and fixing the
- // path of the patch to be relative to the repository root.
- //
- // But if we continue loading the /a/ subdirectory recursively, this value gets
- // overwritten by Config::refresh(), which is called by Config::merge, for example.
- //
- // The trick is, to get the list of patches _before_ the merge, and later
- // re-setting them after the merge, if there were no new patches set (which itself
- // is tricky to find out, because the `Config` object _looks like_ there is a new
- // array set).
- //
- // If (3), for example, does set a new patches=[] array, the old array is
- // invalidated and no longer relevant for that package!
- // Thus, we can safely throw it away and continue with the new array, fixing the
- // paths to be relative to repo root again.
- //
- // If (4) does _not_ set any patches, we must ensure that the patches from the
- // loading of (2) are used and not overwritten by the Config::refresh() call
- // happening during Config::merge().
- //
-
- // first of all, we get the patches array.
- // This is either the patches array from the last recursion or the newly set one,
- // that doesn't matter here.
- let patches_before_merge = match config.get_array("patches") {
- Ok(v) => {
- v.into_iter()
- .map(|p| {
- p.into_str()
- .map(PathBuf::from)
- .with_context(|| anyhow!("patches must be strings"))
- .map_err(Error::from)
- })
- .collect::<Result<Vec<_>>>()?
- },
- Err(config::ConfigError::NotFound(_)) => vec![],
- Err(e) => return Err(e).map_err(Error::from),
- };
- trace!("Patches before merging: {:?}", patches_before_merge);
-
- // Merge the new pkg.toml file over the already loaded configuration
- config
- .merge(config::File::from_str(&buf, config::FileFormat::Toml))
- .with_context(|| format!("Loading contents of {}", pkg_file.display()))?;
-
- let path_relative_to_root = path.strip_prefix(root)?;
-
- // get the patches that are in the `config` object after the merge
- let patches = match config.get_array("patches") {
- Ok(v) => {
- trace!("Patches after merging: {:?}", v);
- v
- },
-
- // if there was none, we simply use an empty array
- // This is cheap because Vec::with_capacity(0) does not allocate
- Err(config::ConfigError::NotFound(_)) => Vec::with_capacity(0),
- Err(e) => return Err(e).map_err(Error::from),
- }
- .into_iter()
-
- // Map all `Value`s to String and then join them on the path that is relative to
- // the root directory of the repository.
- .map(|patch| patch.into_str().map_err(Error::from))
- .map_ok(|patch| path_relative_to_root.join(patch))
- .inspect(|patch| trace!("Patch relative to root: {:?}", patch.as_ref().map(|p| p.display())))
-
- // if the patch file exists, use it (as config::Value).
- //
- // Otherwise we have an error here, because we're referring to a non-existing file.
- .and_then_ok(|patch| if patch.exists() {
- trace!("Path to patch exists: {}", patch.display());
- Ok(Some(patch))
- } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) {
- // We have a patch already in the array that is named equal to the patch
- // we have in the current recursion.
- // It seems like this patch was already in the list and we re-found it
- // because we loaded a deeper pkg.toml file.
- Ok(None)
- } else {
- trace!("Path to patch does not exist: {}", patch.display());
- Err(anyhow!("Patch does not exist: {}", patch.display()))
- })
- .filter_map_ok(|o| o)
- .collect::<Result<Vec<_>>>()?;
-
- // If we found any patches, use them. Otherwise use the array from before the merge
- // (which already has the correct paths from the previous recursion).
- let patches = if !patches.is_empty() && patches.iter().all(|p| p.exists()) {
- patches
- } else {
- patches_before_merge
- };
+ progress: &'a indicatif::ProgressBar,
+ ) -> futures::future::BoxFuture<'a, Result<Vec<Result<Package>>>> {
+ async move {
+ let pkg_file = path.join("pkg.toml");
- trace!("Patches after postprocessing merge: {:?}", patches);
- let patches = patches
+ if pkg_file.is_file() {
+ let buf = tokio::fs::read_to_string(&pkg_file)
+ .await
+ .with_context(|| format!("Reading {}", pkg_file.display()))?;
+
+ // This function has an issue: It loads packages recursively, but if there are
+ // patches set for a package, these patches are set _relative_ to the current
+ // pkg.toml file.
+ //
+ // E.G.:
+ // (1) /pkg.toml
+ // (2) /a/pkg.toml
+ // (3) /a/1.0/pkg.toml
+ // (4) /a/2.0/pkg.toml
+ //
+ // If (2) defines a patches = ["./1.patch"], the patch exists at /a/1.patch.
+ // We can fix that by modifying the Config object after loading (2) and fixing the
+ // path of the patch to be relative to the repository root.
+ //
+ // But if we continue loading the /a/ subdirectory recursively, this value gets
+ // overwritten by Config::refresh(), which is called by Config::merge, for example.
+ //
+ // The trick is, to get the list of patches _before_ the merge, and later
+ // re-setting them after the merge, if there were no new patches set (which itself
+ // is tricky to find out, because the `Config` object _looks like_ there is a new
+ // array set).
+ //
+ // If (3), for example, does set a new patches=[] array, the old array is
+ // invalidated and no longer relevant for that package!
+ // Thus, we can safely throw it away and continue with the new array, fixing the
+ // paths to be relative to repo root again.
+ //
+ // If (4) does _not_ set any patches, we must ensure that the patches from the
+ // loading of (2) are used and not overwritten by the Config::refresh() call
+ // happening during Config::merge().
+ //
+
+ // first of all, we get the patches array.
+ // This is either the patches array from the last recursion or the newly set one,
+ // that doesn't matter here.
+ let patches_before_merge = match config.get_array("patches") {
+ Ok(v) => {
+ v.into_iter()
+ .map(|p| {
+ p.into_str()
+ .map(PathBuf::from)
+ .with_context(|| anyhow!("patches must be strings"))
+ .map_err(Error::from)
+ })
+ .collect::<Result<Vec<_>>>()?
+ },
+ Err(config::ConfigError::NotFound(_)) => vec![],
+ Err(e) => return Err(e).map_err(Error::from),
+ };
+ trace!("Patches before merging: {:?}", patches_before_merge);
+
+ // Merge the new pkg.toml file over the already loaded configuration
+ config
+ .merge(config::File::from_str(&buf, config::FileFormat::Toml))
+ .with_context(|| format!("Loading contents of {}", pkg_file.display()))?;
+
+ let path_relative_to_root = path.strip_prefix(root)?;
+
+ // get the patches that are in the `config` object after the merge
+ let patches = match config.get_array("patches") {
+ Ok(v) => {
+ trace!("Patches after merging: {:?}", v);
+ v
+ },
+
+ // if there was none, we simply use an empty array
+ // This is cheap because Vec::with_capacity(0) does not allocate
+ Err(config::ConfigError::NotFound(_)) => Vec::with_capacity(0),
+ Err(e) => return Err(e).map_err(Error::from),
+ }
.into_iter()
- .map(|p| p.display().to_string())
- .map(config::Value::from)
- .collect::<Vec<_>>();
- config.set_once("patches", config::Value::from(patches))?;
- }
- let subdirs = all_subdirs(path)
- .with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?;
+ // Map all `Value`s to String and then join them on the path that is relative to
+ // the root directory of the repository.
+ .map(|patch| patch.into_str().map_err(Error::from))
+ .map_ok(|patch| path_relative_to_root.join(patch))
+ .inspect(|patch| trace!("Patch relative to root: {:?}", patch.as_ref().map(|p| p.display())))
+
+ // if the patch file exists, use it (as config::Value).
+ //
+ // Otherwise we have an error here, because we're referring to a non-existing file.
+ .and_then_ok(|patch| if patch.exists() {
+ trace!("Path to patch exists: {}", patch.display());
+ Ok(Some(patch))
+ } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) {
+ // We have a patch already in the array that is named equal to the patch
+ // we have in the current recursion.
+ // It seems like this patch was already in the list and we re-found it
+ // because we loaded a deeper pkg.toml file.
+ Ok(None)
+ } else {
+ trace!("Path to patch does not exist: {}", patch.display());
+ Err(anyhow!("Patch does not exist: {}", patch.display()))
+ })
+ .filter_map_ok(|o| o)
+ .collect::<Result<Vec<_>>>()?;
+
+ // If we found any patches, use them. Otherwise use the array from before the merge
+ // (which already has the correct paths from the previous recursion).
+ let patches = if !patches.is_empty() && patches.iter().all(|p| p.exists()) {
+ patches
+ } else {
+ patches_before_merge
+ };
+
+ trace!("Patches after postprocessing merge: {:?}", patches);
+ let patches = patches
+ .into_iter()
+ .map(|p| p.display().to_string())
+ .map(config::Value::from)
+ .collect::<Vec<_>>();
+ config.set_once("patches", config::Value::from(patches))?;
+ }
- if subdirs.is_empty() {
- progress.tick();
- if pkg_file.is_file() {
- let package = config.try_into()
- .with_context(|| format!("Failed to parse {} into package", path.display()))
- .and_then(|package: Package| {
- if package.name().is_empty() {
- Err(anyhow!("Package name cannot be empty: {}", pkg_file.display()))
- } else if package.version().is_empty() {
- Err(anyhow!("Package version cannot be empty: {}", pkg_file.display()))
- } else {
- Ok(package)
- }
- });
-
- Ok(vec![package])
+ let subdirs = all_subdirs(path)
+ .await?
+ .map(|subdir| {
+ let config = config.clone();
+
+ async {
+ let dir = subdir
+ .with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?;
+
+ trace!("Recursing into {}", dir.display());
+ load_recursive(root, &dir, config, progress)
+ .await
+ .with_context(|| format!("Reading package from {}", pkg_file.display()))
+ .map_err(Error::from)
+ }
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .await
+ .collect::<Vec<Result<Vec<Result<Package>>>>>()
+ .await
+ .into_iter()
+ .collect::<Result<Vec<Vec<Result<Package>>>>>()?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<Result<Package>>>();
+
+ if subdirs.is_empty() {
+ progress.tick();
+ if pkg_file.is_file() {
+ let package = config.try_into()
+ .with_context(|| format!("Failed to parse {} into package", path.display()))
+ .and_then(|package: Package| {
+ if package.name().is_empty() {
+ Err(anyhow!("Package name cannot be empty: {}", pkg_file.display()))
+ } else if package.version().is_empty() {
+ Err(anyhow!("Package version cannot be empty: {}", pkg_file.display()))
+ } else {
+ Ok(package)
+ }
+ });
+
+ Ok(vec![package])
+ } else {
+ Ok(vec![])
+ }
} else {
- Ok(vec![])
+ Ok(subdirs)
}
- } else {
- subdirs.into_iter().fold(Ok(Vec::new()), |vec, dir| {
- vec.and_then(|mut v| {
- trace!("Recursing into {}", dir.display());
- let mut loaded = load_recursive(root, &dir, config.clone(), progress)
- .with_context(|| format!("Reading package from {}", pkg_file.display()))?;
-
- v.append(&mut loaded);
- Ok(v)
- })
- })
- }
+ }.boxed()
}
let inner = load_recursive(path, path, config::Config::default(), progress)
+ .await
.with_context(|| format!("Recursing for {}", path.display()))?
.into_iter()
.inspect(|p| trace!("Loading into repository: {:?}", p))