diff options
author | Carl Lerche <me@carllerche.com> | 2020-05-06 07:37:44 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-06 07:37:44 -0700 |
commit | cc8a6625982b5fc0694d05b4e9fb7d6a592702a1 (patch) | |
tree | 5f946da38fb9931f08998f0eb472828757ad3a8e | |
parent | 264ae3bdb22004609de45b67e2890081bb47e5b2 (diff) |
sync: simplify the broadcast channel (#2467)
Replace an ad hoc read/write lock with RwLock. Use
the parking_lot RwLock when possible.
-rw-r--r-- | tokio/Cargo.toml | 6 | ||||
-rw-r--r-- | tokio/src/loom/std/mod.rs | 17 | ||||
-rw-r--r-- | tokio/src/loom/std/parking_lot.rs (renamed from tokio/src/loom/std/sync/pl_wrappers.rs) | 44 | ||||
-rw-r--r-- | tokio/src/sync/broadcast.rs | 215 | ||||
-rw-r--r-- | tokio/tests/sync_broadcast.rs | 2 |
5 files changed, 99 insertions, 185 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 63bb7ac0..2baa72f5 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -127,10 +127,8 @@ futures-test = "0.3.0" proptest = "0.9.4" tempfile = "3.1.0" -# loom is currently not compiling on windows. -# See: https://github.com/Xudong-Huang/generator-rs/issues/19 -[target.'cfg(not(windows))'.dev-dependencies] -loom = { version = "0.3.1", features = ["futures", "checkpoint"] } +[target.'cfg(loom)'.dev-dependencies] +loom = { version = "0.3.4", features = ["futures", "checkpoint"] } [package.metadata.docs.rs] all-features = true diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 595bdf60..60ee56ad 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -6,6 +6,8 @@ mod atomic_u32; mod atomic_u64; mod atomic_u8; mod atomic_usize; +#[cfg(feature = "parking_lot")] +mod parking_lot; mod unsafe_cell; pub(crate) mod cell { @@ -41,24 +43,21 @@ pub(crate) mod rand { pub(crate) mod sync { pub(crate) use std::sync::Arc; - #[cfg(feature = "parking_lot")] - mod pl_wrappers; - // Below, make sure all the feature-influenced types are exported for // internal use. Note however that some are not _currently_ named by // consuming code. 
#[cfg(feature = "parking_lot")] #[allow(unused_imports)] - pub(crate) use pl_wrappers::{Condvar, Mutex}; - - #[cfg(feature = "parking_lot")] - #[allow(unused_imports)] - pub(crate) use parking_lot::{MutexGuard, WaitTimeoutResult}; + pub(crate) use crate::loom::std::parking_lot::{ + Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult, + }; #[cfg(not(feature = "parking_lot"))] #[allow(unused_imports)] - pub(crate) use std::sync::{Condvar, Mutex, MutexGuard, WaitTimeoutResult}; + pub(crate) use std::sync::{ + Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult, + }; pub(crate) mod atomic { pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr; diff --git a/tokio/src/loom/std/sync/pl_wrappers.rs b/tokio/src/loom/std/parking_lot.rs index 3be8ba1c..25d94af4 100644 --- a/tokio/src/loom/std/sync/pl_wrappers.rs +++ b/tokio/src/loom/std/parking_lot.rs @@ -6,25 +6,33 @@ use std::sync::{LockResult, TryLockError, TryLockResult}; use std::time::Duration; -use parking_lot as pl; +// Types that do not need wrapping +pub(crate) use parking_lot::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult}; /// Adapter for `parking_lot::Mutex` to the `std::sync::Mutex` interface. #[derive(Debug)] -pub(crate) struct Mutex<T: ?Sized>(pl::Mutex<T>); +pub(crate) struct Mutex<T: ?Sized>(parking_lot::Mutex<T>); + +#[derive(Debug)] +pub(crate) struct RwLock<T>(parking_lot::RwLock<T>); + +/// Adapter for `parking_lot::Condvar` to the `std::sync::Condvar` interface. 
+#[derive(Debug)] +pub(crate) struct Condvar(parking_lot::Condvar); impl<T> Mutex<T> { #[inline] pub(crate) fn new(t: T) -> Mutex<T> { - Mutex(pl::Mutex::new(t)) + Mutex(parking_lot::Mutex::new(t)) } #[inline] - pub(crate) fn lock(&self) -> LockResult<pl::MutexGuard<'_, T>> { + pub(crate) fn lock(&self) -> LockResult<MutexGuard<'_, T>> { Ok(self.0.lock()) } #[inline] - pub(crate) fn try_lock(&self) -> TryLockResult<pl::MutexGuard<'_, T>> { + pub(crate) fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> { match self.0.try_lock() { Some(guard) => Ok(guard), None => Err(TryLockError::WouldBlock), @@ -35,14 +43,24 @@ impl<T> Mutex<T> { // provided here as needed. } -/// Adapter for `parking_lot::Condvar` to the `std::sync::Condvar` interface. -#[derive(Debug)] -pub(crate) struct Condvar(pl::Condvar); +impl<T> RwLock<T> { + pub(crate) fn new(t: T) -> RwLock<T> { + RwLock(parking_lot::RwLock::new(t)) + } + + pub(crate) fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> { + Ok(self.0.read()) + } + + pub(crate) fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>> { + Ok(self.0.write()) + } +} impl Condvar { #[inline] pub(crate) fn new() -> Condvar { - Condvar(pl::Condvar::new()) + Condvar(parking_lot::Condvar::new()) } #[inline] @@ -58,8 +76,8 @@ impl Condvar { #[inline] pub(crate) fn wait<'a, T>( &self, - mut guard: pl::MutexGuard<'a, T>, - ) -> LockResult<pl::MutexGuard<'a, T>> { + mut guard: MutexGuard<'a, T>, + ) -> LockResult<MutexGuard<'a, T>> { self.0.wait(&mut guard); Ok(guard) } @@ -67,9 +85,9 @@ impl Condvar { #[inline] pub(crate) fn wait_timeout<'a, T>( &self, - mut guard: pl::MutexGuard<'a, T>, + mut guard: MutexGuard<'a, T>, timeout: Duration, - ) -> LockResult<(pl::MutexGuard<'a, T>, pl::WaitTimeoutResult)> { + ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> { let wtr = self.0.wait_for(&mut guard, timeout); Ok((guard, wtr)) } diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index abc4974a..9873dcb7 100644 --- 
a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -110,11 +110,10 @@ use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; -use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize}; -use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::loom::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize}; +use crate::loom::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use std::fmt; -use std::mem; use std::ptr; use std::sync::atomic::Ordering::SeqCst; use std::task::{Context, Poll, Waker}; @@ -247,7 +246,7 @@ pub enum TryRecvError { /// Data shared between senders and receivers struct Shared<T> { /// slots in the channel - buffer: Box<[Slot<T>]>, + buffer: Box<[RwLock<Slot<T>>]>, /// Mask a position -> index mask: usize, @@ -255,9 +254,6 @@ struct Shared<T> { /// Tail of the queue tail: Mutex<Tail>, - /// Notifies a sender that the slot is unlocked - condvar: Condvar, - /// Stack of pending waiters wait_stack: AtomicPtr<WaitNode>, @@ -282,23 +278,21 @@ struct Slot<T> { /// Remaining number of receivers that are expected to see this value. /// /// When this goes to zero, the value is released. + /// + /// An atomic is used as it is mutated concurrently with the slot read lock + /// acquired. rem: AtomicUsize, - /// Used to lock the `write` field. - lock: AtomicUsize, - - /// The value being broadcast - /// - /// Synchronized by `state` - write: Write<T>, -} + /// Uniquely identifies the `send` stored in the slot + pos: u64, -/// A write in the buffer -struct Write<T> { - /// Uniquely identifies this write - pos: UnsafeCell<u64>, + /// True signals the channel is closed. + closed: bool, - /// The written value + /// The value being broadcast. + /// + /// The value is set by `send` when the write lock is held. When a reader + /// drops, `rem` is decremented. When it hits zero, the value is dropped. 
val: UnsafeCell<Option<T>>, } @@ -316,16 +310,11 @@ struct WaitNode { } struct RecvGuard<'a, T> { - slot: &'a Slot<T>, - tail: &'a Mutex<Tail>, - condvar: &'a Condvar, + slot: RwLockReadGuard<'a, Slot<T>>, } /// Max number of receivers. Reserve space to lock. const MAX_RECEIVERS: usize = usize::MAX >> 2; -const CLOSED: usize = 1; -const WRITER: usize = 2; -const READER: usize = 4; /// Create a bounded, multi-producer, multi-consumer channel where each sent /// value is broadcasted to all active receivers. @@ -382,14 +371,12 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { let mut buffer = Vec::with_capacity(capacity); for i in 0..capacity { - buffer.push(Slot { + buffer.push(RwLock::new(Slot { rem: AtomicUsize::new(0), - lock: AtomicUsize::new(0), - write: Write { - pos: UnsafeCell::new((i as u64).wrapping_sub(capacity as u64)), - val: UnsafeCell::new(None), - }, - }); + pos: (i as u64).wrapping_sub(capacity as u64), + closed: false, + val: UnsafeCell::new(None), + })); } let shared = Arc::new(Shared { @@ -400,7 +387,6 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { rx_cnt: 1, closed: false, }), - condvar: Condvar::new(), wait_stack: AtomicPtr::new(ptr::null_mut()), num_tx: AtomicUsize::new(1), }); @@ -587,46 +573,25 @@ impl<T> Sender<T> { tail.pos = tail.pos.wrapping_add(1); // Get the slot - let slot = &self.shared.buffer[idx]; - - // Acquire the write lock - let mut prev = slot.lock.fetch_or(WRITER, SeqCst); + let mut slot = self.shared.buffer[idx].write().unwrap(); - while prev & !WRITER != 0 { - // Concurrent readers, we must go to sleep - tail = self.shared.condvar.wait(tail).unwrap(); - - prev = slot.lock.load(SeqCst); - - if prev & WRITER == 0 { - // The writer lock bit was cleared while this thread was - // sleeping. This can only happen if a newer write happened on - // this slot by another thread. Bail early as an optimization, - // there is nothing left to do. 
- return Ok(rem); - } - } - - if tail.pos.wrapping_sub(pos) > self.shared.buffer.len() as u64 { - // There is a newer pending write to the same slot. - return Ok(rem); - } - - // Slot lock acquired - slot.write.pos.with_mut(|ptr| unsafe { *ptr = pos }); + // Track the position + slot.pos = pos; // Set remaining receivers - slot.rem.store(rem, SeqCst); + slot.rem.with_mut(|v| *v = rem); // Set the closed bit if the value is `None`; otherwise write the value if value.is_none() { tail.closed = true; - slot.lock.store(CLOSED, SeqCst); + slot.closed = true; } else { - slot.write.val.with_mut(|ptr| unsafe { *ptr = value }); - slot.lock.store(0, SeqCst); + slot.val.with_mut(|ptr| unsafe { *ptr = value }); } + // Release the slot lock before the tail lock + drop(slot); + // Release the mutex. This must happen after the slot lock is released, // otherwise the writer lock bit could be cleared while another thread // is in the critical section. @@ -675,42 +640,32 @@ impl<T> Drop for Sender<T> { impl<T> Receiver<T> { /// Locks the next value if there is one. 
- /// - /// The caller is responsible for unlocking - fn recv_ref(&mut self, spin: bool) -> Result<RecvGuard<'_, T>, TryRecvError> { + fn recv_ref(&mut self) -> Result<RecvGuard<'_, T>, TryRecvError> { let idx = (self.next & self.shared.mask as u64) as usize; // The slot holding the next value to read - let slot = &self.shared.buffer[idx]; - - // Lock the slot - if !slot.try_rx_lock() { - if spin { - while !slot.try_rx_lock() { - spin_loop_hint(); - } - } else { - return Err(TryRecvError::Empty); - } - } - - let guard = RecvGuard { - slot, - tail: &self.shared.tail, - condvar: &self.shared.condvar, - }; - - if guard.pos() != self.next { - let pos = guard.pos(); + let mut slot = self.shared.buffer[idx].read().unwrap(); + if slot.pos != self.next { // The receiver has read all current values in the channel - if pos.wrapping_add(self.shared.buffer.len() as u64) == self.next { - guard.drop_no_rem_dec(); + if slot.pos.wrapping_add(self.shared.buffer.len() as u64) == self.next { return Err(TryRecvError::Empty); } + // Release the `slot` lock before attempting to acquire the `tail` + // lock. This is required because `send2` acquires the tail lock + // first followed by the slot lock. Acquiring the locks in reverse + // order here would result in a potential deadlock: `recv_ref` + // acquires the `slot` lock and attempts to acquire the `tail` lock + // while `send2` acquired the `tail` lock and attempts to acquire + // the slot lock. + drop(slot); + let tail = self.shared.tail.lock().unwrap(); + // Acquire slot lock again + slot = self.shared.buffer[idx].read().unwrap(); + // `tail.pos` points to the slot that the **next** send writes to. If // the channel is closed, the previous slot is the oldest value. 
let mut adjust = 0; @@ -728,10 +683,10 @@ impl<T> Receiver<T> { // The receiver is slow but no values have been missed if missed == 0 { self.next = self.next.wrapping_add(1); - return Ok(guard); + + return Ok(RecvGuard { slot }); } - guard.drop_no_rem_dec(); self.next = next; return Err(TryRecvError::Lagged(missed)); @@ -739,17 +694,11 @@ impl<T> Receiver<T> { self.next = self.next.wrapping_add(1); - // If the `CLOSED` bit it set on the slot, the channel is closed - // - // `try_rx_lock` could check for this and bail early. If it's return - // value was changed to represent the state of the lock, it could - // match on being closed, empty, or available for reading. - if slot.lock.load(SeqCst) & CLOSED == CLOSED { - guard.drop_no_rem_dec(); + if slot.closed { return Err(TryRecvError::Closed); } - Ok(guard) + Ok(RecvGuard { slot }) } } @@ -797,7 +746,7 @@ where /// } /// ``` pub fn try_recv(&mut self) -> Result<T, TryRecvError> { - let guard = self.recv_ref(false)?; + let guard = self.recv_ref()?; guard.clone_value().ok_or(TryRecvError::Closed) } @@ -947,7 +896,7 @@ impl<T> Drop for Receiver<T> { drop(tail); while self.next != until { - match self.recv_ref(true) { + match self.recv_ref() { Ok(_) => {} // The channel is closed Err(TryRecvError::Closed) => break, @@ -984,72 +933,22 @@ impl<T> fmt::Debug for Receiver<T> { } } -impl<T> Slot<T> { - /// Tries to lock the slot for a receiver. If `false`, then a sender holds the - /// lock and the calling task will be notified once the sender has released - /// the lock. - fn try_rx_lock(&self) -> bool { - let mut curr = self.lock.load(SeqCst); - - loop { - if curr & WRITER == WRITER { - // Locked by sender - return false; - } - - // Only increment (by `READER`) if the `WRITER` bit is not set. 
- let res = self - .lock - .compare_exchange(curr, curr + READER, SeqCst, SeqCst); - - match res { - Ok(_) => return true, - Err(actual) => curr = actual, - } - } - } - - fn rx_unlock(&self, tail: &Mutex<Tail>, condvar: &Condvar, rem_dec: bool) { - if rem_dec { - // Decrement the remaining counter - if 1 == self.rem.fetch_sub(1, SeqCst) { - // Last receiver, drop the value - self.write.val.with_mut(|ptr| unsafe { *ptr = None }); - } - } - - if WRITER == self.lock.fetch_sub(READER, SeqCst) - READER { - // First acquire the lock to make sure our sender is waiting on the - // condition variable, otherwise the notification could be lost. - mem::drop(tail.lock().unwrap()); - // Wake up senders - condvar.notify_all(); - } - } -} - impl<'a, T> RecvGuard<'a, T> { - fn pos(&self) -> u64 { - self.slot.write.pos.with(|ptr| unsafe { *ptr }) - } - fn clone_value(&self) -> Option<T> where T: Clone, { - self.slot.write.val.with(|ptr| unsafe { (*ptr).clone() }) - } - - fn drop_no_rem_dec(self) { - self.slot.rx_unlock(self.tail, self.condvar, false); - - mem::forget(self); + self.slot.val.with(|ptr| unsafe { (*ptr).clone() }) } } impl<'a, T> Drop for RecvGuard<'a, T> { fn drop(&mut self) { - self.slot.rx_unlock(self.tail, self.condvar, true) + // Decrement the remaining counter + if 1 == self.slot.rem.fetch_sub(1, SeqCst) { + // Safety: Last receiver, drop the value + self.slot.val.with_mut(|ptr| unsafe { *ptr = None }); + } } } diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 4fb7c0aa..4d756f91 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -49,7 +49,7 @@ macro_rules! assert_closed { }; } -trait AssertSend: Send {} +trait AssertSend: Send + Sync {} impl AssertSend for broadcast::Sender<i32> {} impl AssertSend for broadcast::Receiver<i32> {} |