diff options
author | Brian L. Troutwine <blt@goodwatercap.com> | 2020-03-26 13:04:08 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-26 13:04:08 -0700 |
commit | 3fb213a8612699a46b2ccbeddd9adfbe3c468287 (patch) | |
tree | 793989aa6199c492552911c59e96ddb6da0681d6 /tokio/src/time | |
parent | 6cf1a5b6b8686e5bde107d072d77199aaefcb2ec (diff) |
timer: improve memory ordering in Inner's increment (#2107)
This commit improves the memory ordering in the implementation of
Inner's increment function. The former code did a sequentially
consistent load of self.num, then entered a loop with a sequentially
consistent compare and swap on the same, bailing out with and Err only
if the loaded value was MAX_TIMEOUTS. The use of SeqCst means that all
threads must observe all relevant memory operations in the same order,
implying synchronization between all CPUs.
This commit adjusts the implementation in two key ways. First, the
initial load of self.num is now down with Relaxed ordering. If two
threads entered this code simultaneously, formerly, tokio required
that one proceed before the other, negating their parallelism. Now,
either thread may proceed without coordination. Second, the SeqCst
compare_and_swap is changed to a Release, Relaxed
compare_exchange_weak. The first memory ordering referrs to success:
if the value is swapped the load of that value for comparison will be
Relaxed and the store will be Release. The second memory ordering
referrs to failure: if the value is not swapped the load is
Relaxed. The _weak variant may spuriously fail but will generate
better code.
These changes mean that it is possible for more loops to be taken per
call than strictly necessary but with greater parallelism available on
this operation, improved energy consumption as CPUs don't have to
coordinate as much.
Diffstat (limited to 'tokio/src/time')
-rw-r--r-- | tokio/src/time/driver/mod.rs | 28 | ||||
-rw-r--r-- | tokio/src/time/driver/tests/mod.rs | 55 |
2 files changed, 73 insertions, 10 deletions
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index bb45cb7e..4616816f 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -20,7 +20,8 @@ use crate::park::{Park, Unpark}; use crate::time::{wheel, Error}; use crate::time::{Clock, Duration, Instant}; -use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; + use std::sync::Arc; use std::usize; use std::{cmp, fmt}; @@ -333,28 +334,32 @@ impl Inner { self.elapsed.load(SeqCst) } + #[cfg(all(test, loom))] + fn num(&self, ordering: std::sync::atomic::Ordering) -> usize { + self.num.load(ordering) + } + /// Increments the number of active timeouts fn increment(&self) -> Result<(), Error> { - let mut curr = self.num.load(SeqCst); - + let mut curr = self.num.load(Relaxed); loop { if curr == MAX_TIMEOUTS { return Err(Error::at_capacity()); } - let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst); - - if curr == actual { - return Ok(()); + match self + .num + .compare_exchange_weak(curr, curr + 1, Release, Relaxed) + { + Ok(_) => return Ok(()), + Err(next) => curr = next, } - - curr = actual; } } /// Decrements the number of active timeouts fn decrement(&self) { - let prev = self.num.fetch_sub(1, SeqCst); + let prev = self.num.fetch_sub(1, Acquire); debug_assert!(prev <= MAX_TIMEOUTS); } @@ -381,3 +386,6 @@ impl fmt::Debug for Inner { fmt.debug_struct("Inner").finish() } } + +#[cfg(all(test, loom))] +mod tests; diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs new file mode 100644 index 00000000..5986d402 --- /dev/null +++ b/tokio/src/time/driver/tests/mod.rs @@ -0,0 +1,55 @@ +use crate::park::Unpark; +use crate::time::driver::Inner; +use crate::time::Instant; + +use loom::thread; + +use std::sync::atomic::Ordering; +use std::sync::Arc; + +struct MockUnpark; + +impl Unpark for MockUnpark { + fn unpark(&self) {} +} + +#[test] +fn balanced_incr_and_decr() { + const OPS: usize = 100; + + fn incr(inner: Arc<Inner>) { + for _ in 0..OPS { + inner.increment().expect("increment should not have failed"); + thread::yield_now(); + } + } + + fn decr(inner: Arc<Inner>) { + let mut ops_performed = 0; + while ops_performed < OPS { + if inner.num(Ordering::Relaxed) > 0 { + ops_performed += 1; + inner.decrement(); + } + thread::yield_now(); + } + } + + loom::model(|| { + let unpark = Box::new(MockUnpark); + let instant = Instant::now(); + + let inner = Arc::new(Inner::new(instant, unpark)); + + let incr_inner = inner.clone(); + let decr_inner = inner.clone(); + + let incr_hndle = thread::spawn(move || incr(incr_inner)); + let decr_hndle = thread::spawn(move || decr(decr_inner)); + + incr_hndle.join().expect("should never fail"); + decr_hndle.join().expect("should never fail"); + + assert_eq!(inner.num(Ordering::SeqCst), 0); + }) +} |