summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-01-02 14:09:12 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-01-03 10:19:49 +0100
commit51fd196bed12e91ca1720ea383efe97145abdcb6 (patch)
tree605d6101324f5f2e597711aa4f2f89ae7f402b35
parenta6ca22c73036be412dafa04265506eb9135c6fd1 (diff)
Rewrite scan function with option to fetch pathes in parallel
This does not result in parallel store accessing, but only parallel fetching pathes from the filesystem. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--bin/domain/imag-mail/Cargo.toml1
-rw-r--r--bin/domain/imag-mail/src/lib.rs75
-rw-r--r--bin/domain/imag-mail/src/ui.rs7
3 files changed, 62 insertions, 21 deletions
diff --git a/bin/domain/imag-mail/Cargo.toml b/bin/domain/imag-mail/Cargo.toml
index 13a60a50..68e90552 100644
--- a/bin/domain/imag-mail/Cargo.toml
+++ b/bin/domain/imag-mail/Cargo.toml
@@ -25,6 +25,7 @@ failure = "0.1.5"
indoc = "0.3.3"
resiter = "0.4"
walkdir = "2"
+rayon = "1"
libimagrt = { version = "0.10.0", path = "../../../lib/core/libimagrt" }
libimagstore = { version = "0.10.0", path = "../../../lib/core/libimagstore" }
diff --git a/bin/domain/imag-mail/src/lib.rs b/bin/domain/imag-mail/src/lib.rs
index e290d7d8..0240042a 100644
--- a/bin/domain/imag-mail/src/lib.rs
+++ b/bin/domain/imag-mail/src/lib.rs
@@ -41,6 +41,7 @@ extern crate toml_query;
#[macro_use] extern crate indoc;
extern crate resiter;
extern crate walkdir;
+extern crate rayon;
extern crate libimagrt;
extern crate libimagmail;
@@ -58,14 +59,17 @@ use failure::Error;
use toml_query::read::TomlValueReadTypeExt;
use clap::App;
use resiter::AndThen;
+use resiter::Filter;
use resiter::IterInnerOkOrElse;
use resiter::Map;
+use rayon::prelude::*;
use libimagmail::mail::Mail;
use libimagmail::store::MailStore;
use libimagmail::util;
use libimagentryref::reference::{Ref, RefFassade};
use libimagentryref::util::get_ref_config;
+use libimagentryref::reference::Config as RefConfig;
use libimagrt::runtime::Runtime;
use libimagrt::application::ImagApplication;
use libimagutil::info_result::*;
@@ -119,30 +123,59 @@ fn scan(rt: &Runtime) -> Result<()> {
let collection_name = get_ref_collection_name(rt)?;
let refconfig = get_ref_config(rt, "imag-mail")?;
let scmd = rt.cli().subcommand_matches("scan").unwrap();
- let store = rt.store();
- scmd.values_of("path")
+ /// Helper function to get an Iterator for all files from one PathBuf
+ fn walk_pathes(path: PathBuf) -> impl Iterator<Item = Result<PathBuf>> {
+ walkdir::WalkDir::new(path)
+ .follow_links(false)
+ .into_iter()
+ .filter_ok(|entry| entry.file_type().is_file())
+ .map_err(Error::from)
+ .map_ok(|entry| entry.into_path())
+ .inspect(|e| trace!("Processing = {:?}", e))
+ }
+
+ /// Helper function to process an iterator over Result<PathBuf> and create store entries for
+ /// each path in the iterator
+ fn process_iter(i: &mut dyn Iterator<Item = Result<PathBuf>>, rt: &Runtime, collection_name: &str, refconfig: &RefConfig) -> Result<()> {
+ let scmd = rt.cli().subcommand_matches("scan").unwrap();
+ i.and_then_ok(|path| {
+ if scmd.is_present("ignore-existing-ids") {
+ rt.store().retrieve_mail_from_path(path, collection_name, refconfig, true)
+ } else {
+ rt.store().create_mail_from_path(path, collection_name, refconfig)
+ }
+ })
+ .and_then_ok(|e| rt.report_touched(e.get_location()).map_err(Error::from))
+ .collect::<Result<Vec<_>>>()
+ .map(|_| ())
+ }
+
+ let pathes = scmd.values_of("path")
.unwrap() // enforced by clap
.map(PathBuf::from)
- .map(|path| {
- walkdir::WalkDir::new(path)
- .follow_links(false)
- .into_iter()
- .filter_entry(|entry| entry.file_type().is_file())
- .map_ok(|entry| entry.into_path())
- .map_err(Error::from)
- })
- .flatten()
- .and_then_ok(|path| {
- if scmd.is_present("ignore_existing_ids") {
- store.retrieve_mail_from_path(path, &collection_name, &refconfig)
- } else {
- store.create_mail_from_path(path, &collection_name, &refconfig)
- }
- })
- .and_then_ok(|e| rt.report_touched(e.get_location()).map_err(Error::from))
- .collect::<Result<Vec<()>>>()
- .map(|_| ())
+ .collect::<Vec<_>>();
+
+ if scmd.is_present("scan-parallel") {
+ debug!("Fetching pathes in parallel");
+ let mut i = pathes.into_par_iter()
+ .map(|path| walk_pathes(path).collect::<Result<_>>())
+ .collect::<Result<Vec<PathBuf>>>()?
+ .into_iter()
+ .map(Ok);
+
+ debug!("Processing pathes");
+ process_iter(&mut i, rt, &collection_name, &refconfig)
+ } else {
+ debug!("Fetching pathes not in parallel");
+ let mut i = pathes
+ .into_iter()
+ .map(walk_pathes)
+ .flatten();
+
+ debug!("Processing pathes");
+ process_iter(&mut i, rt, &collection_name, &refconfig)
+ }
}
fn import_mail(rt: &Runtime) -> Result<()> {
diff --git a/bin/domain/imag-mail/src/ui.rs b/bin/domain/imag-mail/src/ui.rs
index de0cd0a7..0af3e880 100644
--- a/bin/domain/imag-mail/src/ui.rs
+++ b/bin/domain/imag-mail/src/ui.rs
@@ -57,6 +57,13 @@ pub fn build_ui<'a>(app: App<'a, 'a>) -> App<'a, 'a> {
.required(false)
.help("Ignore errors that might occur when store entries exist already"))
+ .arg(Arg::with_name("scan-parallel")
+ .long("parallel")
+ .takes_value(false)
+ .required(false)
+ .multiple(false)
+ .help("Scan with multiple threads. Might be faster, but might slow down other tasks"))
+
.arg(Arg::with_name("path")
.index(1)
.takes_value(true)