diff options
author | Carl Lerche <me@carllerche.com> | 2020-09-24 17:26:38 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-24 17:26:38 -0700 |
commit | cf025ba45f68934ae2138bb75ee2a5ee50506d1b (patch) | |
tree | 39fa03f4b063402e84da4435ebca39bd21266ad2 /tokio/src/sync/mpsc/unbounded.rs | |
parent | 4186b0aa38abbec7670d53882d5cdfd4b12add5c (diff) |
sync: support mpsc send with `&self` (#2861)
Updates the mpsc channel to use the intrusive waker based sempahore.
This enables using `Sender` with `&self`.
Instead of using `Sender::poll_ready` to ensure capacity and updating
the `Sender` state, `async fn Sender::reserve()` is added. This function
returns a `Permit` value representing the reserved capacity.
Fixes: #2637
Refs: #2718 (intrusive waiters)
Diffstat (limited to 'tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 39 |
1 files changed, 36 insertions, 3 deletions
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 6b2ca722..59456375 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -73,8 +73,7 @@ impl<T> UnboundedReceiver<T> { UnboundedReceiver { chan } } - #[doc(hidden)] // TODO: doc - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -174,7 +173,41 @@ impl<T> UnboundedSender<T> { /// [`close`]: UnboundedReceiver::close /// [`UnboundedReceiver`]: UnboundedReceiver pub fn send(&self, message: T) -> Result<(), SendError<T>> { - self.chan.send_unbounded(message)?; + if !self.inc_num_messages() { + return Err(SendError(message)); + } + + self.chan.send(message); Ok(()) } + + fn inc_num_messages(&self) -> bool { + use std::process; + use std::sync::atomic::Ordering::{AcqRel, Acquire}; + + let mut curr = self.chan.semaphore().load(Acquire); + + loop { + if curr & 1 == 1 { + return false; + } + + if curr == usize::MAX ^ 1 { + // Overflowed the ref count. There is no safe way to recover, so + // abort the process. In practice, this should never happen. + process::abort() + } + + match self + .chan + .semaphore() + .compare_exchange(curr, curr + 2, AcqRel, Acquire) + { + Ok(_) => return true, + Err(actual) => { + curr = actual; + } + } + } + } } |