summaryrefslogtreecommitdiffstats
path: root/ignore
diff options
context:
space:
mode:
authorAndrew Gallant <jamslam@gmail.com>2018-08-21 19:47:12 -0400
committerAndrew Gallant <jamslam@gmail.com>2018-08-21 23:05:52 -0400
commit0eef05142a03cbe393e8e346d401991d45878a81 (patch)
tree31a7652820751ffed210cac412d68d8054951ef4 /ignore
parentedd6eb4e06c528b668672ac32b45a62939802a83 (diff)
ripgrep: move minimum version to Rust stable
This also updates some code to make use of our more liberal versioning requirement, including the use of crossbeam-channel instead of the MsQueue from the older an unmaintained crossbeam 0.3. This does regrettably add a sizable number of dependencies, however, compile times seem mostly unaffected. Closes #1019
Diffstat (limited to 'ignore')
-rw-r--r--ignore/Cargo.toml2
-rw-r--r--ignore/examples/walk.rs19
-rw-r--r--ignore/src/lib.rs2
-rw-r--r--ignore/src/walk.rs30
4 files changed, 31 insertions, 22 deletions
diff --git a/ignore/Cargo.toml b/ignore/Cargo.toml
index 42b043bf..029860f4 100644
--- a/ignore/Cargo.toml
+++ b/ignore/Cargo.toml
@@ -18,7 +18,7 @@ name = "ignore"
bench = false
[dependencies]
-crossbeam = "0.3"
+crossbeam-channel = "0.2"
globset = { version = "0.4.0", path = "../globset" }
lazy_static = "1"
log = "0.4"
diff --git a/ignore/examples/walk.rs b/ignore/examples/walk.rs
index ad64e015..67432b71 100644
--- a/ignore/examples/walk.rs
+++ b/ignore/examples/walk.rs
@@ -1,14 +1,12 @@
-extern crate crossbeam;
+extern crate crossbeam_channel as channel;
extern crate ignore;
extern crate walkdir;
use std::env;
use std::io::{self, Write};
use std::path::Path;
-use std::sync::Arc;
use std::thread;
-use crossbeam::sync::MsQueue;
use ignore::WalkBuilder;
use walkdir::WalkDir;
@@ -16,7 +14,7 @@ fn main() {
let mut path = env::args().nth(1).unwrap();
let mut parallel = false;
let mut simple = false;
- let queue: Arc<MsQueue<Option<DirEntry>>> = Arc::new(MsQueue::new());
+ let (tx, rx) = channel::bounded::<DirEntry>(100);
if path == "parallel" {
path = env::args().nth(2).unwrap();
parallel = true;
@@ -25,10 +23,9 @@ fn main() {
simple = true;
}
- let stdout_queue = queue.clone();
let stdout_thread = thread::spawn(move || {
let mut stdout = io::BufWriter::new(io::stdout());
- while let Some(dent) = stdout_queue.pop() {
+ for dent in rx {
write_path(&mut stdout, dent.path());
}
});
@@ -36,26 +33,26 @@ fn main() {
if parallel {
let walker = WalkBuilder::new(path).threads(6).build_parallel();
walker.run(|| {
- let queue = queue.clone();
+ let tx = tx.clone();
Box::new(move |result| {
use ignore::WalkState::*;
- queue.push(Some(DirEntry::Y(result.unwrap())));
+ tx.send(DirEntry::Y(result.unwrap()));
Continue
})
});
} else if simple {
let walker = WalkDir::new(path);
for result in walker {
- queue.push(Some(DirEntry::X(result.unwrap())));
+ tx.send(DirEntry::X(result.unwrap()));
}
} else {
let walker = WalkBuilder::new(path).build();
for result in walker {
- queue.push(Some(DirEntry::Y(result.unwrap())));
+ tx.send(DirEntry::Y(result.unwrap()));
}
}
- queue.push(None);
+ drop(tx);
stdout_thread.join().unwrap();
}
diff --git a/ignore/src/lib.rs b/ignore/src/lib.rs
index b97e267a..190794f5 100644
--- a/ignore/src/lib.rs
+++ b/ignore/src/lib.rs
@@ -46,7 +46,7 @@ See the documentation for `WalkBuilder` for many other options.
#![deny(missing_docs)]
-extern crate crossbeam;
+extern crate crossbeam_channel as channel;
extern crate globset;
#[macro_use]
extern crate lazy_static;
diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs
index fc36b4e2..09bac172 100644
--- a/ignore/src/walk.rs
+++ b/ignore/src/walk.rs
@@ -10,7 +10,7 @@ use std::thread;
use std::time::Duration;
use std::vec;
-use crossbeam::sync::MsQueue;
+use channel;
use same_file::Handle;
use walkdir::{self, WalkDir};
@@ -956,7 +956,14 @@ impl WalkParallel {
) where F: FnMut() -> Box<FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static> {
let mut f = mkf();
let threads = self.threads();
- let queue = Arc::new(MsQueue::new());
+ // TODO: Figure out how to use a bounded channel here. With an
+ // unbounded channel, the workers can run away and will up memory
+ // with all of the file paths. But a bounded channel doesn't work since
+ // our producers are also are consumers, so they end up getting stuck.
+ //
+ // We probably need to rethink parallel traversal completely to fix
+ // this.
+ let (tx, rx) = channel::unbounded();
let mut any_work = false;
// Send the initial set of root paths to the pool of workers.
// Note that we only send directories. For files, we send to them the
@@ -976,7 +983,7 @@ impl WalkParallel {
}
}
};
- queue.push(Message::Work(Work {
+ tx.send(Message::Work(Work {
dent: dent,
ignore: self.ig_root.clone(),
}));
@@ -994,7 +1001,8 @@ impl WalkParallel {
for _ in 0..threads {
let worker = Worker {
f: mkf(),
- queue: queue.clone(),
+ tx: tx.clone(),
+ rx: rx.clone(),
quit_now: quit_now.clone(),
is_waiting: false,
is_quitting: false,
@@ -1007,6 +1015,8 @@ impl WalkParallel {
};
handles.push(thread::spawn(|| worker.run()));
}
+ drop(tx);
+ drop(rx);
for handle in handles {
handle.join().unwrap();
}
@@ -1099,8 +1109,10 @@ impl Work {
struct Worker {
/// The caller's callback.
f: Box<FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
- /// A queue of work items. This is multi-producer and multi-consumer.
- queue: Arc<MsQueue<Message>>,
+ /// The push side of our mpmc queue.
+ tx: channel::Sender<Message>,
+ /// The receive side of our mpmc queue.
+ rx: channel::Receiver<Message>,
/// Whether all workers should quit at the next opportunity. Note that
/// this is distinct from quitting because of exhausting the contents of
/// a directory. Instead, this is used when the caller's callback indicates
@@ -1235,7 +1247,7 @@ impl Worker {
};
if !should_skip_path && !should_skip_filesize {
- self.queue.push(Message::Work(Work {
+ self.tx.send(Message::Work(Work {
dent: dent,
ignore: ig.clone(),
}));
@@ -1252,7 +1264,7 @@ impl Worker {
if self.is_quit_now() {
return None;
}
- match self.queue.try_pop() {
+ match self.rx.try_recv() {
Some(Message::Work(work)) => {
self.waiting(false);
self.quitting(false);
@@ -1294,7 +1306,7 @@ impl Worker {
self.quitting(false);
if self.num_waiting() == self.threads {
for _ in 0..self.threads {
- self.queue.push(Message::Quit);
+ self.tx.send(Message::Quit);
}
} else {
// You're right to consider this suspicious, but it's