diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-29 15:11:31 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-29 15:11:31 -0700 |
commit | 2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch) | |
tree | de255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/mpsc/unbounded.rs | |
parent | c62ef2d232dea1535a8e22484fa2ca083f03e903 (diff) |
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The sync implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs new file mode 100644 index 00000000..5a73771e --- /dev/null +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -0,0 +1,230 @@ +use crate::sync::loom::sync::atomic::AtomicUsize; +use crate::sync::mpsc::chan; + +use std::fmt; +use std::task::{Context, Poll}; + +use std::pin::Pin; + +/// Send values to the associated `UnboundedReceiver`. +/// +/// Instances are created by the +/// [`unbounded_channel`](fn.unbounded_channel.html) function. +pub struct UnboundedSender<T> { + chan: chan::Tx<T, Semaphore>, +} + +impl<T> Clone for UnboundedSender<T> { + fn clone(&self) -> Self { + UnboundedSender { + chan: self.chan.clone(), + } + } +} + +impl<T> fmt::Debug for UnboundedSender<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedSender") + .field("chan", &self.chan) + .finish() + } +} + +/// Receive values from the associated `UnboundedSender`. +/// +/// Instances are created by the +/// [`unbounded_channel`](fn.unbounded_channel.html) function. +pub struct UnboundedReceiver<T> { + /// The channel receiver + chan: chan::Rx<T, Semaphore>, +} + +impl<T> fmt::Debug for UnboundedReceiver<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("UnboundedReceiver") + .field("chan", &self.chan) + .finish() + } +} + +/// Error returned by the `UnboundedSender`. +#[derive(Debug)] +pub struct UnboundedSendError(()); + +/// Returned by `UnboundedSender::try_send` when the channel has been closed. +#[derive(Debug)] +pub struct UnboundedTrySendError<T>(T); + +/// Error returned by `UnboundedReceiver`. +#[derive(Debug)] +pub struct UnboundedRecvError(()); + +/// Create an unbounded mpsc channel for communicating between asynchronous +/// tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { + let (tx, rx) = chan::channel(AtomicUsize::new(0)); + + let tx = UnboundedSender::new(tx); + let rx = UnboundedReceiver::new(rx); + + (tx, rx) +} + +/// No capacity +type Semaphore = AtomicUsize; + +impl<T> UnboundedReceiver<T> { + pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> { + UnboundedReceiver { chan } + } + + #[doc(hidden)] // TODO: remove + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) + } + + /// Receive the next value for this receiver. + /// + /// `None` is returned when all `Sender` halves have dropped, indicating + /// that no further values can be sent on the channel. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tokio::spawn(async move { + /// tx.try_send("hello").unwrap(); + /// }); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(None, rx.recv().await); + /// } + /// ``` + /// + /// Values are buffered: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// + /// tx.try_send("hello").unwrap(); + /// tx.try_send("world").unwrap(); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(Some("world"), rx.recv().await); + /// } + /// ``` + pub async fn recv(&mut self) -> Option<T> { + use futures_util::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.chan.close(); + } +} + +impl<T> futures_core::Stream for UnboundedReceiver<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) + } +} + +impl<T> UnboundedSender<T> { + pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> { + UnboundedSender { chan } + } + + /// Attempts to send a message on this `UnboundedSender` without blocking. + pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> { + self.chan.try_send(message)?; + Ok(()) + } +} + +impl<T> futures_sink::Sink<T> for UnboundedSender<T> { + type Error = UnboundedSendError; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.try_send(msg).map_err(|_| UnboundedSendError(())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } +} + +// ===== impl UnboundedSendError ===== + +impl fmt::Display for UnboundedSendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for UnboundedSendError {} + +// ===== impl TrySendError ===== + +impl<T> UnboundedTrySendError<T> { + /// Get the inner value. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {} + +impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> { + fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> { + assert_eq!(chan::TrySendError::Closed, err); + UnboundedTrySendError(value) + } +} + +// ===== impl UnboundedRecvError ===== + +impl fmt::Display for UnboundedRecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for UnboundedRecvError {} |