summaryrefslogtreecommitdiffstats
path: root/tokio/src/time
diff options
context:
space:
mode:
authorBrian L. Troutwine <blt@goodwatercap.com>2020-03-26 13:04:08 -0700
committerGitHub <noreply@github.com>2020-03-26 13:04:08 -0700
commit3fb213a8612699a46b2ccbeddd9adfbe3c468287 (patch)
tree793989aa6199c492552911c59e96ddb6da0681d6 /tokio/src/time
parent6cf1a5b6b8686e5bde107d072d77199aaefcb2ec (diff)
timer: improve memory ordering in Inner's increment (#2107)
This commit improves the memory ordering in the implementation of Inner's increment function. The former code did a sequentially consistent load of self.num, then entered a loop with a sequentially consistent compare and swap on the same, bailing out with and Err only if the loaded value was MAX_TIMEOUTS. The use of SeqCst means that all threads must observe all relevant memory operations in the same order, implying synchronization between all CPUs. This commit adjusts the implementation in two key ways. First, the initial load of self.num is now down with Relaxed ordering. If two threads entered this code simultaneously, formerly, tokio required that one proceed before the other, negating their parallelism. Now, either thread may proceed without coordination. Second, the SeqCst compare_and_swap is changed to a Release, Relaxed compare_exchange_weak. The first memory ordering referrs to success: if the value is swapped the load of that value for comparison will be Relaxed and the store will be Release. The second memory ordering referrs to failure: if the value is not swapped the load is Relaxed. The _weak variant may spuriously fail but will generate better code. These changes mean that it is possible for more loops to be taken per call than strictly necessary but with greater parallelism available on this operation, improved energy consumption as CPUs don't have to coordinate as much.
Diffstat (limited to 'tokio/src/time')
-rw-r--r--tokio/src/time/driver/mod.rs28
-rw-r--r--tokio/src/time/driver/tests/mod.rs55
2 files changed, 73 insertions, 10 deletions
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index bb45cb7e..4616816f 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -20,7 +20,8 @@ use crate::park::{Park, Unpark};
use crate::time::{wheel, Error};
use crate::time::{Clock, Duration, Instant};
-use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
+
use std::sync::Arc;
use std::usize;
use std::{cmp, fmt};
@@ -333,28 +334,32 @@ impl Inner {
self.elapsed.load(SeqCst)
}
+ #[cfg(all(test, loom))]
+ fn num(&self, ordering: std::sync::atomic::Ordering) -> usize {
+ self.num.load(ordering)
+ }
+
/// Increments the number of active timeouts
fn increment(&self) -> Result<(), Error> {
- let mut curr = self.num.load(SeqCst);
-
+ let mut curr = self.num.load(Relaxed);
loop {
if curr == MAX_TIMEOUTS {
return Err(Error::at_capacity());
}
- let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst);
-
- if curr == actual {
- return Ok(());
+ match self
+ .num
+ .compare_exchange_weak(curr, curr + 1, Release, Relaxed)
+ {
+ Ok(_) => return Ok(()),
+ Err(next) => curr = next,
}
-
- curr = actual;
}
}
/// Decrements the number of active timeouts
fn decrement(&self) {
- let prev = self.num.fetch_sub(1, SeqCst);
+ let prev = self.num.fetch_sub(1, Acquire);
debug_assert!(prev <= MAX_TIMEOUTS);
}
@@ -381,3 +386,6 @@ impl fmt::Debug for Inner {
fmt.debug_struct("Inner").finish()
}
}
+
+#[cfg(all(test, loom))]
+mod tests;
diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs
new file mode 100644
index 00000000..5986d402
--- /dev/null
+++ b/tokio/src/time/driver/tests/mod.rs
@@ -0,0 +1,55 @@
+use crate::park::Unpark;
+use crate::time::driver::Inner;
+use crate::time::Instant;
+
+use loom::thread;
+
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+struct MockUnpark;
+
+impl Unpark for MockUnpark {
+ fn unpark(&self) {}
+}
+
+#[test]
+fn balanced_incr_and_decr() {
+ const OPS: usize = 100;
+
+ fn incr(inner: Arc<Inner>) {
+ for _ in 0..OPS {
+ inner.increment().expect("increment should not have failed");
+ thread::yield_now();
+ }
+ }
+
+ fn decr(inner: Arc<Inner>) {
+ let mut ops_performed = 0;
+ while ops_performed < OPS {
+ if inner.num(Ordering::Relaxed) > 0 {
+ ops_performed += 1;
+ inner.decrement();
+ }
+ thread::yield_now();
+ }
+ }
+
+ loom::model(|| {
+ let unpark = Box::new(MockUnpark);
+ let instant = Instant::now();
+
+ let inner = Arc::new(Inner::new(instant, unpark));
+
+ let incr_inner = inner.clone();
+ let decr_inner = inner.clone();
+
+ let incr_hndle = thread::spawn(move || incr(incr_inner));
+ let decr_hndle = thread::spawn(move || decr(decr_inner));
+
+ incr_hndle.join().expect("should never fail");
+ decr_hndle.join().expect("should never fail");
+
+ assert_eq!(inner.num(Ordering::SeqCst), 0);
+ })
+}