summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/unbounded.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/mpsc/unbounded.rs
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs230
1 files changed, 230 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
new file mode 100644
index 00000000..5a73771e
--- /dev/null
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -0,0 +1,230 @@
+use crate::sync::loom::sync::atomic::AtomicUsize;
+use crate::sync::mpsc::chan;
+
+use std::fmt;
+use std::task::{Context, Poll};
+
+use std::pin::Pin;
+
+/// Send values to the associated `UnboundedReceiver`.
+///
+/// Instances are created by the
+/// [`unbounded_channel`](fn.unbounded_channel.html) function.
+pub struct UnboundedSender<T> {
+ chan: chan::Tx<T, Semaphore>,
+}
+
+impl<T> Clone for UnboundedSender<T> {
+ fn clone(&self) -> Self {
+ UnboundedSender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> fmt::Debug for UnboundedSender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("UnboundedSender")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Receive values from the associated `UnboundedSender`.
+///
+/// Instances are created by the
+/// [`unbounded_channel`](fn.unbounded_channel.html) function.
+pub struct UnboundedReceiver<T> {
+ /// The channel receiver
+ chan: chan::Rx<T, Semaphore>,
+}
+
+impl<T> fmt::Debug for UnboundedReceiver<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("UnboundedReceiver")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}
+
+/// Error returned by the `UnboundedSender`.
+#[derive(Debug)]
+pub struct UnboundedSendError(());
+
+/// Returned by `UnboundedSender::try_send` when the channel has been closed.
+#[derive(Debug)]
+pub struct UnboundedTrySendError<T>(T);
+
+/// Error returned by `UnboundedReceiver`.
+#[derive(Debug)]
+pub struct UnboundedRecvError(());
+
+/// Create an unbounded mpsc channel for communicating between asynchronous
+/// tasks.
+///
+/// A `send` on this channel will always succeed as long as the receive half has
+/// not been closed. If the receiver falls behind, messages will be arbitrarily
+/// buffered.
+///
+/// **Note** that the amount of available system memory is an implicit bound to
+/// the channel. Using an `unbounded` channel has the ability of causing the
+/// process to run out of memory. In this case, the process will be aborted.
+pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+ let (tx, rx) = chan::channel(AtomicUsize::new(0));
+
+ let tx = UnboundedSender::new(tx);
+ let rx = UnboundedReceiver::new(rx);
+
+ (tx, rx)
+}
+
+/// No capacity
+type Semaphore = AtomicUsize;
+
+impl<T> UnboundedReceiver<T> {
+ pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
+ UnboundedReceiver { chan }
+ }
+
+ #[doc(hidden)] // TODO: remove
+ pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
+ /// Receive the next value for this receiver.
+ ///
+ /// `None` is returned when all `Sender` halves have dropped, indicating
+ /// that no further values can be sent on the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::unbounded_channel();
+ ///
+ /// tokio::spawn(async move {
+ /// tx.try_send("hello").unwrap();
+ /// });
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(None, rx.recv().await);
+ /// }
+ /// ```
+ ///
+ /// Values are buffered:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::unbounded_channel();
+ ///
+ /// tx.try_send("hello").unwrap();
+ /// tx.try_send("world").unwrap();
+ ///
+ /// assert_eq!(Some("hello"), rx.recv().await);
+ /// assert_eq!(Some("world"), rx.recv().await);
+ /// }
+ /// ```
+ pub async fn recv(&mut self) -> Option<T> {
+ use futures_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_recv(cx)).await
+ }
+
+ /// Closes the receiving half of a channel, without dropping it.
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.chan.close();
+ }
+}
+
+impl<T> futures_core::Stream for UnboundedReceiver<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+}
+
+impl<T> UnboundedSender<T> {
+ pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
+ UnboundedSender { chan }
+ }
+
+ /// Attempts to send a message on this `UnboundedSender` without blocking.
+ pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> {
+ self.chan.try_send(message)?;
+ Ok(())
+ }
+}
+
+impl<T> futures_sink::Sink<T> for UnboundedSender<T> {
+ type Error = UnboundedSendError;
+
+ fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.try_send(msg).map_err(|_| UnboundedSendError(()))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+// ===== impl UnboundedSendError =====
+
+impl fmt::Display for UnboundedSendError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl ::std::error::Error for UnboundedSendError {}
+
+// ===== impl TrySendError =====
+
+impl<T> UnboundedTrySendError<T> {
+ /// Get the inner value.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {}
+
+impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> {
+ fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> {
+ assert_eq!(chan::TrySendError::Closed, err);
+ UnboundedTrySendError(value)
+ }
+}
+
+// ===== impl UnboundedRecvError =====
+
+impl fmt::Display for UnboundedRecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl ::std::error::Error for UnboundedRecvError {}