diff options
author | Kevin Leimkuhler <kevin@kleimkuhler.com> | 2020-04-27 21:04:47 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-27 21:04:47 -0700 |
commit | a81958484941ddcc2f1955fb6873c827f694ec9b (patch) | |
tree | 0cfa52747f808e3dfb7e3161c2096ece237670f3 /tokio/src/sync/broadcast.rs | |
parent | 70ed3c7f0436079d798306820918d026819cb73d (diff) |
sync: fix slow receivers in broadcast (#2448)
Broadcast uses a ring buffer to store values sent to the channel. In order to
deal with slow receivers, the oldest values are overwritten with new values
once the buffer wraps. A receiver should be able to calculate how many values
it has missed.
Additionally, when the broadcast closes, a final value of `None` is sent to
the channel. If the buffer has wrapped, this value overwrites the oldest
value.
This is an issue mainly in a single capacity broadcast when a value is sent
and then the sender is dropped. The original value is immediately overwritten
with `None` meaning that receivers assume they have lagged behind.
**Solution**
A value of `None` is no longer sent to the channel when the final sender has
been dropped. This solves the single capacity broadcast case by completely
removing the behavior of overwriting values when the channel is closed.
Now, when the final sender is dropped a closed bit is set on the next slot
that the channel is supposed to send to.
In the case of a fast receiver, if it finds a slot where the closed bit is
set, it knows the channel is closed without locking the tail.
In the case of a slow receiver, it must first find out if it has missed any
values. This is similar to before, but must be able to account for channel
closure.
If the channel is not closed, the oldest value may be located at index `n`. If
the channel is closed, the oldest value is located at index `n - 1`.
Knowing the index where the oldest value is located, a receiver can calculate
how many values it may have missed and starts to catch up.
Closes #2425
Diffstat (limited to 'tokio/src/sync/broadcast.rs')
-rw-r--r-- | tokio/src/sync/broadcast.rs | 90 |
1 files changed, 65 insertions, 25 deletions
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 05a58070..abc4974a 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -272,6 +272,9 @@ struct Tail { /// Number of active receivers rx_cnt: usize, + + /// True if the channel is closed + closed: bool, } /// Slot in the buffer @@ -319,7 +322,10 @@ struct RecvGuard<'a, T> { } /// Max number of receivers. Reserve space to lock. -const MAX_RECEIVERS: usize = usize::MAX >> 1; +const MAX_RECEIVERS: usize = usize::MAX >> 2; +const CLOSED: usize = 1; +const WRITER: usize = 2; +const READER: usize = 4; /// Create a bounded, multi-producer, multi-consumer channel where each sent /// value is broadcasted to all active receivers. @@ -389,7 +395,11 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) { let shared = Arc::new(Shared { buffer: buffer.into_boxed_slice(), mask: capacity - 1, - tail: Mutex::new(Tail { pos: 0, rx_cnt: 1 }), + tail: Mutex::new(Tail { + pos: 0, + rx_cnt: 1, + closed: false, + }), condvar: Condvar::new(), wait_stack: AtomicPtr::new(ptr::null_mut()), num_tx: AtomicUsize::new(1), @@ -580,15 +590,15 @@ impl<T> Sender<T> { let slot = &self.shared.buffer[idx]; // Acquire the write lock - let mut prev = slot.lock.fetch_or(1, SeqCst); + let mut prev = slot.lock.fetch_or(WRITER, SeqCst); - while prev & !1 != 0 { + while prev & !WRITER != 0 { // Concurrent readers, we must go to sleep tail = self.shared.condvar.wait(tail).unwrap(); prev = slot.lock.load(SeqCst); - if prev & 1 == 0 { + if prev & WRITER == 0 { // The writer lock bit was cleared while this thread was // sleeping. This can only happen if a newer write happened on // this slot by another thread. Bail early as an optimization, @@ -604,13 +614,18 @@ impl<T> Sender<T> { // Slot lock acquired slot.write.pos.with_mut(|ptr| unsafe { *ptr = pos }); - slot.write.val.with_mut(|ptr| unsafe { *ptr = value }); // Set remaining receivers slot.rem.store(rem, SeqCst); - // Release the slot lock - slot.lock.store(0, SeqCst); + // Set the closed bit if the value is `None`; otherwise write the value + if value.is_none() { + tail.closed = true; + slot.lock.store(CLOSED, SeqCst); + } else { + slot.write.val.with_mut(|ptr| unsafe { *ptr = value }); + slot.lock.store(0, SeqCst); + } // Release the mutex. This must happen after the slot lock is released, // otherwise the writer lock bit could be cleared while another thread @@ -688,28 +703,52 @@ impl<T> Receiver<T> { if guard.pos() != self.next { let pos = guard.pos(); - guard.drop_no_rem_dec(); - + // The receiver has read all current values in the channel if pos.wrapping_add(self.shared.buffer.len() as u64) == self.next { + guard.drop_no_rem_dec(); return Err(TryRecvError::Empty); - } else { - let tail = self.shared.tail.lock().unwrap(); + } - // `tail.pos` points to the slot the **next** send writes to. - // Because a receiver is lagging, this slot also holds the - // oldest value. To make the positions match, we subtract the - // capacity. - let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64); - let missed = next.wrapping_sub(self.next); + let tail = self.shared.tail.lock().unwrap(); - self.next = next; + // `tail.pos` points to the slot that the **next** send writes to. If + // the channel is closed, the previous slot is the oldest value. + let mut adjust = 0; + if tail.closed { + adjust = 1 + } + let next = tail + .pos + .wrapping_sub(self.shared.buffer.len() as u64 + adjust); - return Err(TryRecvError::Lagged(missed)); + let missed = next.wrapping_sub(self.next); + + drop(tail); + + // The receiver is slow but no values have been missed + if missed == 0 { + self.next = self.next.wrapping_add(1); + return Ok(guard); } + + guard.drop_no_rem_dec(); + self.next = next; + + return Err(TryRecvError::Lagged(missed)); } self.next = self.next.wrapping_add(1); + // If the `CLOSED` bit it set on the slot, the channel is closed + // + // `try_rx_lock` could check for this and bail early. If it's return + // value was changed to represent the state of the lock, it could + // match on being closed, empty, or available for reading. + if slot.lock.load(SeqCst) & CLOSED == CLOSED { + guard.drop_no_rem_dec(); + return Err(TryRecvError::Closed); + } + Ok(guard) } } @@ -909,7 +948,6 @@ impl<T> Drop for Receiver<T> { while self.next != until { match self.recv_ref(true) { - // Ignore the value Ok(_) => {} // The channel is closed Err(TryRecvError::Closed) => break, @@ -954,13 +992,15 @@ impl<T> Slot<T> { let mut curr = self.lock.load(SeqCst); loop { - if curr & 1 == 1 { + if curr & WRITER == WRITER { // Locked by sender return false; } - // Only increment (by 2) if the LSB "lock" bit is not set. - let res = self.lock.compare_exchange(curr, curr + 2, SeqCst, SeqCst); + // Only increment (by `READER`) if the `WRITER` bit is not set. + let res = self + .lock + .compare_exchange(curr, curr + READER, SeqCst, SeqCst); match res { Ok(_) => return true, @@ -978,7 +1018,7 @@ impl<T> Slot<T> { } } - if 1 == self.lock.fetch_sub(2, SeqCst) - 2 { + if WRITER == self.lock.fetch_sub(READER, SeqCst) - READER { // First acquire the lock to make sure our sender is waiting on the // condition variable, otherwise the notification could be lost. mem::drop(tail.lock().unwrap()); |