diff options
author | Andrew Gallant <jamslam@gmail.com> | 2018-08-21 19:47:12 -0400 |
---|---|---|
committer | Andrew Gallant <jamslam@gmail.com> | 2018-08-21 23:05:52 -0400 |
commit | 0eef05142a03cbe393e8e346d401991d45878a81 (patch) | |
tree | 31a7652820751ffed210cac412d68d8054951ef4 /ignore | |
parent | edd6eb4e06c528b668672ac32b45a62939802a83 (diff) |
ripgrep: move minimum version to Rust stable
This also updates some code to make use of our more liberal versioning
requirement, including the use of crossbeam-channel instead of the MsQueue
from the older an unmaintained crossbeam 0.3. This does regrettably add
a sizable number of dependencies, however, compile times seem mostly
unaffected.
Closes #1019
Diffstat (limited to 'ignore')
-rw-r--r-- | ignore/Cargo.toml | 2 | ||||
-rw-r--r-- | ignore/examples/walk.rs | 19 | ||||
-rw-r--r-- | ignore/src/lib.rs | 2 | ||||
-rw-r--r-- | ignore/src/walk.rs | 30 |
4 files changed, 31 insertions, 22 deletions
diff --git a/ignore/Cargo.toml b/ignore/Cargo.toml index 42b043bf..029860f4 100644 --- a/ignore/Cargo.toml +++ b/ignore/Cargo.toml @@ -18,7 +18,7 @@ name = "ignore" bench = false [dependencies] -crossbeam = "0.3" +crossbeam-channel = "0.2" globset = { version = "0.4.0", path = "../globset" } lazy_static = "1" log = "0.4" diff --git a/ignore/examples/walk.rs b/ignore/examples/walk.rs index ad64e015..67432b71 100644 --- a/ignore/examples/walk.rs +++ b/ignore/examples/walk.rs @@ -1,14 +1,12 @@ -extern crate crossbeam; +extern crate crossbeam_channel as channel; extern crate ignore; extern crate walkdir; use std::env; use std::io::{self, Write}; use std::path::Path; -use std::sync::Arc; use std::thread; -use crossbeam::sync::MsQueue; use ignore::WalkBuilder; use walkdir::WalkDir; @@ -16,7 +14,7 @@ fn main() { let mut path = env::args().nth(1).unwrap(); let mut parallel = false; let mut simple = false; - let queue: Arc<MsQueue<Option<DirEntry>>> = Arc::new(MsQueue::new()); + let (tx, rx) = channel::bounded::<DirEntry>(100); if path == "parallel" { path = env::args().nth(2).unwrap(); parallel = true; @@ -25,10 +23,9 @@ fn main() { simple = true; } - let stdout_queue = queue.clone(); let stdout_thread = thread::spawn(move || { let mut stdout = io::BufWriter::new(io::stdout()); - while let Some(dent) = stdout_queue.pop() { + for dent in rx { write_path(&mut stdout, dent.path()); } }); @@ -36,26 +33,26 @@ fn main() { if parallel { let walker = WalkBuilder::new(path).threads(6).build_parallel(); walker.run(|| { - let queue = queue.clone(); + let tx = tx.clone(); Box::new(move |result| { use ignore::WalkState::*; - queue.push(Some(DirEntry::Y(result.unwrap()))); + tx.send(DirEntry::Y(result.unwrap())); Continue }) }); } else if simple { let walker = WalkDir::new(path); for result in walker { - queue.push(Some(DirEntry::X(result.unwrap()))); + tx.send(DirEntry::X(result.unwrap())); } } else { let walker = WalkBuilder::new(path).build(); for result in walker { - queue.push(Some(DirEntry::Y(result.unwrap()))); + tx.send(DirEntry::Y(result.unwrap())); } } - queue.push(None); + drop(tx); stdout_thread.join().unwrap(); } diff --git a/ignore/src/lib.rs b/ignore/src/lib.rs index b97e267a..190794f5 100644 --- a/ignore/src/lib.rs +++ b/ignore/src/lib.rs @@ -46,7 +46,7 @@ See the documentation for `WalkBuilder` for many other options. #![deny(missing_docs)] -extern crate crossbeam; +extern crate crossbeam_channel as channel; extern crate globset; #[macro_use] extern crate lazy_static; diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs index fc36b4e2..09bac172 100644 --- a/ignore/src/walk.rs +++ b/ignore/src/walk.rs @@ -10,7 +10,7 @@ use std::thread; use std::time::Duration; use std::vec; -use crossbeam::sync::MsQueue; +use channel; use same_file::Handle; use walkdir::{self, WalkDir}; @@ -956,7 +956,14 @@ impl WalkParallel { ) where F: FnMut() -> Box<FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static> { let mut f = mkf(); let threads = self.threads(); - let queue = Arc::new(MsQueue::new()); + // TODO: Figure out how to use a bounded channel here. With an + // unbounded channel, the workers can run away and will up memory + // with all of the file paths. But a bounded channel doesn't work since + // our producers are also are consumers, so they end up getting stuck. + // + // We probably need to rethink parallel traversal completely to fix + // this. + let (tx, rx) = channel::unbounded(); let mut any_work = false; // Send the initial set of root paths to the pool of workers. // Note that we only send directories. For files, we send to them the @@ -976,7 +983,7 @@ impl WalkParallel { } } }; - queue.push(Message::Work(Work { + tx.send(Message::Work(Work { dent: dent, ignore: self.ig_root.clone(), })); @@ -994,7 +1001,8 @@ impl WalkParallel { for _ in 0..threads { let worker = Worker { f: mkf(), - queue: queue.clone(), + tx: tx.clone(), + rx: rx.clone(), quit_now: quit_now.clone(), is_waiting: false, is_quitting: false, @@ -1007,6 +1015,8 @@ impl WalkParallel { }; handles.push(thread::spawn(|| worker.run())); } + drop(tx); + drop(rx); for handle in handles { handle.join().unwrap(); } @@ -1099,8 +1109,10 @@ impl Work { struct Worker { /// The caller's callback. f: Box<FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>, - /// A queue of work items. This is multi-producer and multi-consumer. - queue: Arc<MsQueue<Message>>, + /// The push side of our mpmc queue. + tx: channel::Sender<Message>, + /// The receive side of our mpmc queue. + rx: channel::Receiver<Message>, /// Whether all workers should quit at the next opportunity. Note that /// this is distinct from quitting because of exhausting the contents of /// a directory. Instead, this is used when the caller's callback indicates @@ -1235,7 +1247,7 @@ impl Worker { }; if !should_skip_path && !should_skip_filesize { - self.queue.push(Message::Work(Work { + self.tx.send(Message::Work(Work { dent: dent, ignore: ig.clone(), })); @@ -1252,7 +1264,7 @@ impl Worker { if self.is_quit_now() { return None; } - match self.queue.try_pop() { + match self.rx.try_recv() { Some(Message::Work(work)) => { self.waiting(false); self.quitting(false); @@ -1294,7 +1306,7 @@ impl Worker { self.quitting(false); if self.num_waiting() == self.threads { for _ in 0..self.threads { - self.queue.push(Message::Quit); + self.tx.send(Message::Quit); } } else { // You're right to consider this suspicious, but it's |