summaryrefslogtreecommitdiffstats
path: root/src/main.rs
diff options
context:
space:
mode:
author Andrew Gallant <jamslam@gmail.com> 2016-11-05 21:44:15 -0400
committer Andrew Gallant <jamslam@gmail.com> 2016-11-05 21:45:55 -0400
commit b272be25fa7b616689384d2e4bec3e01715e2477 (patch)
tree 8743d1ec86294f81da800c024f094a4846158527 /src/main.rs
parent 1aeae3e22da4e9ffa78ece485369e7a24744b2d4 (diff)
Add parallel recursive directory iterator.
This adds a new walk type in the `ignore` crate, `WalkParallel`, which provides a way to recursively iterate over a set of paths in parallel while respecting various ignore rules. The API is a bit strange, as a closure producing a closure isn't something one often sees, but it does seem to work well. This also allowed us to simplify much of the worker logic in ripgrep proper, where `MultiWorker` is now gone.
Diffstat (limited to 'src/main.rs')
-rw-r--r-- src/main.rs 407
1 files changed, 166 insertions, 241 deletions
diff --git a/src/main.rs b/src/main.rs
index 71c4da32..276ee059 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,4 @@
extern crate ctrlc;
-extern crate deque;
extern crate docopt;
extern crate env_logger;
extern crate grep;
@@ -21,30 +20,20 @@ extern crate term;
extern crate winapi;
use std::error::Error;
-use std::fs::File;
use std::io;
use std::io::Write;
-use std::path::Path;
use std::process;
use std::result;
use std::sync::{Arc, Mutex};
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::mpsc;
use std::thread;
use std::cmp;
-use deque::{Stealer, Stolen};
-use grep::Grep;
-use memmap::{Mmap, Protection};
use term::Terminal;
-use ignore::DirEntry;
use args::Args;
-use out::{ColoredTerminal, Out};
-use pathutil::strip_prefix;
-use printer::Printer;
-use search_stream::InputBuffer;
-#[cfg(windows)]
-use terminal_win::WindowsBuffer;
+use worker::Work;
macro_rules! errored {
($($tt:tt)*) => {
@@ -68,11 +57,12 @@ mod search_buffer;
mod search_stream;
#[cfg(windows)]
mod terminal_win;
+mod worker;
pub type Result<T> = result::Result<T, Box<Error + Send + Sync>>;
fn main() {
- match Args::parse().and_then(run) {
+ match Args::parse().map(Arc::new).and_then(run) {
Ok(count) if count == 0 => process::exit(1),
Ok(_) => process::exit(0),
Err(err) => {
@@ -82,95 +72,108 @@ fn main() {
}
}
-fn run(args: Args) -> Result<u64> {
- let args = Arc::new(args);
-
- let handler_args = args.clone();
- ctrlc::set_handler(move || {
- let stdout = io::stdout();
- let mut stdout = stdout.lock();
+fn run(args: Arc<Args>) -> Result<u64> {
+ {
+ let args = args.clone();
+ ctrlc::set_handler(move || {
+ let stdout = io::stdout();
+ let mut stdout = stdout.lock();
- let _ = handler_args.stdout().reset();
- let _ = stdout.flush();
+ let _ = args.stdout().reset();
+ let _ = stdout.flush();
- process::exit(1);
- });
-
- let paths = args.paths();
+ process::exit(1);
+ });
+ }
let threads = cmp::max(1, args.threads() - 1);
- let isone =
- paths.len() == 1 && (paths[0] == Path::new("-") || paths[0].is_file());
if args.files() {
- return run_files(args.clone());
- }
- if args.type_list() {
- return run_types(args.clone());
- }
- if threads == 1 || isone {
- return run_one_thread(args.clone());
+ if threads == 1 || args.is_one_path() {
+ run_files_one_thread(args)
+ } else {
+ run_files_parallel(args)
+ }
+ } else if args.type_list() {
+ run_types(args)
+ } else if threads == 1 || args.is_one_path() {
+ run_one_thread(args)
+ } else {
+ run_parallel(args)
}
+}
+
+fn run_parallel(args: Arc<Args>) -> Result<u64> {
let out = Arc::new(Mutex::new(args.out()));
let quiet_matched = QuietMatched::new(args.quiet());
- let mut workers = vec![];
+ let paths_searched = Arc::new(AtomicUsize::new(0));
+ let match_count = Arc::new(AtomicUsize::new(0));
+
+ args.walker_parallel().run(|| {
+ let args = args.clone();
+ let quiet_matched = quiet_matched.clone();
+ let paths_searched = paths_searched.clone();
+ let match_count = match_count.clone();
+ let out = out.clone();
+ let mut outbuf = args.outbuf();
+ let mut worker = args.worker();
+ Box::new(move |result| {
+ use ignore::WalkState::*;
- let workq = {
- let (workq, stealer) = deque::new();
- for _ in 0..threads {
- let worker = MultiWorker {
- chan_work: stealer.clone(),
- quiet_matched: quiet_matched.clone(),
- out: out.clone(),
- outbuf: Some(args.outbuf()),
- worker: Worker {
- args: args.clone(),
- inpbuf: args.input_buffer(),
- grep: args.grep(),
- match_count: 0,
- },
+ if quiet_matched.has_match() {
+ return Quit;
+ }
+ let dent = match get_or_log_dir_entry(result) {
+ None => return Continue,
+ Some(dent) => dent,
};
- workers.push(thread::spawn(move || worker.run()));
- }
- workq
- };
- let mut paths_searched: u64 = 0;
- for dent in args.walker() {
- if quiet_matched.has_match() {
- break;
- }
- paths_searched += 1;
- if dent.is_stdin() {
- workq.push(Work::Stdin);
- } else {
- workq.push(Work::File(dent));
- }
- }
- if !paths.is_empty() && paths_searched == 0 {
- eprintln!("No files were searched, which means ripgrep probably \
- applied a filter you didn't expect. \
- Try running again with --debug.");
- }
- for _ in 0..workers.len() {
- workq.push(Work::Quit);
- }
- let mut match_count = 0;
- for worker in workers {
- match_count += worker.join().unwrap();
+ paths_searched.fetch_add(1, Ordering::SeqCst);
+ outbuf.clear();
+ {
+ // This block actually executes the search and prints the
+ // results into outbuf.
+ let mut printer = args.printer(&mut outbuf);
+ let count =
+ if dent.is_stdin() {
+ worker.run(&mut printer, Work::Stdin)
+ } else {
+ worker.run(&mut printer, Work::DirEntry(dent))
+ };
+ match_count.fetch_add(count as usize, Ordering::SeqCst);
+ if quiet_matched.set_match(count > 0) {
+ return Quit;
+ }
+ }
+ if !outbuf.get_ref().is_empty() {
+ // This should be the only mutex in all of ripgrep. Since the
+ // common case is to report a small number of matches relative
+ // to the corpus, this really shouldn't matter much.
+ //
+ // Still, it'd be nice to send this on a channel, but then we'd
+ // need to manage a pool of outbufs, which would complicate the
+ // code.
+ let mut out = out.lock().unwrap();
+ out.write(&outbuf);
+ }
+ Continue
+ })
+ });
+ if !args.paths().is_empty() && paths_searched.load(Ordering::SeqCst) == 0 {
+ eprint_nothing_searched();
}
- Ok(match_count)
+ Ok(match_count.load(Ordering::SeqCst) as u64)
}
fn run_one_thread(args: Arc<Args>) -> Result<u64> {
- let mut worker = Worker {
- args: args.clone(),
- inpbuf: args.input_buffer(),
- grep: args.grep(),
- match_count: 0,
- };
+ let mut worker = args.worker();
let mut term = args.stdout();
let mut paths_searched: u64 = 0;
- for dent in args.walker() {
+ let mut match_count = 0;
+ for result in args.walker() {
+ let dent = match get_or_log_dir_entry(result) {
+ None => continue,
+ Some(dent) => dent,
+ };
let mut printer = args.printer(&mut term);
- if worker.match_count > 0 {
+ if match_count > 0 {
if args.quiet() {
break;
}
@@ -179,32 +182,53 @@ fn run_one_thread(args: Arc<Args>) -> Result<u64> {
}
}
paths_searched += 1;
- if dent.is_stdin() {
- worker.do_work(&mut printer, WorkReady::Stdin);
- } else {
- let file = match File::open(dent.path()) {
- Ok(file) => file,
- Err(err) => {
- eprintln!("{}: {}", dent.path().display(), err);
- continue;
- }
+ match_count +=
+ if dent.is_stdin() {
+ worker.run(&mut printer, Work::Stdin)
+ } else {
+ worker.run(&mut printer, Work::DirEntry(dent))
};
- worker.do_work(&mut printer, WorkReady::DirFile(dent, file));
- }
}
if !args.paths().is_empty() && paths_searched == 0 {
- eprintln!("No files were searched, which means ripgrep probably \
- applied a filter you didn't expect. \
- Try running again with --debug.");
+ eprint_nothing_searched();
}
- Ok(worker.match_count)
+ Ok(match_count)
+}
+
+fn run_files_parallel(args: Arc<Args>) -> Result<u64> {
+ let print_args = args.clone();
+ let (tx, rx) = mpsc::channel::<ignore::DirEntry>();
+ let print_thread = thread::spawn(move || {
+ let term = print_args.stdout();
+ let mut printer = print_args.printer(term);
+ let mut file_count = 0;
+ for dent in rx.iter() {
+ printer.path(dent.path());
+ file_count += 1;
+ }
+ file_count
+ });
+ args.walker_parallel().run(move || {
+ let tx = tx.clone();
+ Box::new(move |result| {
+ if let Some(dent) = get_or_log_dir_entry(result) {
+ tx.send(dent).unwrap();
+ }
+ ignore::WalkState::Continue
+ })
+ });
+ Ok(print_thread.join().unwrap())
}
-fn run_files(args: Arc<Args>) -> Result<u64> {
+fn run_files_one_thread(args: Arc<Args>) -> Result<u64> {
let term = args.stdout();
let mut printer = args.printer(term);
let mut file_count = 0;
- for dent in args.walker() {
+ for result in args.walker() {
+ let dent = match get_or_log_dir_entry(result) {
+ None => continue,
+ Some(dent) => dent,
+ };
printer.path(dent.path());
file_count += 1;
}
@@ -222,163 +246,64 @@ fn run_types(args: Arc<Args>) -> Result<u64> {
Ok(ty_count)
}
-enum Work {
- Stdin,
- File(DirEntry),
- Quit,
-}
-
-enum WorkReady {
- Stdin,
- DirFile(DirEntry, File),
-}
-
-struct MultiWorker {
- chan_work: Stealer<Work>,
- quiet_matched: QuietMatched,
- out: Arc<Mutex<Out>>,
- #[cfg(not(windows))]
- outbuf: Option<ColoredTerminal<term::TerminfoTerminal<Vec<u8>>>>,
- #[cfg(windows)]
- outbuf: Option<ColoredTerminal<WindowsBuffer>>,
- worker: Worker,
-}
-
-struct Worker {
- args: Arc<Args>,
- inpbuf: InputBuffer,
- grep: Grep,
- match_count: u64,
-}
-
-impl MultiWorker {
- fn run(mut self) -> u64 {
- loop {
- if self.quiet_matched.has_match() {
- break;
- }
- let work = match self.chan_work.steal() {
- Stolen::Empty | Stolen::Abort => continue,
- Stolen::Data(Work::Quit) => break,
- Stolen::Data(Work::Stdin) => WorkReady::Stdin,
- Stolen::Data(Work::File(ent)) => {
- match File::open(ent.path()) {
- Ok(file) => WorkReady::DirFile(ent, file),
- Err(err) => {
- eprintln!("{}: {}", ent.path().display(), err);
- continue;
- }
- }
- }
- };
- let mut outbuf = self.outbuf.take().unwrap();
- outbuf.clear();
- let mut printer = self.worker.args.printer(outbuf);
- self.worker.do_work(&mut printer, work);
- if self.quiet_matched.set_match(self.worker.match_count > 0) {
- break;
- }
- let outbuf = printer.into_inner();
- if !outbuf.get_ref().is_empty() {
- let mut out = self.out.lock().unwrap();
- out.write(&outbuf);
- }
- self.outbuf = Some(outbuf);
+fn get_or_log_dir_entry(
+ result: result::Result<ignore::DirEntry, ignore::Error>,
+) -> Option<ignore::DirEntry> {
+ match result {
+ Err(err) => {
+ eprintln!("{}", err);
+ None
}
- self.worker.match_count
- }
-}
-
-impl Worker {
- fn do_work<W: Terminal + Send>(
- &mut self,
- printer: &mut Printer<W>,
- work: WorkReady,
- ) {
- let result = match work {
- WorkReady::Stdin => {
- let stdin = io::stdin();
- let stdin = stdin.lock();
- self.search(printer, &Path::new("<stdin>"), stdin)
- }
- WorkReady::DirFile(ent, file) => {
- let mut path = ent.path();
- if let Some(p) = strip_prefix("./", path) {
- path = p;
- }
- if self.args.mmap() {
- self.search_mmap(printer, path, &file)
- } else {
- self.search(printer, path, file)
- }
- }
- };
- match result {
- Ok(count) => {
- self.match_count += count;
- }
- Err(err) => {
+ Ok(dent) => {
+ if let Some(err) = dent.error() {
eprintln!("{}", err);
}
+ if !dent.file_type().map_or(true, |x| x.is_file()) {
+ None
+ } else {
+ Some(dent)
+ }
}
}
+}
- fn search<R: io::Read, W: Terminal + Send>(
- &mut self,
- printer: &mut Printer<W>,
- path: &Path,
- rdr: R,
- ) -> Result<u64> {
- self.args.searcher(
- &mut self.inpbuf,
- printer,
- &self.grep,
- path,
- rdr,
- ).run().map_err(From::from)
- }
-
- fn search_mmap<W: Terminal + Send>(
- &mut self,
- printer: &mut Printer<W>,
- path: &Path,
- file: &File,
- ) -> Result<u64> {
- if try!(file.metadata()).len() == 0 {
- // Opening a memory map with an empty file results in an error.
- // However, this may not actually be an empty file! For example,
- // /proc/cpuinfo reports itself as an empty file, but it can
- // produce data when it's read from. Therefore, we fall back to
- // regular read calls.
- return self.search(printer, path, file);
- }
- let mmap = try!(Mmap::open(file, Protection::Read));
- Ok(self.args.searcher_buffer(
- printer,
- &self.grep,
- path,
- unsafe { mmap.as_slice() },
- ).run())
- }
+fn eprint_nothing_searched() {
+ eprintln!("No files were searched, which means ripgrep probably \
+ applied a filter you didn't expect. \
+ Try running again with --debug.");
}
+/// A simple thread safe abstraction for determining whether a search should
+/// stop if the user has requested quiet mode.
#[derive(Clone, Debug)]
-struct QuietMatched(Arc<Option<AtomicBool>>);
+pub struct QuietMatched(Arc<Option<AtomicBool>>);
impl QuietMatched {
- fn new(quiet: bool) -> QuietMatched {
+ /// Create a new QuietMatched value.
+ ///
+ /// If quiet is true, then set_match and has_match will reflect whether
+ /// a search should quit or not because it found a match.
+ ///
+ /// If quiet is false, then set_match is always a no-op and has_match
+ /// always returns false.
+ pub fn new(quiet: bool) -> QuietMatched {
let atomic = if quiet { Some(AtomicBool::new(false)) } else { None };
QuietMatched(Arc::new(atomic))
}
- fn has_match(&self) -> bool {
+ /// Returns true if and only if quiet mode is enabled and a match has
+ /// occurred.
+ pub fn has_match(&self) -> bool {
match *self.0 {
None => false,
Some(ref matched) => matched.load(Ordering::SeqCst),
}
}
- fn set_match(&self, yes: bool) -> bool {
+ /// Sets whether a match has occurred or not.
+ ///
+ /// If quiet mode is disabled, then this is a no-op.
+ pub fn set_match(&self, yes: bool) -> bool {
match *self.0 {
None => false,
Some(_) if !yes => false,