summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/bounded.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/mpsc/bounded.rs
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src/sync/mpsc/bounded.rs')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs337
1 files changed, 337 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
new file mode 100644
index 00000000..787dd507
--- /dev/null
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -0,0 +1,337 @@
+use crate::sync::mpsc::chan;
+use crate::sync::semaphore;
+
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Send values to the associated `Receiver`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Sender<T> {
+ chan: chan::Tx<T, Semaphore>,
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Sender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Receive values from the associated `Sender`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+pub struct Receiver<T> {
+ /// The channel receiver
+ chan: chan::Rx<T, Semaphore>,
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Receiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Error returned by the `Sender`.
+#[derive(Debug)]
+pub struct SendError(());
+
+/// Error returned by `Sender::try_send`.
+#[derive(Debug)]
+pub struct TrySendError<T> {
+ kind: ErrorKind,
+ value: T,
+}
+
+#[derive(Debug)]
+enum ErrorKind {
+ Closed,
+ NoCapacity,
+}
+
+/// Error returned by `Receiver`.
+#[derive(Debug)]
+pub struct RecvError(());
+
+/// Create a bounded mpsc channel for communicating between asynchronous tasks,
+/// returning the sender/receiver halves.
+///
+/// All data sent on `Sender` will become available on `Receiver` in the same
+/// order as it was sent.
+///
+/// The `Sender` can be cloned to `send` to the same channel from multiple code
+/// locations. Only one `Receiver` is supported.
+///
+/// If the `Receiver` is disconnected while trying to `send`, the `send` method
+/// will return a `SendError`. Similarly, if `Sender` is disconnected while
+/// trying to `recv`, the `recv` method will return a `RecvError`.
+///
+/// # Examples
+///
+/// ```rust
+/// use tokio::sync::mpsc;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (mut tx, mut rx) = mpsc::channel(100);
+///
+/// tokio::spawn(async move {
+/// for i in 0..10 {
+/// if let Err(_) = tx.send(i).await {
+/// println!("receiver dropped");
+/// return;
+/// }
+/// }
+/// });
+///
+/// while let Some(i) = rx.recv().await {
+/// println!("got = {}", i);
+/// }
+/// }
+/// ```
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
+ let semaphore = (semaphore::Semaphore::new(buffer), buffer);
+ let (tx, rx) = chan::channel(semaphore);
+
+ let tx = Sender::new(tx);
+ let rx = Receiver::new(rx);
+
+ (tx, rx)
+}
+
+/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
+/// representing the channel bound.
+type Semaphore = (semaphore::Semaphore, usize);
+
+impl<T> Receiver<T> {
+ pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
+ Receiver { chan }
+ }
+
+ /// Receive the next value for this receiver.
+ ///
+ /// `None` is returned when all `Sender` halves have dropped, indicating
+ /// that no further values can be sent on the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tokio::spawn(async move {
+ /// tx.send("hello").await.unwrap();
+ /// });
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(None, rx.recv().await);
+ /// }
+ /// ```
+ ///
+ /// Values are buffered:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tx.send("hello").await.unwrap();
+ /// tx.send("world").await.unwrap();
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(Some("world"), rx.recv().await);
+ /// }
+ /// ```
+ pub async fn recv(&mut self) -> Option<T> {
+ use futures_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+
+ #[doc(hidden)] // TODO: remove
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
+ /// Closes the receiving half of a channel, without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.chan.close();
+ }
+}
+
+impl<T> futures_core::Stream for Receiver<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.get_mut().poll_recv(cx)
+ }
+}
+
+impl<T> Sender<T> {
+ pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
+ Sender { chan }
+ }
+
+ #[doc(hidden)] // TODO: remove
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ self.chan.poll_ready(cx).map_err(|_| SendError(()))
+ }
+
+ /// Attempts to send a message on this `Sender`, returning the message
+ /// if there was an error.
+ pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
+ self.chan.try_send(message)?;
+ Ok(())
+ }
+
+ /// Send a value, waiting until there is capacity.
+ ///
+ /// # Examples
+ ///
+ /// In the following example, each call to `send` will block until the
+ /// previously sent value was received.
+ ///
+ /// ```rust
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(1);
+ ///
+ /// tokio::spawn(async move {
+ /// for i in 0..10 {
+ /// if let Err(_) = tx.send(i).await {
+ /// println!("receiver dropped");
+ /// return;
+ /// }
+ /// }
+ /// });
+ ///
+ /// while let Some(i) = rx.recv().await {
+ /// println!("got = {}", i);
+ /// }
+ /// }
+ /// ```
+ pub async fn send(&mut self, value: T) -> Result<(), SendError> {
+ use futures_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_ready(cx)).await?;
+
+ self.try_send(value).map_err(|_| SendError(()))
+ }
+}
+
+impl<T> futures_sink::Sink<T> for Sender<T> {
+ type Error = SendError;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Sender::poll_ready(self.get_mut(), cx)
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.as_mut().try_send(msg).map_err(|err| {
+ assert!(err.is_full(), "call `poll_ready` before sending");
+ SendError(())
+ })
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+// ===== impl SendError =====
+
+impl fmt::Display for SendError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl ::std::error::Error for SendError {}
+
+// ===== impl TrySendError =====
+
+impl<T> TrySendError<T> {
+ /// Get the inner value.
+ pub fn into_inner(self) -> T {
+ self.value
+ }
+
+ /// Did the send fail because the channel has been closed?
+ pub fn is_closed(&self) -> bool {
+ if let ErrorKind::Closed = self.kind {
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Did the send fail because the channel was at capacity?
+ pub fn is_full(&self) -> bool {
+ if let ErrorKind::NoCapacity = self.kind {
+ true
+ } else {
+ false
+ }
+ }
+}
+
+impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let descr = match self.kind {
+ ErrorKind::Closed => "channel closed",
+ ErrorKind::NoCapacity => "no available capacity",
+ };
+ write!(fmt, "{}", descr)
+ }
+}
+
+impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {}
+
+impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
+ fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
+ TrySendError {
+ value,
+ kind: match err {
+ chan::TrySendError::Closed => ErrorKind::Closed,
+ chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
+ },
+ }
+ }
+}
+
+// ===== impl RecvError =====
+
+impl fmt::Display for RecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl ::std::error::Error for RecvError {}