summaryrefslogtreecommitdiffstats
path: root/tokio/src/time
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/time')
-rw-r--r--tokio/src/time/driver/mod.rs28
-rw-r--r--tokio/src/time/driver/tests/mod.rs55
2 files changed, 73 insertions, 10 deletions
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index bb45cb7e..4616816f 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -20,7 +20,8 @@ use crate::park::{Park, Unpark};
use crate::time::{wheel, Error};
use crate::time::{Clock, Duration, Instant};
-use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
+
use std::sync::Arc;
use std::usize;
use std::{cmp, fmt};
@@ -333,28 +334,32 @@ impl Inner {
self.elapsed.load(SeqCst)
}
+ #[cfg(all(test, loom))]
+ fn num(&self, ordering: std::sync::atomic::Ordering) -> usize {
+ self.num.load(ordering)
+ }
+
/// Increments the number of active timeouts
fn increment(&self) -> Result<(), Error> {
- let mut curr = self.num.load(SeqCst);
-
+ let mut curr = self.num.load(Relaxed);
loop {
if curr == MAX_TIMEOUTS {
return Err(Error::at_capacity());
}
- let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst);
-
- if curr == actual {
- return Ok(());
+ match self
+ .num
+ .compare_exchange_weak(curr, curr + 1, Release, Relaxed)
+ {
+ Ok(_) => return Ok(()),
+ Err(next) => curr = next,
}
-
- curr = actual;
}
}
/// Decrements the number of active timeouts
fn decrement(&self) {
- let prev = self.num.fetch_sub(1, SeqCst);
+ let prev = self.num.fetch_sub(1, Acquire);
debug_assert!(prev <= MAX_TIMEOUTS);
}
@@ -381,3 +386,6 @@ impl fmt::Debug for Inner {
fmt.debug_struct("Inner").finish()
}
}
+
+#[cfg(all(test, loom))]
+mod tests;
diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs
new file mode 100644
index 00000000..5986d402
--- /dev/null
+++ b/tokio/src/time/driver/tests/mod.rs
@@ -0,0 +1,55 @@
+use crate::park::Unpark;
+use crate::time::driver::Inner;
+use crate::time::Instant;
+
+use loom::thread;
+
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+struct MockUnpark;
+
+impl Unpark for MockUnpark {
+ fn unpark(&self) {}
+}
+
+#[test]
+fn balanced_incr_and_decr() {
+ const OPS: usize = 100;
+
+ fn incr(inner: Arc<Inner>) {
+ for _ in 0..OPS {
+ inner.increment().expect("increment should not have failed");
+ thread::yield_now();
+ }
+ }
+
+ fn decr(inner: Arc<Inner>) {
+ let mut ops_performed = 0;
+ while ops_performed < OPS {
+ if inner.num(Ordering::Relaxed) > 0 {
+ ops_performed += 1;
+ inner.decrement();
+ }
+ thread::yield_now();
+ }
+ }
+
+ loom::model(|| {
+ let unpark = Box::new(MockUnpark);
+ let instant = Instant::now();
+
+ let inner = Arc::new(Inner::new(instant, unpark));
+
+ let incr_inner = inner.clone();
+ let decr_inner = inner.clone();
+
+ let incr_hndle = thread::spawn(move || incr(incr_inner));
+ let decr_hndle = thread::spawn(move || decr(decr_inner));
+
+ incr_hndle.join().expect("should never fail");
+ decr_hndle.join().expect("should never fail");
+
+ assert_eq!(inner.num(Ordering::SeqCst), 0);
+ })
+}