From 51fd196bed12e91ca1720ea383efe97145abdcb6 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 2 Jan 2020 14:09:12 +0100 Subject: Rewrite scan function with option to fetch paths in parallel This does not result in parallel store access, but only in parallel fetching of paths from the filesystem. Signed-off-by: Matthias Beyer --- bin/domain/imag-mail/Cargo.toml | 1 + bin/domain/imag-mail/src/lib.rs | 75 +++++++++++++++++++++++++++++------------ bin/domain/imag-mail/src/ui.rs | 7 ++++ 3 files changed, 62 insertions(+), 21 deletions(-) diff --git a/bin/domain/imag-mail/Cargo.toml b/bin/domain/imag-mail/Cargo.toml index 13a60a50..68e90552 100644 --- a/bin/domain/imag-mail/Cargo.toml +++ b/bin/domain/imag-mail/Cargo.toml @@ -25,6 +25,7 @@ failure = "0.1.5" indoc = "0.3.3" resiter = "0.4" walkdir = "2" +rayon = "1" libimagrt = { version = "0.10.0", path = "../../../lib/core/libimagrt" } libimagstore = { version = "0.10.0", path = "../../../lib/core/libimagstore" } diff --git a/bin/domain/imag-mail/src/lib.rs b/bin/domain/imag-mail/src/lib.rs index e290d7d8..0240042a 100644 --- a/bin/domain/imag-mail/src/lib.rs +++ b/bin/domain/imag-mail/src/lib.rs @@ -41,6 +41,7 @@ extern crate toml_query; #[macro_use] extern crate indoc; extern crate resiter; extern crate walkdir; +extern crate rayon; extern crate libimagrt; extern crate libimagmail; @@ -58,14 +59,17 @@ use failure::Error; use toml_query::read::TomlValueReadTypeExt; use clap::App; use resiter::AndThen; +use resiter::Filter; use resiter::IterInnerOkOrElse; use resiter::Map; +use rayon::prelude::*; use libimagmail::mail::Mail; use libimagmail::store::MailStore; use libimagmail::util; use libimagentryref::reference::{Ref, RefFassade}; use libimagentryref::util::get_ref_config; +use libimagentryref::reference::Config as RefConfig; use libimagrt::runtime::Runtime; use libimagrt::application::ImagApplication; use libimagutil::info_result::*; @@ -119,30 +123,59 @@ fn scan(rt: &Runtime) -> Result<()> { let
collection_name = get_ref_collection_name(rt)?; let refconfig = get_ref_config(rt, "imag-mail")?; let scmd = rt.cli().subcommand_matches("scan").unwrap(); - let store = rt.store(); - scmd.values_of("path") + /// Helper function to get an Iterator for all files from one PathBuf + fn walk_pathes(path: PathBuf) -> impl Iterator> { + walkdir::WalkDir::new(path) + .follow_links(false) + .into_iter() + .filter_ok(|entry| entry.file_type().is_file()) + .map_err(Error::from) + .map_ok(|entry| entry.into_path()) + .inspect(|e| trace!("Processing = {:?}", e)) + } + + /// Helper function to process an iterator over Result and create store entries for + /// each path in the iterator + fn process_iter(i: &mut dyn Iterator>, rt: &Runtime, collection_name: &str, refconfig: &RefConfig) -> Result<()> { + let scmd = rt.cli().subcommand_matches("scan").unwrap(); + i.and_then_ok(|path| { + if scmd.is_present("ignore-existing-ids") { + rt.store().retrieve_mail_from_path(path, collection_name, refconfig, true) + } else { + rt.store().create_mail_from_path(path, collection_name, refconfig) + } + }) + .and_then_ok(|e| rt.report_touched(e.get_location()).map_err(Error::from)) + .collect::>>() + .map(|_| ()) + } + + let pathes = scmd.values_of("path") .unwrap() // enforced by clap .map(PathBuf::from) - .map(|path| { - walkdir::WalkDir::new(path) - .follow_links(false) - .into_iter() - .filter_entry(|entry| entry.file_type().is_file()) - .map_ok(|entry| entry.into_path()) - .map_err(Error::from) - }) - .flatten() - .and_then_ok(|path| { - if scmd.is_present("ignore_existing_ids") { - store.retrieve_mail_from_path(path, &collection_name, &refconfig) - } else { - store.create_mail_from_path(path, &collection_name, &refconfig) - } - }) - .and_then_ok(|e| rt.report_touched(e.get_location()).map_err(Error::from)) - .collect::>>() - .map(|_| ()) + .collect::>(); + + if scmd.is_present("scan-parallel") { + debug!("Fetching pathes in parallel"); + let mut i = pathes.into_par_iter() + .map(|path| 
walk_pathes(path).collect::>()) + .collect::>>()? + .into_iter() + .map(Ok); + + debug!("Processing pathes"); + process_iter(&mut i, rt, &collection_name, &refconfig) + } else { + debug!("Fetching pathes not in parallel"); + let mut i = pathes + .into_iter() + .map(walk_pathes) + .flatten(); + + debug!("Processing pathes"); + process_iter(&mut i, rt, &collection_name, &refconfig) + } } fn import_mail(rt: &Runtime) -> Result<()> { diff --git a/bin/domain/imag-mail/src/ui.rs b/bin/domain/imag-mail/src/ui.rs index de0cd0a7..0af3e880 100644 --- a/bin/domain/imag-mail/src/ui.rs +++ b/bin/domain/imag-mail/src/ui.rs @@ -57,6 +57,13 @@ pub fn build_ui<'a>(app: App<'a, 'a>) -> App<'a, 'a> { .required(false) .help("Ignore errors that might occur when store entries exist already")) + .arg(Arg::with_name("scan-parallel") + .long("parallel") + .takes_value(false) + .required(false) + .multiple(false) + .help("Scan with multiple threads. Might be faster, but might slow down other tasks")) + .arg(Arg::with_name("path") .index(1) .takes_value(true) -- cgit v1.2.3