summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorAndrew Gallant <jamslam@gmail.com>2020-04-17 19:58:47 -0400
committerAndrew Gallant <jamslam@gmail.com>2020-04-18 11:33:03 -0400
commit139f186e5721e74542604fad8a3c5907c4911faa (patch)
treede9b6b453a4d3782a1c08b1325ac88b416ae85cb /crates
parentafb325f733d74d1520bbe9f251f2969e45c73431 (diff)
crates/ignore: switch to depth first traversal
This replaces the use of channels in the parallel directory traversal with a simple stack. The primary motivation for this change is to reduce peak memory usage. In particular, when using a channel (which is a queue), we wind up visiting files in a breadth first fashion. Using a stack switches us to a depth first traversal. While there are no real intrinsic differences, depth first traversal generally tends to use less memory because directory trees are more commonly wide than they are deep. In particular, the queue/stack size itself is not the only concern. In one recent case documented in #1550, a user wanted to search all Rust crates. The directory structure was shallow but extremely wide, with a single directory containing all crates. This in turn results is in descending into each of those directories and building a gitignore matcher for each (since most crates have `.gitignore` files) before ever searching a single file. This means that ripgrep has all such matchers in memory simultaneously, which winds up using quite a bit of memory. In a depth first traversal, peak memory usage is much lower because gitignore matches are built and discarded more quickly. In the case of searching all crates, the peak memory usage decrease is dramatic. On my system, it shrinks by an order magnitude, from almost 1GB to 50MB. The decline in peak memory usage is consistent across other use cases as well, but is typically more modest. For example, searching the Linux repo has a 50% decrease in peak memory usage and searching the Chromium repo has a 25% decrease in peak memory usage. Search times generally remain unchanged, although some ad hoc benchmarks that I typically run have gotten a bit slower. As far as I can tell, this appears to be result of scheduling changes. Namely, the depth first traversal seems to result in searching some very large files towards the end of the search, which reduces the effectiveness of parallelism and makes the overall search take longer. This seems to suggest that a stack isn't optimal. It would instead perhaps be better to prioritize searching larger files first, but it's not quite clear how to do this without introducing more overhead (getting the file size for each file requires a stat call). Fixes #1550
Diffstat (limited to 'crates')
-rw-r--r--crates/ignore/Cargo.toml4
-rw-r--r--crates/ignore/src/lib.rs1
-rw-r--r--crates/ignore/src/walk.rs95
3 files changed, 57 insertions, 43 deletions
diff --git a/crates/ignore/Cargo.toml b/crates/ignore/Cargo.toml
index f33cb3c2..479d595f 100644
--- a/crates/ignore/Cargo.toml
+++ b/crates/ignore/Cargo.toml
@@ -18,7 +18,6 @@ name = "ignore"
bench = false
[dependencies]
-crossbeam-channel = "0.4.0"
crossbeam-utils = "0.7.0"
globset = { version = "0.4.3", path = "../globset" }
lazy_static = "1.1"
@@ -32,5 +31,8 @@ walkdir = "2.2.7"
[target.'cfg(windows)'.dependencies.winapi-util]
version = "0.1.2"
+[dev-dependencies]
+crossbeam-channel = "0.4.0"
+
[features]
simd-accel = ["globset/simd-accel"]
diff --git a/crates/ignore/src/lib.rs b/crates/ignore/src/lib.rs
index 71b112c7..bcf0ef49 100644
--- a/crates/ignore/src/lib.rs
+++ b/crates/ignore/src/lib.rs
@@ -46,7 +46,6 @@ See the documentation for `WalkBuilder` for many other options.
#![deny(missing_docs)]
-extern crate crossbeam_channel as channel;
extern crate globset;
#[macro_use]
extern crate lazy_static;
diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs
index 6734788f..65606ee3 100644
--- a/crates/ignore/src/walk.rs
+++ b/crates/ignore/src/walk.rs
@@ -5,10 +5,11 @@ use std::fs::{self, FileType, Metadata};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
use std::vec;
-use channel::{self, TryRecvError};
use same_file::Handle;
use walkdir::{self, WalkDir};
@@ -364,7 +365,8 @@ impl DirEntryRaw {
})
}
- // Placeholder implementation to allow compiling on non-standard platforms (e.g. wasm32).
+ // Placeholder implementation to allow compiling on non-standard platforms
+ // (e.g. wasm32).
#[cfg(not(any(windows, unix)))]
fn from_entry_os(
depth: usize,
@@ -413,7 +415,8 @@ impl DirEntryRaw {
})
}
- // Placeholder implementation to allow compiling on non-standard platforms (e.g. wasm32).
+ // Placeholder implementation to allow compiling on non-standard platforms
+ // (e.g. wasm32).
#[cfg(not(any(windows, unix)))]
fn from_path(
depth: usize,
@@ -1186,16 +1189,9 @@ impl WalkParallel {
/// can be merged together into a single data structure.
pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder) {
let threads = self.threads();
- // TODO: Figure out how to use a bounded channel here. With an
- // unbounded channel, the workers can run away and fill up memory
- // with all of the file paths. But a bounded channel doesn't work since
- // our producers are also are consumers, so they end up getting stuck.
- //
- // We probably need to rethink parallel traversal completely to fix
- // this. The best case scenario would be finding a way to use rayon
- // to do this.
- let (tx, rx) = channel::unbounded();
+ let stack = Arc::new(Mutex::new(vec![]));
{
+ let mut stack = stack.lock().unwrap();
let mut visitor = builder.build();
let mut paths = Vec::new().into_iter();
std::mem::swap(&mut paths, &mut self.paths);
@@ -1232,28 +1228,27 @@ impl WalkParallel {
}
}
};
- tx.send(Message::Work(Work {
+ stack.push(Message::Work(Work {
dent: dent,
ignore: self.ig_root.clone(),
root_device: root_device,
- }))
- .unwrap();
+ }));
}
// ... but there's no need to start workers if we don't need them.
- if tx.is_empty() {
+ if stack.is_empty() {
return;
}
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
- let num_pending = Arc::new(AtomicUsize::new(tx.len()));
+ let num_pending =
+ Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
crossbeam_utils::thread::scope(|s| {
let mut handles = vec![];
for _ in 0..threads {
let worker = Worker {
visitor: builder.build(),
- tx: tx.clone(),
- rx: rx.clone(),
+ stack: stack.clone(),
quit_now: quit_now.clone(),
num_pending: num_pending.clone(),
max_depth: self.max_depth,
@@ -1263,8 +1258,6 @@ impl WalkParallel {
};
handles.push(s.spawn(|_| worker.run()));
}
- drop(tx);
- drop(rx);
for handle in handles {
handle.join().unwrap();
}
@@ -1362,10 +1355,13 @@ impl Work {
struct Worker<'s> {
/// The caller's callback.
visitor: Box<dyn ParallelVisitor + 's>,
- /// The push side of our mpmc queue.
- tx: channel::Sender<Message>,
- /// The receive side of our mpmc queue.
- rx: channel::Receiver<Message>,
+ /// A stack of work to do.
+ ///
+ /// We use a stack instead of a channel because a stack lets us visit
+ /// directories in depth first order. This can substantially reduce peak
+ /// memory usage by keeping both the number of files path and gitignore
+ /// matchers in memory lower.
+ stack: Arc<Mutex<Vec<Message>>>,
/// Whether all workers should terminate at the next opportunity. Note
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if have a priority channel.
@@ -1550,25 +1546,25 @@ impl<'s> Worker<'s> {
/// If all work has been exhausted, then this returns None. The worker
/// should then subsequently quit.
fn get_work(&mut self) -> Option<Work> {
- let mut value = self.rx.try_recv();
+ let mut value = self.recv();
loop {
// Simulate a priority channel: If quit_now flag is set, we can
// receive only quit messages.
if self.is_quit_now() {
- value = Ok(Message::Quit)
+ value = Some(Message::Quit)
}
match value {
- Ok(Message::Work(work)) => {
+ Some(Message::Work(work)) => {
return Some(work);
}
- Ok(Message::Quit) => {
+ Some(Message::Quit) => {
// Repeat quit message to wake up sleeping threads, if
// any. The domino effect will ensure that every thread
// will quit.
- self.tx.send(Message::Quit).unwrap();
+ self.send_quit();
return None;
}
- Err(TryRecvError::Empty) => {
+ None => {
// Once num_pending reaches 0, it is impossible for it to
// ever increase again. Namely, it only reaches 0 once
// all jobs have run such that no jobs have produced more
@@ -1580,17 +1576,21 @@ impl<'s> Worker<'s> {
if self.num_pending() == 0 {
// Every other thread is blocked at the next recv().
// Send the initial quit message and quit.
- self.tx.send(Message::Quit).unwrap();
+ self.send_quit();
return None;
}
// Wait for next `Work` or `Quit` message.
- value = Ok(self
- .rx
- .recv()
- .expect("channel disconnected while worker is alive"));
- }
- Err(TryRecvError::Disconnected) => {
- unreachable!("channel disconnected while worker is alive");
+ loop {
+ if let Some(v) = self.recv() {
+ value = Some(v);
+ break;
+ }
+ // Our stack isn't blocking. Instead of burning the
+ // CPU waiting, we let the thread sleep for a bit. In
+ // general, this tends to only occur once the search is
+ // approaching termination.
+ thread::sleep(Duration::from_millis(1));
+ }
}
}
}
@@ -1614,7 +1614,20 @@ impl<'s> Worker<'s> {
/// Send work.
fn send(&self, work: Work) {
self.num_pending.fetch_add(1, Ordering::SeqCst);
- self.tx.send(Message::Work(work)).unwrap();
+ let mut stack = self.stack.lock().unwrap();
+ stack.push(Message::Work(work));
+ }
+
+ /// Send a quit message.
+ fn send_quit(&self) {
+ let mut stack = self.stack.lock().unwrap();
+ stack.push(Message::Quit);
+ }
+
+ /// Receive work.
+ fn recv(&self) -> Option<Message> {
+ let mut stack = self.stack.lock().unwrap();
+ stack.pop()
}
/// Signal that work has been received.