diff options
-rw-r--r-- | tokio/src/runtime/thread_pool/idle.rs | 15 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/queue/global.rs | 11 |
2 files changed, 14 insertions, 12 deletions
diff --git a/tokio/src/runtime/thread_pool/idle.rs b/tokio/src/runtime/thread_pool/idle.rs index acf80df8..ae87ca4b 100644 --- a/tokio/src/runtime/thread_pool/idle.rs +++ b/tokio/src/runtime/thread_pool/idle.rs @@ -4,7 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use std::fmt; -use std::sync::atomic::Ordering::{self, AcqRel, Relaxed, SeqCst}; +use std::sync::atomic::Ordering::{self, SeqCst}; pub(super) struct Idle { /// Tracks both the number of searching workers and the number of unparked @@ -89,12 +89,7 @@ impl Idle { } pub(super) fn transition_worker_to_searching(&self) -> bool { - // Using `Relaxed` ordering is acceptable here as it is just an - // optimization. This load has does not need to synchronize with - // anything, and the algorithm is correct no matter what the load - // returns (as in, it could return absolutely any `usize` value and the - // pool would be correct. - let state = State::load(&self.state, Relaxed); + let state = State::load(&self.state, SeqCst); if 2 * state.num_searching() >= self.num_workers { return false; } @@ -102,9 +97,7 @@ impl Idle { // It is possible for this routine to allow more than 50% of the workers // to search. That is OK. Limiting searchers is only an optimization to // prevent too much contention. - // - // At this point, we do not need a hard synchronization with `notify_work`, so `AcqRel` is sufficient. - State::inc_num_searching(&self.state, AcqRel); + State::inc_num_searching(&self.state, SeqCst); true } @@ -140,7 +133,7 @@ impl Idle { } fn notify_should_wakeup(&self) -> bool { - let state = State::load(&self.state, SeqCst); + let state = State(self.state.fetch_add(0, SeqCst)); state.num_searching() == 0 && state.num_unparked() < self.num_workers } } diff --git a/tokio/src/runtime/thread_pool/queue/global.rs b/tokio/src/runtime/thread_pool/queue/global.rs index 931b76a6..a6f49c01 100644 --- a/tokio/src/runtime/thread_pool/queue/global.rs +++ b/tokio/src/runtime/thread_pool/queue/global.rs @@ -51,7 +51,7 @@ impl<T: 'static> Queue<T> { /// Close the worker queue pub(super) fn close(&self) -> bool { // Acquire the lock - let _p = self.pointers.lock().unwrap(); + let p = self.pointers.lock().unwrap(); let len = unsafe { // Set the queue as closed. Because all mutations are synchronized by @@ -63,6 +63,8 @@ impl<T: 'static> Queue<T> { self.len.store(len | CLOSED, Release); + drop(p); + ret } @@ -119,7 +121,10 @@ impl<T: 'static> Queue<T> { } self.len.store(len + 2, Release); + f(Ok(())); + + drop(p); } } @@ -153,6 +158,8 @@ impl<T: 'static> Queue<T> { } self.len.store(len + (num << 1), Release); + + drop(p); } } @@ -185,6 +192,8 @@ impl<T: 'static> Queue<T> { // Decrement by 2 to avoid touching the shutdown flag self.len.store(self.len.unsync_load() - 2, Release); + drop(p); + Some(Task::from_raw(task)) } } |