diff options
-rw-r--r-- | tokio/src/time/driver/mod.rs | 28 | ||||
-rw-r--r-- | tokio/src/time/driver/tests/mod.rs | 55 |
2 files changed, 73 insertions, 10 deletions
diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index bb45cb7e..4616816f 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -20,7 +20,8 @@ use crate::park::{Park, Unpark}; use crate::time::{wheel, Error}; use crate::time::{Clock, Duration, Instant}; -use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; + use std::sync::Arc; use std::usize; use std::{cmp, fmt}; @@ -333,28 +334,32 @@ impl Inner { self.elapsed.load(SeqCst) } + #[cfg(all(test, loom))] + fn num(&self, ordering: std::sync::atomic::Ordering) -> usize { + self.num.load(ordering) + } + /// Increments the number of active timeouts fn increment(&self) -> Result<(), Error> { - let mut curr = self.num.load(SeqCst); - + let mut curr = self.num.load(Relaxed); loop { if curr == MAX_TIMEOUTS { return Err(Error::at_capacity()); } - let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst); - - if curr == actual { - return Ok(()); + match self + .num + .compare_exchange_weak(curr, curr + 1, Release, Relaxed) + { + Ok(_) => return Ok(()), + Err(next) => curr = next, } - - curr = actual; } } /// Decrements the number of active timeouts fn decrement(&self) { - let prev = self.num.fetch_sub(1, SeqCst); + let prev = self.num.fetch_sub(1, Acquire); debug_assert!(prev <= MAX_TIMEOUTS); } @@ -381,3 +386,6 @@ impl fmt::Debug for Inner { fmt.debug_struct("Inner").finish() } } + +#[cfg(all(test, loom))] +mod tests; diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs new file mode 100644 index 00000000..5986d402 --- /dev/null +++ b/tokio/src/time/driver/tests/mod.rs @@ -0,0 +1,55 @@ +use crate::park::Unpark; +use crate::time::driver::Inner; +use crate::time::Instant; + +use loom::thread; + +use std::sync::atomic::Ordering; +use std::sync::Arc; + +struct MockUnpark; + +impl Unpark for MockUnpark { + fn unpark(&self) {} +} + +#[test] +fn balanced_incr_and_decr() { + const OPS: usize = 100; + + fn incr(inner: Arc<Inner>) { + for _ in 0..OPS { + inner.increment().expect("increment should not have failed"); + thread::yield_now(); + } + } + + fn decr(inner: Arc<Inner>) { + let mut ops_performed = 0; + while ops_performed < OPS { + if inner.num(Ordering::Relaxed) > 0 { + ops_performed += 1; + inner.decrement(); + } + thread::yield_now(); + } + } + + loom::model(|| { + let unpark = Box::new(MockUnpark); + let instant = Instant::now(); + + let inner = Arc::new(Inner::new(instant, unpark)); + + let incr_inner = inner.clone(); + let decr_inner = inner.clone(); + + let incr_hndle = thread::spawn(move || incr(incr_inner)); + let decr_hndle = thread::spawn(move || decr(decr_inner)); + + incr_hndle.join().expect("should never fail"); + decr_hndle.join().expect("should never fail"); + + assert_eq!(inner.num(Ordering::SeqCst), 0); + }) +} |