summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/unbounded.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-09-24 17:26:38 -0700
committerGitHub <noreply@github.com>2020-09-24 17:26:38 -0700
commitcf025ba45f68934ae2138bb75ee2a5ee50506d1b (patch)
tree39fa03f4b063402e84da4435ebca39bd21266ad2 /tokio/src/sync/mpsc/unbounded.rs
parent4186b0aa38abbec7670d53882d5cdfd4b12add5c (diff)
sync: support mpsc send with `&self` (#2861)
Updates the mpsc channel to use the intrusive waker based sempahore. This enables using `Sender` with `&self`. Instead of using `Sender::poll_ready` to ensure capacity and updating the `Sender` state, `async fn Sender::reserve()` is added. This function returns a `Permit` value representing the reserved capacity. Fixes: #2637 Refs: #2718 (intrusive waiters)
Diffstat (limited to 'tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs39
1 files changed, 36 insertions, 3 deletions
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 6b2ca722..59456375 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -73,8 +73,7 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
- #[doc(hidden)] // TODO: doc
- pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -174,7 +173,41 @@ impl<T> UnboundedSender<T> {
/// [`close`]: UnboundedReceiver::close
/// [`UnboundedReceiver`]: UnboundedReceiver
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
- self.chan.send_unbounded(message)?;
+ if !self.inc_num_messages() {
+ return Err(SendError(message));
+ }
+
+ self.chan.send(message);
Ok(())
}
+
+ fn inc_num_messages(&self) -> bool {
+ use std::process;
+ use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+ let mut curr = self.chan.semaphore().load(Acquire);
+
+ loop {
+ if curr & 1 == 1 {
+ return false;
+ }
+
+ if curr == usize::MAX ^ 1 {
+ // Overflowed the ref count. There is no safe way to recover, so
+ // abort the process. In practice, this should never happen.
+ process::abort()
+ }
+
+ match self
+ .chan
+ .semaphore()
+ .compare_exchange(curr, curr + 2, AcqRel, Acquire)
+ {
+ Ok(_) => return true,
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
}