 CHANGELOG.md              |  2 ++
 crates/ignore/src/walk.rs | 45 ++++++++++++++++++++-------------------------
 2 files changed, 22 insertions(+), 25 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8c7e6ab..795d26b9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,8 @@ Performance improvements:
Make most searches with `\b` look-arounds (among others) much faster.
* [PERF #2591](https://github.com/BurntSushi/ripgrep/pull/2591):
Parallel directory traversal now uses work stealing for faster searches.
+* [PERF #2642](https://github.com/BurntSushi/ripgrep/pull/2642):
+  Parallel directory traversal now has less contention.
Feature enhancements:
diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs
index 4fee1d88..2288fe0b 100644
--- a/crates/ignore/src/walk.rs
+++ b/crates/ignore/src/walk.rs
@@ -1279,7 +1279,7 @@ impl WalkParallel {
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
- let num_pending = Arc::new(AtomicUsize::new(stack.len()));
+ let active_workers = Arc::new(AtomicUsize::new(threads));
let stacks = Stack::new_for_each_thread(threads, stack);
std::thread::scope(|s| {
let handles: Vec<_> = stacks
@@ -1288,7 +1288,7 @@ impl WalkParallel {
visitor: builder.build(),
stack,
quit_now: quit_now.clone(),
- num_pending: num_pending.clone(),
+ active_workers: active_workers.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
@@ -1471,8 +1471,8 @@ struct Worker<'s> {
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if we had a priority channel.
quit_now: Arc<AtomicBool>,
- /// The number of outstanding work items.
- num_pending: Arc<AtomicUsize>,
+ /// The number of currently active workers.
+ active_workers: Arc<AtomicUsize>,
/// The maximum depth of directories to descend. A value of `0` means no
/// descension at all.
max_depth: Option<usize>,
@@ -1500,7 +1500,6 @@ impl<'s> Worker<'s> {
if let WalkState::Quit = self.run_one(work) {
self.quit_now();
}
- self.work_done();
}
}
@@ -1682,23 +1681,20 @@ impl<'s> Worker<'s> {
return None;
}
None => {
- // Once num_pending reaches 0, it is impossible for it to
- // ever increase again. Namely, it only reaches 0 once
- // all jobs have run such that no jobs have produced more
- // work. We have this guarantee because num_pending is
- // always incremented before each job is submitted and only
- // decremented once each job is completely finished.
- // Therefore, if this reaches zero, then there can be no
- // other job running.
- if self.num_pending() == 0 {
- // Every other thread is blocked at the next recv().
- // Send the initial quit message and quit.
+ if self.deactivate_worker() == 0 {
+ // If deactivate_worker() returns 0, every worker thread
+ // is currently within the critical section between the
+ // acquire in deactivate_worker() and the release in
+ // activate_worker() below. For this to happen, every
+ // worker's local deque must be simultaneously empty,
+ // meaning there is no more work left at all.
self.send_quit();
return None;
}
// Wait for next `Work` or `Quit` message.
loop {
if let Some(v) = self.recv() {
+ self.activate_worker();
value = Some(v);
break;
}
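The hunk above replaces the per-item pending count with per-worker idle tracking: a worker that finds nothing to do deactivates itself, and only the worker whose deactivation drops the active count to zero may broadcast the quit message. Below is a minimal standalone sketch of that termination protocol, not ripgrep's actual implementation: `Msg`, `worker`, and the single `Mutex<VecDeque>` queue with a busy-wait are stand-ins for the crate's `Message`, work-stealing `Stack`, and `recv` loop.

```rust
// Sketch of idle-worker termination detection (illustrative only).
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

enum Msg {
    Work(u64), // a unit of work that may produce one more unit
    Quit,
}

fn worker(queue: &Mutex<VecDeque<Msg>>, active: &AtomicUsize, threads: usize) {
    loop {
        // Try to grab the next message; go idle if the queue is empty.
        let msg = match queue.lock().unwrap().pop_front() {
            Some(msg) => msg,
            None => {
                // This worker is now idle. If it was the last active one,
                // no worker is holding work that could produce more, so
                // broadcast `Quit` to everyone and stop.
                if active.fetch_sub(1, Ordering::SeqCst) == 1 {
                    let mut q = queue.lock().unwrap();
                    for _ in 0..threads {
                        q.push_back(Msg::Quit);
                    }
                    return;
                }
                // Otherwise wait for a message and reactivate before
                // acting on it, mirroring `activate_worker()` above.
                loop {
                    if let Some(msg) = queue.lock().unwrap().pop_front() {
                        active.fetch_add(1, Ordering::SeqCst);
                        break msg;
                    }
                    std::thread::yield_now();
                }
            }
        };
        match msg {
            // "Processing" a unit of work here just optionally enqueues a
            // smaller one, standing in for descending into a directory.
            Msg::Work(n) if n > 0 => {
                queue.lock().unwrap().push_back(Msg::Work(n - 1))
            }
            Msg::Work(_) => {}
            Msg::Quit => return,
        }
    }
}

fn main() {
    let threads = 4;
    let queue = Mutex::new(VecDeque::from([Msg::Work(1_000)]));
    // Every worker starts out active, matching the patch's
    // `AtomicUsize::new(threads)` initialization.
    let active = AtomicUsize::new(threads);
    std::thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| worker(&queue, &active, threads));
        }
    });
    println!("all workers exited");
}
```

The counter can only reach zero when every worker has gone idle at the same time, and an idle worker never enqueues new work, so a zero reading really does mean the traversal is finished.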
@@ -1724,14 +1720,8 @@ impl<'s> Worker<'s> {
self.quit_now.load(AtomicOrdering::SeqCst)
}
- /// Returns the number of pending jobs.
- fn num_pending(&self) -> usize {
- self.num_pending.load(AtomicOrdering::SeqCst)
- }
-
/// Send work.
fn send(&self, work: Work) {
- self.num_pending.fetch_add(1, AtomicOrdering::SeqCst);
self.stack.push(Message::Work(work));
}
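For contrast, the scheme removed in this hunk and in the earlier `work_done()` hunk charged every single work item two contended atomic operations: an increment when it was sent and a decrement when it finished. A rough sketch of that bookkeeping follows; `PendingCounter` is a hypothetical stand-in for the old `num_pending` field, not the removed code verbatim.

```rust
// Sketch of the replaced per-item accounting (illustrative only): every
// item submitted and every item completed touched the shared counter, so
// the atomic was hit roughly once per directory entry.
use std::sync::atomic::{AtomicUsize, Ordering};

struct PendingCounter(AtomicUsize);

impl PendingCounter {
    fn send(&self) {
        // One shared atomic increment per item submitted...
        self.0.fetch_add(1, Ordering::SeqCst);
    }
    fn work_done(&self) {
        // ...and one shared atomic decrement per item finished.
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
    fn drained(&self) -> bool {
        // Zero only once every submitted item has finished without
        // producing further work.
        self.0.load(Ordering::SeqCst) == 0
    }
}

fn main() {
    let pending = PendingCounter(AtomicUsize::new(0));
    pending.send();
    pending.work_done();
    assert!(pending.drained());
}
```

The new `active_workers` counter is only touched when a worker transitions between busy and idle, which happens far less often than once per item.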
@@ -1745,9 +1735,14 @@ impl<'s> Worker<'s> {
self.stack.pop()
}
- /// Signal that work has been finished.
- fn work_done(&self) {
- self.num_pending.fetch_sub(1, AtomicOrdering::SeqCst);
+ /// Deactivates a worker and returns the number of currently active workers.
+ fn deactivate_worker(&self) -> usize {
+ self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1
+ }
+
+ /// Reactivates a worker.
+ fn activate_worker(&self) {
+ self.active_workers.fetch_add(1, AtomicOrdering::Release);
}
}
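The `- 1` in `deactivate_worker()` works because `fetch_sub` returns the value the counter held before the subtraction, so subtracting one from that gives the number of workers still active afterwards. A tiny standalone check of that arithmetic, using `SeqCst` for simplicity where the patch pairs `Acquire` with `Release`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let active_workers = AtomicUsize::new(3);

    // `fetch_sub` returns the previous value (3), so the count of
    // workers still active after this one deactivates is 3 - 1 = 2.
    let remaining = active_workers.fetch_sub(1, Ordering::SeqCst) - 1;
    assert_eq!(remaining, 2);
    assert_eq!(active_workers.load(Ordering::SeqCst), 2);

    // Reactivation is the mirror image: add one back before handling
    // whatever message woke the worker up.
    active_workers.fetch_add(1, Ordering::SeqCst);
    assert_eq!(active_workers.load(Ordering::SeqCst), 3);
}
```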