From 2b909d6805990abf0bc2a5dea9e7267ff87df704 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 29 Oct 2019 15:11:31 -0700 Subject: sync: move into `tokio` crate (#1705) A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags. --- tokio/src/sync/mpsc/unbounded.rs | 230 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 tokio/src/sync/mpsc/unbounded.rs (limited to 'tokio/src/sync/mpsc/unbounded.rs') diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs new file mode 100644 index 00000000..5a73771e --- /dev/null +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -0,0 +1,230 @@ +use crate::sync::loom::sync::atomic::AtomicUsize; +use crate::sync::mpsc::chan; + +use std::fmt; +use std::task::{Context, Poll}; + +use std::pin::Pin; + +/// Send values to the associated `UnboundedReceiver`. +/// +/// Instances are created by the +/// [`unbounded_channel`](fn.unbounded_channel.html) function. +pub struct UnboundedSender { + chan: chan::Tx, +} + +impl Clone for UnboundedSender { + fn clone(&self) -> Self { + UnboundedSender { + chan: self.chan.clone(), + } + } +} + +impl fmt::Debug for UnboundedSender { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedSender") + .field("chan", &self.chan) + .finish() + } +} + +/// Receive values from the associated `UnboundedSender`. +/// +/// Instances are created by the +/// [`unbounded_channel`](fn.unbounded_channel.html) function. +pub struct UnboundedReceiver { + /// The channel receiver + chan: chan::Rx, +} + +impl fmt::Debug for UnboundedReceiver { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedReceiver") + .field("chan", &self.chan) + .finish() + } +} + +/// Error returned by the `UnboundedSender`. +#[derive(Debug)] +pub struct UnboundedSendError(()); + +/// Returned by `UnboundedSender::try_send` when the channel has been closed. +#[derive(Debug)] +pub struct UnboundedTrySendError(T); + +/// Error returned by `UnboundedReceiver`. +#[derive(Debug)] +pub struct UnboundedRecvError(()); + +/// Create an unbounded mpsc channel for communicating between asynchronous +/// tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) { + let (tx, rx) = chan::channel(AtomicUsize::new(0)); + + let tx = UnboundedSender::new(tx); + let rx = UnboundedReceiver::new(rx); + + (tx, rx) +} + +/// No capacity +type Semaphore = AtomicUsize; + +impl UnboundedReceiver { + pub(crate) fn new(chan: chan::Rx) -> UnboundedReceiver { + UnboundedReceiver { chan } + } + + #[doc(hidden)] // TODO: remove + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + + /// Receive the next value for this receiver. + /// + /// `None` is returned when all `Sender` halves have dropped, indicating + /// that no further values can be sent on the channel. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tokio::spawn(async move { + /// tx.try_send("hello").unwrap(); + /// }); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(None, rx.recv().await); + /// } + /// ``` + /// + /// Values are buffered: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tx.try_send("hello").unwrap(); + /// tx.try_send("world").unwrap(); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(Some("world"), rx.recv().await); + /// } + /// ``` + pub async fn recv(&mut self) -> Option { + use futures_util::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.chan.close(); + } +} + +impl futures_core::Stream for UnboundedReceiver { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } +} + +impl UnboundedSender { + pub(crate) fn new(chan: chan::Tx) -> UnboundedSender { + UnboundedSender { chan } + } + + /// Attempts to send a message on this `UnboundedSender` without blocking. + pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError> { + self.chan.try_send(message)?; + Ok(()) + } +} + +impl futures_sink::Sink for UnboundedSender { + type Error = UnboundedSendError; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.try_send(msg).map_err(|_| UnboundedSendError(())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +// ===== impl UnboundedSendError ===== + +impl fmt::Display for UnboundedSendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for UnboundedSendError {} + +// ===== impl TrySendError ===== + +impl UnboundedTrySendError { + /// Get the inner value. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl fmt::Display for UnboundedTrySendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for UnboundedTrySendError {} + +impl From<(T, chan::TrySendError)> for UnboundedTrySendError { + fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError { + assert_eq!(chan::TrySendError::Closed, err); + UnboundedTrySendError(value) + } +} + +// ===== impl UnboundedRecvError ===== + +impl fmt::Display for UnboundedRecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for UnboundedRecvError {} -- cgit v1.2.3