summaryrefslogtreecommitdiffstats
path: root/tokio-channel
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-08-27 12:24:51 -0700
committerGitHub <noreply@github.com>2018-08-27 12:24:51 -0700
commitb479ce78d3e05f689d2a7c501942d2de67bed722 (patch)
tree487e9b625b713a766b57f7299fee70c24371c272 /tokio-channel
parent6e45e0ac61b68208310748a61f68dc6df576dbb6 (diff)
add experimental async/await support. (#582)
This patch adds experimental async/await support to Tokio. It does this by adding feature flags to existing libs only where necessary in order to add nightly specific code (mostly `Unpin` implementations). It then provides a new crate: `tokio-async-await` which is a shim layer on top of `tokio`. The `tokio-async-await` crate is expected to look exactly like `tokio` does, but with async / await support. This strategy reduces the amount of cfg guarding in the main libraries. This patch also adds `tokio-channel`, which is copied from futures-rs 0.1 and adds the necessary `Unpin` implementations. In general, futures 0.1 is mostly unmaintained, so it will make sense for Tokio to take over maintainership of key components regardless of async / await support.
Diffstat (limited to 'tokio-channel')
-rw-r--r--tokio-channel/CHANGELOG.md0
-rw-r--r--tokio-channel/Cargo.toml25
-rw-r--r--tokio-channel/LICENSE51
-rw-r--r--tokio-channel/README.md0
-rw-r--r--tokio-channel/src/async_await.rs10
-rw-r--r--tokio-channel/src/lib.rs22
-rw-r--r--tokio-channel/src/lock.rs105
-rw-r--r--tokio-channel/src/mpsc/mod.rs989
-rw-r--r--tokio-channel/src/mpsc/queue.rs151
-rw-r--r--tokio-channel/src/oneshot.rs426
-rw-r--r--tokio-channel/tests/mpsc-close.rs22
-rw-r--r--tokio-channel/tests/mpsc.rs481
-rw-r--r--tokio-channel/tests/oneshot.rs124
-rw-r--r--tokio-channel/tests/support/mod.rs16
14 files changed, 2422 insertions, 0 deletions
diff --git a/tokio-channel/CHANGELOG.md b/tokio-channel/CHANGELOG.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tokio-channel/CHANGELOG.md
diff --git a/tokio-channel/Cargo.toml b/tokio-channel/Cargo.toml
new file mode 100644
index 00000000..d38ac375
--- /dev/null
+++ b/tokio-channel/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "tokio-channel"
+
+# When releasing to crates.io:
+# - Update html_root_url.
+# - Update CHANGELOG.md.
+# - Create "v0.1.x" git tag.
+version = "0.1.0"
+authors = ["Carl Lerche <me@carllerche.com>"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-channel/0.1.0"
+description = """
+Channels for asynchronous communication using Tokio.
+"""
+categories = ["asynchronous"]
+
+[features]
+# This feature comes with no promise of stability. Things will break with each
+# patch release. Use at your own risk.
+async-await-preview = []
+
+[dependencies]
+futures = "0.1.23"
diff --git a/tokio-channel/LICENSE b/tokio-channel/LICENSE
new file mode 100644
index 00000000..e0c7ffad
--- /dev/null
+++ b/tokio-channel/LICENSE
@@ -0,0 +1,51 @@
+Copyright (c) 2018 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
+Copyright (c) 2016 futures-rs Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/tokio-channel/README.md b/tokio-channel/README.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tokio-channel/README.md
diff --git a/tokio-channel/src/async_await.rs b/tokio-channel/src/async_await.rs
new file mode 100644
index 00000000..ff501d68
--- /dev/null
+++ b/tokio-channel/src/async_await.rs
@@ -0,0 +1,10 @@
+use {oneshot, mpsc};
+
+use std::marker::Unpin;
+
+impl<T> Unpin for oneshot::Sender<T> {}
+impl<T> Unpin for oneshot::Receiver<T> {}
+
+impl<T> Unpin for mpsc::Sender<T> {}
+impl<T> Unpin for mpsc::UnboundedSender<T> {}
+impl<T> Unpin for mpsc::Receiver<T> {}
diff --git a/tokio-channel/src/lib.rs b/tokio-channel/src/lib.rs
new file mode 100644
index 00000000..ae385530
--- /dev/null
+++ b/tokio-channel/src/lib.rs
@@ -0,0 +1,22 @@
+#![doc(html_root_url = "https://docs.rs/tokio-channel/0.1.0")]
+#![deny(missing_docs, warnings, missing_debug_implementations)]
+#![cfg_attr(feature = "async-await-preview", feature(
+ pin,
+ ))]
+
+//! Asynchronous channels.
+//!
+//! This crate provides channels that can be used to communicate between
+//! asynchronous tasks.
+
+extern crate futures;
+
+pub mod mpsc;
+pub mod oneshot;
+
+mod lock;
+
+// ===== EXPERIMENTAL async / await support =====
+
+#[cfg(feature = "async-await-preview")]
+mod async_await;
diff --git a/tokio-channel/src/lock.rs b/tokio-channel/src/lock.rs
new file mode 100644
index 00000000..d2acf0a6
--- /dev/null
+++ b/tokio-channel/src/lock.rs
@@ -0,0 +1,105 @@
+//! A "mutex" which only supports `try_lock`
+//!
+//! As a futures library the eventual call to an event loop should be the only
+//! thing that ever blocks, so this is assisted with a fast user-space
+//! implementation of a lock that can only have a `try_lock` operation.
+
+use std::cell::UnsafeCell;
+use std::ops::{Deref, DerefMut};
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::AtomicBool;
+
+/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
+///
+/// This lock only supports the `try_lock` operation, however, and does not
+/// implement poisoning.
+#[derive(Debug)]
+pub struct Lock<T> {
+ locked: AtomicBool,
+ data: UnsafeCell<T>,
+}
+
+/// Sentinel representing an acquired lock through which the data can be
+/// accessed.
+pub struct TryLock<'a, T: 'a> {
+ __ptr: &'a Lock<T>,
+}
+
+// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are
+// intended to mirror the standard library's corresponding impls for `Mutex<T>`.
+//
+// If a `T` is sendable across threads, so is the lock, and `T` must be sendable
+// across threads to be `Sync` because it allows mutable access from multiple
+// threads.
+unsafe impl<T: Send> Send for Lock<T> {}
+unsafe impl<T: Send> Sync for Lock<T> {}
+
+impl<T> Lock<T> {
+ /// Creates a new lock around the given value.
+ pub fn new(t: T) -> Lock<T> {
+ Lock {
+ locked: AtomicBool::new(false),
+ data: UnsafeCell::new(t),
+ }
+ }
+
+ /// Attempts to acquire this lock, returning whether the lock was acquired or
+ /// not.
+ ///
+ /// If `Some` is returned then the data this lock protects can be accessed
+ /// through the sentinel. This sentinel allows both mutable and immutable
+ /// access.
+ ///
+ /// If `None` is returned then the lock is already locked, either elsewhere
+ /// on this thread or on another thread.
+ pub fn try_lock(&self) -> Option<TryLock<T>> {
+ if !self.locked.swap(true, SeqCst) {
+ Some(TryLock { __ptr: self })
+ } else {
+ None
+ }
+ }
+}
+
+impl<'a, T> Deref for TryLock<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ // The existence of `TryLock` represents that we own the lock, so we
+ // can safely access the data here.
+ unsafe { &*self.__ptr.data.get() }
+ }
+}
+
+impl<'a, T> DerefMut for TryLock<'a, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ // The existence of `TryLock` represents that we own the lock, so we
+ // can safely access the data here.
+ //
+ // Additionally, we're the *only* `TryLock` in existence so mutable
+ // access should be ok.
+ unsafe { &mut *self.__ptr.data.get() }
+ }
+}
+
+impl<'a, T> Drop for TryLock<'a, T> {
+ fn drop(&mut self) {
+ self.__ptr.locked.store(false, SeqCst);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::Lock;
+
+ #[test]
+ fn smoke() {
+ let a = Lock::new(1);
+ let mut a1 = a.try_lock().unwrap();
+ assert!(a.try_lock().is_none());
+ assert_eq!(*a1, 1);
+ *a1 = 2;
+ drop(a1);
+ assert_eq!(*a.try_lock().unwrap(), 2);
+ assert_eq!(*a.try_lock().unwrap(), 2);
+ }
+}
diff --git a/tokio-channel/src/mpsc/mod.rs b/tokio-channel/src/mpsc/mod.rs
new file mode 100644
index 00000000..21abd55d
--- /dev/null
+++ b/tokio-channel/src/mpsc/mod.rs
@@ -0,0 +1,989 @@
+//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure.
+//!
+//! A channel can be used as a communication primitive between tasks running on
+//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender`
+//! handles. `Receiver` implements `Stream` and allows a task to read values
+//! out of the channel. If there is no message to read from the channel, the
+//! current task will be notified when a new value is sent. `Sender` implements
+//! the `Sink` trait and allows a task to send messages into the channel. If
+//! the channel is at capacity, then send will be rejected and the task will be
+//! notified when additional capacity is available.
+//!
+//! # Disconnection
+//!
+//! When all `Sender` handles have been dropped, it is no longer possible to
+//! send values into the channel. This is considered the termination event of
+//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`.
+//!
+//! If the receiver handle is dropped, then messages can no longer be read out
+//! of the channel. In this case, a `send` will result in an error.
+//!
+//! # Clean Shutdown
+//!
+//! If the `Receiver` is simply dropped, then it is possible for there to be
+//! messages still in the channel that will not be processed. As such, it is
+//! usually desirable to perform a "clean" shutdown. To do this, the receiver
+//! will first call `close`, which will prevent any further messages to be sent
+//! into the channel. Then, the receiver consumes the channel to completion, at
+//! which point the receiver can be dropped.
+
+// At the core, the channel uses an atomic FIFO queue for message passing. This
+// queue is used as the primary coordination primitive. In order to enforce
+// capacity limits and handle back pressure, a secondary FIFO queue is used to
+// send parked task handles.
+//
+// The general idea is that the channel is created with a `buffer` size of `n`.
+// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
+// slot to hold a message. This allows `Sender` to know for a fact that a send
+// will succeed *before* starting to do the actual work of sending the value.
+// Since most of this work is lock-free, once the work starts, it is impossible
+// to safely revert.
+//
+// If the sender is unable to process a send operation, then the current
+// task is parked and the handle is sent on the parked task queue.
+//
+// Note that the implementation guarantees that the channel capacity will never
+// exceed the configured limit, however there is no *strict* guarantee that the
+// receiver will wake up a parked task *immediately* when a slot becomes
+// available. However, it will almost always unpark a task when a slot becomes
+// available and it is *guaranteed* that a sender will be unparked when the
+// message that caused the sender to become parked is read out of the channel.
+//
+// The steps for sending a message are roughly:
+//
+// 1) Increment the channel message count
+// 2) If the channel is at capacity, push the task handle onto the wait queue
+// 3) Push the message onto the message queue.
+//
+// The steps for receiving a message are roughly:
+//
+// 1) Pop a message from the message queue
+// 2) Pop a task handle from the wait queue
+// 3) Decrement the channel message count.
+//
+// It's important for the order of operations on lock-free structures to happen
+// in reverse order between the sender and receiver. This makes the message
+// queue the primary coordination structure and establishes the necessary
+// happens-before semantics required for the acquire / release semantics used
+// by the queue structure.
+
+
+
+use mpsc::queue::{Queue, PopResult};
+
+use futures::task::{self, Task};
+use futures::{Async, AsyncSink, Poll, StartSend, Sink, Stream};
+
+use std::fmt;
+use std::error::Error;
+use std::any::Any;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::usize;
+
+mod queue;
+
+/// The transmission end of a channel which is used to send values.
+///
+/// This is created by the `channel` method.
+#[derive(Debug)]
+pub struct Sender<T> {
+ // Channel state shared between the sender and receiver.
+ inner: Arc<Inner<T>>,
+
+ // Handle to the task that is blocked on this sender. This handle is sent
+ // to the receiver half in order to be notified when the sender becomes
+ // unblocked.
+ sender_task: Arc<Mutex<SenderTask>>,
+
+ // True if the sender might be blocked. This is an optimization to avoid
+ // having to lock the mutex most of the time.
+ maybe_parked: bool,
+}
+
+/// The transmission end of a channel which is used to send values.
+///
+/// This is created by the `unbounded` method.
+#[derive(Debug)]
+pub struct UnboundedSender<T>(Sender<T>);
+
+trait AssertKinds: Send + Sync + Clone {}
+impl AssertKinds for UnboundedSender<u32> {}
+
+
+/// The receiving end of a channel which implements the `Stream` trait.
+///
+/// This is a concrete implementation of a stream which can be used to represent
+/// a stream of values being computed elsewhere. This is created by the
+/// `channel` method.
+#[derive(Debug)]
+pub struct Receiver<T> {
+ inner: Arc<Inner<T>>,
+}
+
+/// Error type for sending, used when the receiving end of a channel is
+/// dropped
+#[derive(Clone, PartialEq, Eq)]
+pub struct SendError<T>(T);
+
+/// Error type returned from `try_send`
+#[derive(Clone, PartialEq, Eq)]
+pub struct TrySendError<T> {
+ kind: TrySendErrorKind<T>,
+}
+
+#[derive(Clone, PartialEq, Eq)]
+enum TrySendErrorKind<T> {
+ Full(T),
+ Disconnected(T),
+}
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("SendError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "send failed because receiver is gone")
+ }
+}
+
+impl<T: Any> Error for SendError<T>
+{
+ fn description(&self) -> &str {
+ "send failed because receiver is gone"
+ }
+}
+
+impl<T> SendError<T> {
+ /// Returns the message that was attempted to be sent but failed.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("TrySendError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ if self.is_full() {
+ write!(fmt, "send failed because channel is full")
+ } else {
+ write!(fmt, "send failed because receiver is gone")
+ }
+ }
+}
+
+impl<T: Any> Error for TrySendError<T> {
+ fn description(&self) -> &str {
+ if self.is_full() {
+ "send failed because channel is full"
+ } else {
+ "send failed because receiver is gone"
+ }
+ }
+}
+
+impl<T> TrySendError<T> {
+ /// Returns true if this error is a result of the channel being full
+ pub fn is_full(&self) -> bool {
+ use self::TrySendErrorKind::*;
+
+ match self.kind {
+ Full(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Returns true if this error is a result of the receiver being dropped
+ pub fn is_disconnected(&self) -> bool {
+ use self::TrySendErrorKind::*;
+
+ match self.kind {
+ Disconnected(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Returns the message that was attempted to be sent but failed.
+ pub fn into_inner(self) -> T {
+ use self::TrySendErrorKind::*;
+
+ match self.kind {
+ Full(v) | Disconnected(v) => v,
+ }
+ }
+}
+
+#[derive(Debug)]
+struct Inner<T> {
+ // Max buffer size of the channel. If `None` then the channel is unbounded.
+ buffer: Option<usize>,
+
+ // Internal channel state. Consists of the number of messages stored in the
+ // channel as well as a flag signalling that the channel is closed.
+ state: AtomicUsize,
+
+ // Atomic, FIFO queue used to send messages to the receiver
+ message_queue: Queue<Option<T>>,
+
+ // Atomic, FIFO queue used to send parked task handles to the receiver.
+ parked_queue: Queue<Arc<Mutex<SenderTask>>>,
+
+ // Number of senders in existence
+ num_senders: AtomicUsize,
+
+ // Handle to the receiver's task.
+ recv_task: Mutex<ReceiverTask>,
+}
+
+// Struct representation of `Inner::state`.
+#[derive(Debug, Clone, Copy)]
+struct State {
+ // `true` when the channel is open
+ is_open: bool,
+
+ // Number of messages in the channel
+ num_messages: usize,
+}
+
+#[derive(Debug)]
+struct ReceiverTask {
+ unparked: bool,
+ task: Option<Task>,
+}
+
+// Returned from Receiver::try_park()
+enum TryPark {
+ Parked,
+ Closed,
+ NotEmpty,
+}
+
+// The `is_open` flag is stored in the left-most bit of `Inner::state`
+const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
+
+// When a new channel is created, it is created in the open state with no
+// pending messages.
+const INIT_STATE: usize = OPEN_MASK;
+
+// The maximum number of messages that a channel can track is `usize::MAX >> 1`
+const MAX_CAPACITY: usize = !(OPEN_MASK);
+
+// The maximum requested buffer size must be less than the maximum capacity of
+// a channel. This is because each sender gets a guaranteed slot.
+const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
+
+// Sent to the consumer to wake up blocked producers
+#[derive(Debug)]
+struct SenderTask {
+ task: Option<Task>,
+ is_parked: bool,
+}
+
+impl SenderTask {
+ fn new() -> Self {
+ SenderTask {
+ task: None,
+ is_parked: false,
+ }
+ }
+
+ fn notify(&mut self) {
+ self.is_parked = false;
+
+ if let Some(task) = self.task.take() {
+ task.notify();
+ }
+ }
+}
+
+/// Creates an in-memory channel implementation of the `Stream` trait with
+/// bounded capacity.
+///
+/// This method creates a concrete implementation of the `Stream` trait which
+/// can be used to send values across threads in a streaming fashion. This
+/// channel is unique in that it implements back pressure to ensure that the
+/// sender never outpaces the receiver. The channel capacity is equal to
+/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot
+/// in the channel capacity, and on top of that there are `buffer` "first come,
+/// first serve" slots available to all senders.
+///
+/// The `Receiver` returned implements the `Stream` trait and has access to any
+/// number of the associated combinators for transforming the result.
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ // Check that the requested buffer size does not exceed the maximum buffer
+ // size permitted by the system.
+ assert!(buffer < MAX_BUFFER, "requested buffer size too large");
+ channel2(Some(buffer))
+}
+
+/// Creates an in-memory channel implementation of the `Stream` trait with
+/// unbounded capacity.
+///
+/// This method creates a concrete implementation of the `Stream` trait which
+/// can be used to send values across threads in a streaming fashion. A `send`
+/// on this channel will always succeed as long as the receive half has not
+/// been closed. If the receiver falls behind, messages will be buffered
+/// internally.
+///
+/// **Note** that the amount of available system memory is an implicit bound to
+/// the channel. Using an `unbounded` channel has the ability of causing the
+/// process to run out of memory. In this case, the process will be aborted.
+pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) {
+ let (tx, rx) = channel2(None);
+ (UnboundedSender(tx), rx)
+}
+
+fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
+ let inner = Arc::new(Inner {
+ buffer: buffer,
+ state: AtomicUsize::new(INIT_STATE),
+ message_queue: Queue::new(),
+ parked_queue: Queue::new(),
+ num_senders: AtomicUsize::new(1),
+ recv_task: Mutex::new(ReceiverTask {
+ unparked: false,
+ task: None,
+ }),
+ });
+
+ let tx = Sender {
+ inner: inner.clone(),
+ sender_task: Arc::new(Mutex::new(SenderTask::new())),
+ maybe_parked: false,
+ };
+
+ let rx = Receiver {
+ inner: inner,
+ };
+
+ (tx, rx)
+}
+
+/*
+ *
+ * ===== impl Sender =====
+ *
+ */
+
+impl<T> Sender<T> {
+ /// Attempts to send a message on this `Sender<T>` without blocking.
+ ///
+ /// This function, unlike `start_send`, is safe to call whether it's being
+ /// called on a task or not. Note that this function, however, will *not*
+ /// attempt to block the current task if the message cannot be sent.
+ ///
+ /// It is not recommended to call this function from inside of a future,
+ /// only from an external thread where you've otherwise arranged to be
+ /// notified when the channel is no longer full.
+ pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+ // If the sender is currently blocked, reject the message
+ if !self.poll_unparked(false).is_ready() {
+ return Err(TrySendError {
+ kind: TrySendErrorKind::Full(msg),
+ });
+ }
+
+ // The channel has capacity to accept the message, so send it
+ self.do_send(Some(msg), false)
+ .map_err(|SendError(v)| {
+ TrySendError {
+ kind: TrySendErrorKind::Disconnected(v),
+ }
+ })
+ }
+
+ // Do the send without failing
+ // None means close
+ fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
+ // First, increment the number of messages contained by the channel.
+ // This operation will also atomically determine if the sender task
+ // should be parked.
+ //
+ // None is returned in the case that the channel has been closed by the
+ // receiver. This happens when `Receiver::close` is called or the
+ // receiver is dropped.
+ let park_self = match self.inc_num_messages(msg.is_none()) {
+ Some(park_self) => park_self,
+ None => {
+ // The receiver has closed the channel. Only abort if actually
+ // sending a message. It is important that the stream
+ // termination (None) is always sent. This technically means
+ // that it is possible for the queue to contain the following
+ // number of messages:
+ //
+ // num-senders + buffer + 1
+ //
+ if let Some(msg) = msg {
+ return Err(SendError(msg));
+ } else {
+ return Ok(());
+ }
+ }
+ };
+
+ // If the channel has reached capacity, then the sender task needs to
+ // be parked. This will send the task handle on the parked task queue.
+ //
+ // However, when `do_send` is called while dropping the `Sender`,
+ // `task::current()` can't be called safely. In this case, in order to
+ // maintain internal consistency, a blank message is pushed onto the
+ // parked task queue.
+ if park_self {
+ self.park(do_park);
+ }
+
+ self.queue_push_and_signal(msg);
+
+ Ok(())
+ }
+
+ // Do the send without parking current task.
+ //
+ // To be called from unbounded sender.
+ fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
+ match self.inc_num_messages(false) {
+ Some(park_self) => assert!(!park_self),
+ None => return Err(SendError(msg)),
+ };
+
+ self.queue_push_and_signal(Some(msg));
+
+ Ok(())
+ }
+
+ // Push message to the queue and signal to the receiver
+ fn queue_push_and_signal(&self, msg: Option<T>) {
+ // Push the message onto the message queue
+ self.inner.message_queue.push(msg);
+
+ // Signal to the receiver that a message has been enqueued. If the
+ // receiver is parked, this will unpark the task.
+ self.signal();
+ }
+
+ // Increment the number of queued messages. Returns if the sender should
+ // block.
+ fn inc_num_messages(&self, close: bool) -> Option<bool> {
+ let mut curr = self.inner.state.load(SeqCst);
+
+ loop {
+ let mut state = decode_state(curr);
+
+ // The receiver end closed the channel.
+ if !state.is_open {
+ return None;
+ }
+
+ // This probably is never hit? Odds are the process will run out of
+ // memory first. It may be worth to return something else in this
+ // case?
+ assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
+ sending this messages would overflow the state");
+
+ state.num_messages += 1;
+
+ // The channel is closed by all sender handles being dropped.
+ if close {
+ state.is_open = false;
+ }
+
+ let next = encode_state(&state);
+ match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
+ Ok(_) => {
+ // Block if the current number of pending messages has exceeded
+ // the configured buffer size
+ let park_self = match self.inner.buffer {
+ Some(buffer) => state.num_messages > buffer,
+ None => false,
+ };
+
+ return Some(park_self)
+ }
+ Err(actual) => curr = actual,
+ }
+ }
+ }
+
+ // Signal to the receiver task that a message has been enqueued
+ fn signal(&self) {
+ // TODO
+ // This logic can probably be improved by guarding the lock with an
+ // atomic.
+ //
+ // Do this step first so that the lock is dropped when
+ // `unpark` is called
+ let task = {
+ let mut recv_task = self.inner.recv_task.lock().unwrap();
+
+ // If the receiver has already been unparked, then there is nothing
+ // more to do
+ if recv_task.unparked {
+ return;
+ }
+
+ // Setting this flag enables the receiving end to detect that
+ // an unpark event happened in order to avoid unnecessarily
+ // parking.
+ recv_task.unparked = true;
+ recv_task.task.take()
+ };
+
+ if let Some(task) = task {
+ task.notify();
+ }
+ }
+
+ fn park(&mut self, can_park: bool) {
+ // TODO: clean up internal state if the task::current will fail
+
+ let task = if can_park {
+ Some(task::current())
+ } else {
+ None
+ };
+
+ {
+ let mut sender = self.sender_task.lock().unwrap();
+ sender.task = task;
+ sender.is_parked = true;
+ }
+
+ // Send handle over queue
+ let t = self.sender_task.clone();
+ self.inner.parked_queue.push(t);
+
+ // Check to make sure we weren't closed after we sent our task on the
+ // queue
+ let state = decode_state(self.inner.state.load(SeqCst));
+ self.maybe_parked = state.is_open;
+ }
+
+ /// Polls the channel to determine if there is guaranteed to be capacity to send at least one
+ /// item without waiting.
+ ///
+ /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
+ /// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns
+ /// `Err(SendError(_))` if the receiver has been dropped.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if called from outside the context of a task or future.
+ pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
+ let state = decode_state(self.inner.state.load(SeqCst));
+ if !state.is_open {
+ return Err(SendError(()));
+ }
+
+ Ok(self.poll_unparked(true))
+ }
+
+ fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
+ // First check the `maybe_parked` variable. This avoids acquiring the
+ // lock in most cases
+ if self.maybe_parked {
+ // Get a lock on the task handle
+ let mut task = self.sender_task.lock().unwrap();
+
+ if !task.is_parked {
+ self.maybe_parked = false;
+ return Async::Ready(())
+ }
+
+ // At this point, an unpark request is pending, so there will be an
+ // unpark sometime in the future. We just need to make sure that
+ // the correct task will be notified.
+ //
+ // Update the task in case the `Sender` has been moved to another
+ // task
+ task.task = if do_park {
+ Some(task::current())
+ } else {
+ None
+ };
+
+ Async::NotReady
+ } else {
+ Async::Ready(())
+ }
+ }
+}
+
+impl<T> Sink for Sender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ // If the sender is currently blocked, reject the message before doing
+ // any work.
+ if !self.poll_unparked(true).is_ready() {
+ return Ok(AsyncSink::NotReady(msg));
+ }
+
+ // The channel has capacity to accept the message, so send it.
+ self.do_send(Some(msg), true)?;
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T> UnboundedSender<T> {
+ /// Sends the provided message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ #[deprecated(note = "renamed to `unbounded_send`")]
+ #[doc(hidden)]
+ pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+ self.unbounded_send(msg)
+ }
+
+ /// Sends the provided message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
+ self.0.do_send_nb(msg)
+ }
+}
+
+impl<T> Sink for UnboundedSe