diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-08-13 17:00:11 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-08-17 12:53:23 +0200 |
commit | ff2e2f28a44ee2016850b030383d35b7f4871389 (patch) | |
tree | 9c5ba91fbdbca381ccbb6ea9a92137df54788a8f | |
parent | 6317ccb3c5759fd7be6007ba7c08251f422cb526 (diff) |
Make repository loading async
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r-- | src/repository/repository.rs | 374 |
1 file changed, 203 insertions, 171 deletions
diff --git a/src/repository/repository.rs b/src/repository/repository.rs index 7a3b4eb..b2827ba 100644 --- a/src/repository/repository.rs +++ b/src/repository/repository.rs @@ -40,189 +40,221 @@ impl From<BTreeMap<(PackageName, PackageVersion), Package>> for Repository { impl Repository { pub async fn load(path: &Path, progress: &indicatif::ProgressBar) -> Result<Self> { - fn all_subdirs(p: &Path) -> Result<Vec<PathBuf>> { - let mut v = Vec::new(); - for de in p.read_dir()? { - let de = de?; - let is_dir = de.file_type()?.is_dir(); - let is_hidden = de - .path() - .file_name() - .and_then(|s| s.to_str()) - .map(|s| s.starts_with('.')) - .unwrap_or(false); - - if is_dir && !is_hidden { - v.push(de.path()); - } - } - - Ok(v) + use futures::StreamExt; + use futures::FutureExt; + + async fn all_subdirs(p: &Path) -> Result<impl futures::stream::Stream<Item = Result<PathBuf>>> { + let read_dir = tokio::fs::read_dir(p).await?; + + let stream = tokio_stream::wrappers::ReadDirStream::new(read_dir) + .then(|entry| async move { + let de = entry?; + let is_dir = de.file_type().await?.is_dir(); + let is_hidden = de + .path() + .file_name() + .and_then(|s| s.to_str()) + .map(|s| s.starts_with('.')) + .unwrap_or(false); + + if is_dir && !is_hidden { + Ok(Some(de.path())) + } else { + Ok(None) + } + }) + .filter_map(|t| async { + match t { + Err(e) => Some(Err(e)), + Ok(None) => None, + Ok(Some(o)) => Some(Ok(o)), + } + }); + + Ok(stream) } - fn load_recursive( - root: &Path, - path: &Path, + fn load_recursive<'a>( + root: &'a Path, + path: &'a Path, mut config: config::Config, - progress: &indicatif::ProgressBar, - ) -> Result<Vec<Result<Package>>> { - let pkg_file = path.join("pkg.toml"); - - if pkg_file.is_file() { - let buf = std::fs::read_to_string(&pkg_file) - .with_context(|| format!("Reading {}", pkg_file.display()))?; - - // This function has an issue: It loads packages recursively, but if there are - // patches set for a package, these patches are set _relative_ to the 
current - // pkg.toml file. - // - // E.G.: - // (1) /pkg.toml - // (2) /a/pkg.toml - // (3) /a/1.0/pkg.toml - // (4) /a/2.0/pkg.toml - // - // If (2) defines a patches = ["./1.patch"], the patch exists at /a/1.patch. - // We can fix that by modifying the Config object after loading (2) and fixing the - // path of the patch to be relative to the repostory root. - // - // But if we continue loading the /a/ subdirectory recursively, this value gets - // overwritten by Config::refresh(), which is called by Config::merge, for example. - // - // The trick is, to get the list of patches _before_ the merge, and later - // re-setting them after the merge, if there were no new patches set (which itself - // is tricky to find out, because the `Config` object _looks like_ there is a new - // array set). - // - // If (3), for example, does set a new patches=[] array, the old array is - // invalidated and no longer relevant for that package! - // Thus, we can savely throw it away and continue with the new array, fixing the - // pathes to be relative to repo root again. - // - // If (4) does _not_ set any patches, we must ensure that the patches from the - // loading of (2) are used and not overwritten by the Config::refresh() call - // happening during Config::merge(). - // - - // first of all, we get the patches array. - // This is either the patches array from the last recursion or the newly set one, - // that doesn't matter here. - let patches_before_merge = match config.get_array("patches") { - Ok(v) => { - v.into_iter() - .map(|p| { - p.into_str() - .map(PathBuf::from) - .with_context(|| anyhow!("patches must be strings")) - .map_err(Error::from) - }) - .collect::<Result<Vec<_>>>()? 
- }, - Err(config::ConfigError::NotFound(_)) => vec![], - Err(e) => return Err(e).map_err(Error::from), - }; - trace!("Patches before merging: {:?}", patches_before_merge); - - // Merge the new pkg.toml file over the already loaded configuration - config - .merge(config::File::from_str(&buf, config::FileFormat::Toml)) - .with_context(|| format!("Loading contents of {}", pkg_file.display()))?; - - let path_relative_to_root = path.strip_prefix(root)?; - - // get the patches that are in the `config` object after the merge - let patches = match config.get_array("patches") { - Ok(v) => { - trace!("Patches after merging: {:?}", v); - v - }, - - // if there was none, we simply use an empty array - // This is cheap because Vec::with_capacity(0) does not allocate - Err(config::ConfigError::NotFound(_)) => Vec::with_capacity(0), - Err(e) => return Err(e).map_err(Error::from), - } - .into_iter() - - // Map all `Value`s to String and then join them on the path that is relative to - // the root directory of the repository. - .map(|patch| patch.into_str().map_err(Error::from)) - .map_ok(|patch| path_relative_to_root.join(patch)) - .inspect(|patch| trace!("Patch relative to root: {:?}", patch.as_ref().map(|p| p.display()))) - - // if the patch file exists, use it (as config::Value). - // - // Otherwise we have an error here, because we're refering to a non-existing file. - .and_then_ok(|patch| if patch.exists() { - trace!("Path to patch exists: {}", patch.display()); - Ok(Some(patch)) - } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) { - // We have a patch already in the array that is named equal to the patch - // we have in the current recursion. - // It seems like this patch was already in the list and we re-found it - // because we loaded a deeper pkg.toml file. 
- Ok(None) - } else { - trace!("Path to patch does not exist: {}", patch.display()); - Err(anyhow!("Patch does not exist: {}", patch.display())) - }) - .filter_map_ok(|o| o) - .collect::<Result<Vec<_>>>()?; - - // If we found any patches, use them. Otherwise use the array from before the merge - // (which already has the correct pathes from the previous recursion). - let patches = if !patches.is_empty() && patches.iter().all(|p| p.exists()) { - patches - } else { - patches_before_merge - }; + progress: &'a indicatif::ProgressBar, + ) -> futures::future::BoxFuture<'a, Result<Vec<Result<Package>>>> { + async move { + let pkg_file = path.join("pkg.toml"); - trace!("Patches after postprocessing merge: {:?}", patches); - let patches = patches + if pkg_file.is_file() { + let buf = tokio::fs::read_to_string(&pkg_file) + .await + .with_context(|| format!("Reading {}", pkg_file.display()))?; + + // This function has an issue: It loads packages recursively, but if there are + // patches set for a package, these patches are set _relative_ to the current + // pkg.toml file. + // + // E.G.: + // (1) /pkg.toml + // (2) /a/pkg.toml + // (3) /a/1.0/pkg.toml + // (4) /a/2.0/pkg.toml + // + // If (2) defines a patches = ["./1.patch"], the patch exists at /a/1.patch. + // We can fix that by modifying the Config object after loading (2) and fixing the + // path of the patch to be relative to the repostory root. + // + // But if we continue loading the /a/ subdirectory recursively, this value gets + // overwritten by Config::refresh(), which is called by Config::merge, for example. + // + // The trick is, to get the list of patches _before_ the merge, and later + // re-setting them after the merge, if there were no new patches set (which itself + // is tricky to find out, because the `Config` object _looks like_ there is a new + // array set). + // + // If (3), for example, does set a new patches=[] array, the old array is + // invalidated and no longer relevant for that package! 
+ // Thus, we can savely throw it away and continue with the new array, fixing the + // pathes to be relative to repo root again. + // + // If (4) does _not_ set any patches, we must ensure that the patches from the + // loading of (2) are used and not overwritten by the Config::refresh() call + // happening during Config::merge(). + // + + // first of all, we get the patches array. + // This is either the patches array from the last recursion or the newly set one, + // that doesn't matter here. + let patches_before_merge = match config.get_array("patches") { + Ok(v) => { + v.into_iter() + .map(|p| { + p.into_str() + .map(PathBuf::from) + .with_context(|| anyhow!("patches must be strings")) + .map_err(Error::from) + }) + .collect::<Result<Vec<_>>>()? + }, + Err(config::ConfigError::NotFound(_)) => vec![], + Err(e) => return Err(e).map_err(Error::from), + }; + trace!("Patches before merging: {:?}", patches_before_merge); + + // Merge the new pkg.toml file over the already loaded configuration + config + .merge(config::File::from_str(&buf, config::FileFormat::Toml)) + .with_context(|| format!("Loading contents of {}", pkg_file.display()))?; + + let path_relative_to_root = path.strip_prefix(root)?; + + // get the patches that are in the `config` object after the merge + let patches = match config.get_array("patches") { + Ok(v) => { + trace!("Patches after merging: {:?}", v); + v + }, + + // if there was none, we simply use an empty array + // This is cheap because Vec::with_capacity(0) does not allocate + Err(config::ConfigError::NotFound(_)) => Vec::with_capacity(0), + Err(e) => return Err(e).map_err(Error::from), + } .into_iter() - .map(|p| p.display().to_string()) - .map(config::Value::from) - .collect::<Vec<_>>(); - config.set_once("patches", config::Value::from(patches))?; - } - let subdirs = all_subdirs(path) - .with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?; + // Map all `Value`s to String and then join them on the path that is relative 
to + // the root directory of the repository. + .map(|patch| patch.into_str().map_err(Error::from)) + .map_ok(|patch| path_relative_to_root.join(patch)) + .inspect(|patch| trace!("Patch relative to root: {:?}", patch.as_ref().map(|p| p.display()))) + + // if the patch file exists, use it (as config::Value). + // + // Otherwise we have an error here, because we're refering to a non-existing file. + .and_then_ok(|patch| if patch.exists() { + trace!("Path to patch exists: {}", patch.display()); + Ok(Some(patch)) + } else if patches_before_merge.iter().any(|pb| pb.file_name() == patch.file_name()) { + // We have a patch already in the array that is named equal to the patch + // we have in the current recursion. + // It seems like this patch was already in the list and we re-found it + // because we loaded a deeper pkg.toml file. + Ok(None) + } else { + trace!("Path to patch does not exist: {}", patch.display()); + Err(anyhow!("Patch does not exist: {}", patch.display())) + }) + .filter_map_ok(|o| o) + .collect::<Result<Vec<_>>>()?; + + // If we found any patches, use them. Otherwise use the array from before the merge + // (which already has the correct pathes from the previous recursion). 
+ let patches = if !patches.is_empty() && patches.iter().all(|p| p.exists()) { + patches + } else { + patches_before_merge + }; + + trace!("Patches after postprocessing merge: {:?}", patches); + let patches = patches + .into_iter() + .map(|p| p.display().to_string()) + .map(config::Value::from) + .collect::<Vec<_>>(); + config.set_once("patches", config::Value::from(patches))?; + } - if subdirs.is_empty() { - progress.tick(); - if pkg_file.is_file() { - let package = config.try_into() - .with_context(|| format!("Failed to parse {} into package", path.display())) - .and_then(|package: Package| { - if package.name().is_empty() { - Err(anyhow!("Package name cannot be empty: {}", pkg_file.display())) - } else if package.version().is_empty() { - Err(anyhow!("Package version cannot be empty: {}", pkg_file.display())) - } else { - Ok(package) - } - }); - - Ok(vec![package]) + let subdirs = all_subdirs(path) + .await? + .map(|subdir| { + let config = config.clone(); + + async { + let dir = subdir + .with_context(|| format!("Finding subdirs for {}", pkg_file.display()))?; + + trace!("Recursing into {}", dir.display()); + load_recursive(root, &dir, config, progress) + .await + .with_context(|| format!("Reading package from {}", pkg_file.display())) + .map_err(Error::from) + } + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .await + .collect::<Vec<Result<Vec<Result<Package>>>>>() + .await + .into_iter() + .collect::<Result<Vec<Vec<Result<Package>>>>>()? 
+ .into_iter() + .flatten() + .collect::<Vec<Result<Package>>>(); + + if subdirs.is_empty() { + progress.tick(); + if pkg_file.is_file() { + let package = config.try_into() + .with_context(|| format!("Failed to parse {} into package", path.display())) + .and_then(|package: Package| { + if package.name().is_empty() { + Err(anyhow!("Package name cannot be empty: {}", pkg_file.display())) + } else if package.version().is_empty() { + Err(anyhow!("Package version cannot be empty: {}", pkg_file.display())) + } else { + Ok(package) + } + }); + + Ok(vec![package]) + } else { + Ok(vec![]) + } } else { - Ok(vec![]) + Ok(subdirs) } - } else { - subdirs.into_iter().fold(Ok(Vec::new()), |vec, dir| { - vec.and_then(|mut v| { - trace!("Recursing into {}", dir.display()); - let mut loaded = load_recursive(root, &dir, config.clone(), progress) - .with_context(|| format!("Reading package from {}", pkg_file.display()))?; - - v.append(&mut loaded); - Ok(v) - }) - }) - } + }.boxed() } let inner = load_recursive(path, path, config::Config::default(), progress) + .await .with_context(|| format!("Recursing for {}", path.display()))? .into_iter() .inspect(|p| trace!("Loading into repository: {:?}", p)) |