authorTavian Barnes <tavianator@tavianator.com>2023-10-30 15:56:08 -0400
committerAndrew Gallant <jamslam@gmail.com>2023-11-21 18:39:32 -0500
commit6d7550d58e88583deeb142b56e0dbe52f5102cbf (patch)
tree165ca4da3edfb89a5212abc9d837824695db6b49 /crates/ignore
parentaf55fc2b38cc5d31e24f3392ec2d03ab5f6428c4 (diff)
ignore: Avoid contention on num_pending
Previously, every worker would increment the shared num_pending count on every new work item and decrement it after finishing each one, leading to lots of contention. Now we only track the number of workers actively running, so there is no contention except when workers go to sleep or wake up.

Closes #2642
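To make the contention argument concrete, here is a minimal sketch of the two counting disciplines (hypothetical function names, not the crate's actual API): the old scheme performs two contended read-modify-write operations on a shared counter per work item, while the new scheme touches its counter only when a worker goes idle or wakes up.

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Old discipline: the shared counter moves twice for every item,
    /// once when it is sent and once when it is finished, so all
    /// workers hammer the same cache line at the per-item rate.
    fn old_scheme(num_pending: &AtomicUsize) {
        num_pending.fetch_add(1, Ordering::SeqCst); // on send()
        // ... some worker processes the item ...
        num_pending.fetch_sub(1, Ordering::SeqCst); // on work_done()
    }

    /// New discipline: the counter starts at the thread count and only
    /// moves when a worker's deque runs dry or new work wakes it up,
    /// which is rare compared to the per-item rate. Returns true if
    /// this was the last active worker, i.e. no work exists anywhere.
    fn new_scheme(active_workers: &AtomicUsize) -> bool {
        active_workers.fetch_sub(1, Ordering::Acquire) == 1
    }

    fn main() {
        let pending = AtomicUsize::new(0);
        old_scheme(&pending);
        assert_eq!(pending.load(Ordering::SeqCst), 0);

        let active = AtomicUsize::new(1);
        assert!(new_scheme(&active));
    }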
Diffstat (limited to 'crates/ignore')
-rw-r--r--  crates/ignore/src/walk.rs | 45
1 file changed, 20 insertions(+), 25 deletions(-)
diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs
index 4fee1d88..2288fe0b 100644
--- a/crates/ignore/src/walk.rs
+++ b/crates/ignore/src/walk.rs
@@ -1279,7 +1279,7 @@ impl WalkParallel {
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
- let num_pending = Arc::new(AtomicUsize::new(stack.len()));
+ let active_workers = Arc::new(AtomicUsize::new(threads));
let stacks = Stack::new_for_each_thread(threads, stack);
std::thread::scope(|s| {
let handles: Vec<_> = stacks
@@ -1288,7 +1288,7 @@ impl WalkParallel {
visitor: builder.build(),
stack,
quit_now: quit_now.clone(),
- num_pending: num_pending.clone(),
+ active_workers: active_workers.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
@@ -1471,8 +1471,8 @@ struct Worker<'s> {
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if we had a priority channel.
quit_now: Arc<AtomicBool>,
- /// The number of outstanding work items.
- num_pending: Arc<AtomicUsize>,
+ /// The number of currently active workers.
+ active_workers: Arc<AtomicUsize>,
/// The maximum depth of directories to descend. A value of `0` means no
/// descent at all.
max_depth: Option<usize>,
@@ -1500,7 +1500,6 @@ impl<'s> Worker<'s> {
if let WalkState::Quit = self.run_one(work) {
self.quit_now();
}
- self.work_done();
}
}
@@ -1682,23 +1681,20 @@ impl<'s> Worker<'s> {
return None;
}
None => {
- // Once num_pending reaches 0, it is impossible for it to
- // ever increase again. Namely, it only reaches 0 once
- // all jobs have run such that no jobs have produced more
- // work. We have this guarantee because num_pending is
- // always incremented before each job is submitted and only
- // decremented once each job is completely finished.
- // Therefore, if this reaches zero, then there can be no
- // other job running.
- if self.num_pending() == 0 {
- // Every other thread is blocked at the next recv().
- // Send the initial quit message and quit.
+ if self.deactivate_worker() == 0 {
+ // If deactivate_worker() returns 0, every worker thread
+ // is currently within the critical section between the
+ // acquire in deactivate_worker() and the release in
+ // activate_worker() below. For this to happen, every
+ // worker's local deque must be simultaneously empty,
+ // meaning there is no more work left at all.
self.send_quit();
return None;
}
// Wait for next `Work` or `Quit` message.
loop {
if let Some(v) = self.recv() {
+ self.activate_worker();
value = Some(v);
break;
}
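To see why `deactivate_worker() == 0` is a safe termination signal, consider a self-contained simulation of the protocol under simplifying assumptions: a single Mutex-guarded Vec stands in for the crate's work-stealing deques, busy-waiting stands in for its parking machinery, and all names here are made up. The key invariant is the one from the comment above: a worker only deactivates after observing an empty stack, and only active workers can push work, so a count of zero really does mean no work can ever appear again.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::thread;

    enum Message {
        Work(u32),
        Quit,
    }

    fn main() {
        let threads = 4;
        // Work items fan out like directory entries: Work(n) spawns
        // two Work(n - 1) children until n reaches zero.
        let stack = Arc::new(Mutex::new(vec![Message::Work(10)]));
        let active = Arc::new(AtomicUsize::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let stack = Arc::clone(&stack);
                let active = Arc::clone(&active);
                thread::spawn(move || loop {
                    let msg = stack.lock().unwrap().pop();
                    match msg {
                        Some(Message::Work(n)) => {
                            if n > 0 {
                                let mut s = stack.lock().unwrap();
                                s.push(Message::Work(n - 1));
                                s.push(Message::Work(n - 1));
                            }
                        }
                        Some(Message::Quit) => return,
                        None => {
                            // Deactivate. A previous value of 1 means every
                            // other worker is also idle; since only active
                            // workers push work, nothing can appear again.
                            if active.fetch_sub(1, Ordering::Acquire) == 1 {
                                let mut s = stack.lock().unwrap();
                                for _ in 0..threads {
                                    s.push(Message::Quit);
                                }
                                return;
                            }
                            // Otherwise wait for new work or a quit message,
                            // reactivating before we try to pop again.
                            loop {
                                if !stack.lock().unwrap().is_empty() {
                                    active.fetch_add(1, Ordering::Release);
                                    break;
                                }
                                thread::yield_now();
                            }
                        }
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }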
@@ -1724,14 +1720,8 @@ impl<'s> Worker<'s> {
self.quit_now.load(AtomicOrdering::SeqCst)
}
- /// Returns the number of pending jobs.
- fn num_pending(&self) -> usize {
- self.num_pending.load(AtomicOrdering::SeqCst)
- }
-
/// Send work.
fn send(&self, work: Work) {
- self.num_pending.fetch_add(1, AtomicOrdering::SeqCst);
self.stack.push(Message::Work(work));
}
@@ -1745,9 +1735,14 @@ impl<'s> Worker<'s> {
self.stack.pop()
}
- /// Signal that work has been finished.
- fn work_done(&self) {
- self.num_pending.fetch_sub(1, AtomicOrdering::SeqCst);
+ /// Deactivates a worker and returns the number of currently active workers.
+ fn deactivate_worker(&self) -> usize {
+ self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1
+ }
+
+ /// Reactivates a worker.
+ fn activate_worker(&self) {
+ self.active_workers.fetch_add(1, AtomicOrdering::Release);
}
}
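On the choice of orderings, which the patch does not spell out: one reading is that the `Acquire` in `deactivate_worker()` and the `Release` in `activate_worker()` bracket the idle window the way a lock acquire and release bracket a critical section, which is the framing the comment in the `get_work()` hunk above uses. A sketch of the counter's lifecycle with a hypothetical wrapper type, keeping the same orderings:

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Hypothetical mirror of the patch's two helpers: entering the
    /// idle state is an acquire, leaving it is a release, like
    /// locking and unlocking a mutex.
    struct ActiveWorkers(AtomicUsize);

    impl ActiveWorkers {
        /// Returns how many workers remain active after this one idles.
        fn deactivate(&self) -> usize {
            self.0.fetch_sub(1, Ordering::Acquire) - 1
        }

        /// Called after new work arrives, before processing resumes.
        fn activate(&self) {
            self.0.fetch_add(1, Ordering::Release);
        }
    }

    fn main() {
        let workers = ActiveWorkers(AtomicUsize::new(2));
        assert_eq!(workers.deactivate(), 1); // one worker still busy
        workers.activate();                  // new work woke us up
        assert_eq!(workers.deactivate(), 1);
        assert_eq!(workers.deactivate(), 0); // last one out: send quit
    }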