diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-29 15:11:31 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-29 15:11:31 -0700 |
commit | 2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch) | |
tree | de255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/mpsc/bounded.rs | |
parent | c62ef2d232dea1535a8e22484fa2ca083f03e903 (diff) |
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The sync implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio/src/sync/mpsc/bounded.rs')
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 337 |
1 files changed, 337 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs new file mode 100644 index 00000000..787dd507 --- /dev/null +++ b/tokio/src/sync/mpsc/bounded.rs @@ -0,0 +1,337 @@ +use crate::sync::mpsc::chan; +use crate::sync::semaphore; + +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Send values to the associated `Receiver`. +/// +/// Instances are created by the [`channel`](fn.channel.html) function. +pub struct Sender<T> { + chan: chan::Tx<T, Semaphore>, +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + Sender { + chan: self.chan.clone(), + } + } +} + +impl<T> fmt::Debug for Sender<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Sender") + .field("chan", &self.chan) + .finish() + } +} + +/// Receive values from the associated `Sender`. +/// +/// Instances are created by the [`channel`](fn.channel.html) function. +pub struct Receiver<T> { + /// The channel receiver + chan: chan::Rx<T, Semaphore>, +} + +impl<T> fmt::Debug for Receiver<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Receiver") + .field("chan", &self.chan) + .finish() + } +} + +/// Error returned by the `Sender`. +#[derive(Debug)] +pub struct SendError(()); + +/// Error returned by `Sender::try_send`. +#[derive(Debug)] +pub struct TrySendError<T> { + kind: ErrorKind, + value: T, +} + +#[derive(Debug)] +enum ErrorKind { + Closed, + NoCapacity, +} + +/// Error returned by `Receiver`. +#[derive(Debug)] +pub struct RecvError(()); + +/// Create a bounded mpsc channel for communicating between asynchronous tasks, +/// returning the sender/receiver halves. +/// +/// All data sent on `Sender` will become available on `Receiver` in the same +/// order as it was sent. +/// +/// The `Sender` can be cloned to `send` to the same channel from multiple code +/// locations. Only one `Receiver` is supported. +/// +/// If the `Receiver` is disconnected while trying to `send`, the `send` method +/// will return a `SendError`. Similarly, if `Sender` is disconnected while +/// trying to `recv`, the `recv` method will return a `RecvError`. +/// +/// # Examples +/// +/// ```rust +/// use tokio::sync::mpsc; +/// +/// #[tokio::main] +/// async fn main() { +/// let (mut tx, mut rx) = mpsc::channel(100); +/// +/// tokio::spawn(async move { +/// for i in 0..10 { +/// if let Err(_) = tx.send(i).await { +/// println!("receiver dropped"); +/// return; +/// } +/// } +/// }); +/// +/// while let Some(i) = rx.recv().await { +/// println!("got = {}", i); +/// } +/// } +/// ``` +pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { + assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); + let semaphore = (semaphore::Semaphore::new(buffer), buffer); + let (tx, rx) = chan::channel(semaphore); + + let tx = Sender::new(tx); + let rx = Receiver::new(rx); + + (tx, rx) +} + +/// Channel semaphore is a tuple of the semaphore implementation and a `usize` +/// representing the channel bound. +type Semaphore = (semaphore::Semaphore, usize); + +impl<T> Receiver<T> { + pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> { + Receiver { chan } + } + + /// Receive the next value for this receiver. + /// + /// `None` is returned when all `Sender` halves have dropped, indicating + /// that no further values can be sent on the channel. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(100); + /// + /// tokio::spawn(async move { + /// tx.send("hello").await.unwrap(); + /// }); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(None, rx.recv().await); + /// } + /// ``` + /// + /// Values are buffered: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(100); + /// + /// tx.send("hello").await.unwrap(); + /// tx.send("world").await.unwrap(); + /// + /// assert_eq!(Some("hello"), rx.recv().await); + /// assert_eq!(Some("world"), rx.recv().await); + /// } + /// ``` + pub async fn recv(&mut self) -> Option<T> { + use futures_util::future::poll_fn; + + poll_fn(|cx| self.poll_recv(cx)).await + } + + #[doc(hidden)] // TODO: remove + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) + } + + /// Closes the receiving half of a channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.chan.close(); + } +} + +impl<T> futures_core::Stream for Receiver<T> { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.get_mut().poll_recv(cx) + } +} + +impl<T> Sender<T> { + pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> { + Sender { chan } + } + + #[doc(hidden)] // TODO: remove + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + self.chan.poll_ready(cx).map_err(|_| SendError(())) + } + + /// Attempts to send a message on this `Sender`, returning the message + /// if there was an error. + pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { + self.chan.try_send(message)?; + Ok(()) + } + + /// Send a value, waiting until there is capacity. + /// + /// # Examples + /// + /// In the following example, each call to `send` will block until the + /// previously sent value was received. + /// + /// ```rust + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(_) = tx.send(i).await { + /// println!("receiver dropped"); + /// return; + /// } + /// } + /// }); + /// + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// } + /// } + /// ``` + pub async fn send(&mut self, value: T) -> Result<(), SendError> { + use futures_util::future::poll_fn; + + poll_fn(|cx| self.poll_ready(cx)).await?; + + self.try_send(value).map_err(|_| SendError(())) + } +} + +impl<T> futures_sink::Sink<T> for Sender<T> { + type Error = SendError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Sender::poll_ready(self.get_mut(), cx) + } + + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.as_mut().try_send(msg).map_err(|err| { + assert!(err.is_full(), "call `poll_ready` before sending"); + SendError(()) + }) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } +} + +// ===== impl SendError ===== + +impl fmt::Display for SendError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for SendError {} + +// ===== impl TrySendError ===== + +impl<T> TrySendError<T> { + /// Get the inner value. + pub fn into_inner(self) -> T { + self.value + } + + /// Did the send fail because the channel has been closed? + pub fn is_closed(&self) -> bool { + if let ErrorKind::Closed = self.kind { + true + } else { + false + } + } + + /// Did the send fail because the channel was at capacity? + pub fn is_full(&self) -> bool { + if let ErrorKind::NoCapacity = self.kind { + true + } else { + false + } + } +} + +impl<T: fmt::Debug> fmt::Display for TrySendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let descr = match self.kind { + ErrorKind::Closed => "channel closed", + ErrorKind::NoCapacity => "no available capacity", + }; + write!(fmt, "{}", descr) + } +} + +impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {} + +impl<T> From<(T, chan::TrySendError)> for TrySendError<T> { + fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> { + TrySendError { + value, + kind: match err { + chan::TrySendError::Closed => ErrorKind::Closed, + chan::TrySendError::NoPermits => ErrorKind::NoCapacity, + }, + } + } +} + +// ===== impl RecvError ===== + +impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl ::std::error::Error for RecvError {} |