summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/unbounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs39
1 files changed, 36 insertions, 3 deletions
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 6b2ca722..59456375 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -73,8 +73,7 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
- #[doc(hidden)] // TODO: doc
- pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -174,7 +173,41 @@ impl<T> UnboundedSender<T> {
/// [`close`]: UnboundedReceiver::close
/// [`UnboundedReceiver`]: UnboundedReceiver
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
- self.chan.send_unbounded(message)?;
+ if !self.inc_num_messages() {
+ return Err(SendError(message));
+ }
+
+ self.chan.send(message);
Ok(())
}
+
+ fn inc_num_messages(&self) -> bool {
+ use std::process;
+ use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+ let mut curr = self.chan.semaphore().load(Acquire);
+
+ loop {
+ if curr & 1 == 1 {
+ return false;
+ }
+
+ if curr == usize::MAX ^ 1 {
+ // Overflowed the ref count. There is no safe way to recover, so
+ // abort the process. In practice, this should never happen.
+ process::abort()
+ }
+
+ match self
+ .chan
+ .semaphore()
+ .compare_exchange(curr, curr + 2, AcqRel, Acquire)
+ {
+ Ok(_) => return true,
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
}