diff options
author | Carl Lerche <me@carllerche.com> | 2018-08-27 12:24:51 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-27 12:24:51 -0700 |
commit | b479ce78d3e05f689d2a7c501942d2de67bed722 (patch) | |
tree | 487e9b625b713a766b57f7299fee70c24371c272 /tokio-channel | |
parent | 6e45e0ac61b68208310748a61f68dc6df576dbb6 (diff) |
add experimental async/await support. (#582)
This patch adds experimental async/await support to Tokio. It does this
by adding feature flags to existing libs only where necessary in order
to add nightly specific code (mostly `Unpin` implementations). It then
provides a new crate: `tokio-async-await` which is a shim layer on top
of `tokio`.
The `tokio-async-await` crate is expected to look exactly like `tokio`
does, but with async / await support. This strategy reduces the amount
of cfg guarding in the main libraries.
This patch also adds `tokio-channel`, which is copied from futures-rs
0.1 and adds the necessary `Unpin` implementations. In general, futures
0.1 is mostly unmaintained, so it will make sense for Tokio to take over
maintainership of key components regardless of async / await support.
Diffstat (limited to 'tokio-channel')
-rw-r--r-- | tokio-channel/CHANGELOG.md | 0 | ||||
-rw-r--r-- | tokio-channel/Cargo.toml | 25 | ||||
-rw-r--r-- | tokio-channel/LICENSE | 51 | ||||
-rw-r--r-- | tokio-channel/README.md | 0 | ||||
-rw-r--r-- | tokio-channel/src/async_await.rs | 10 | ||||
-rw-r--r-- | tokio-channel/src/lib.rs | 22 | ||||
-rw-r--r-- | tokio-channel/src/lock.rs | 105 | ||||
-rw-r--r-- | tokio-channel/src/mpsc/mod.rs | 989 | ||||
-rw-r--r-- | tokio-channel/src/mpsc/queue.rs | 151 | ||||
-rw-r--r-- | tokio-channel/src/oneshot.rs | 426 | ||||
-rw-r--r-- | tokio-channel/tests/mpsc-close.rs | 22 | ||||
-rw-r--r-- | tokio-channel/tests/mpsc.rs | 481 | ||||
-rw-r--r-- | tokio-channel/tests/oneshot.rs | 124 | ||||
-rw-r--r-- | tokio-channel/tests/support/mod.rs | 16 |
14 files changed, 2422 insertions, 0 deletions
diff --git a/tokio-channel/CHANGELOG.md b/tokio-channel/CHANGELOG.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tokio-channel/CHANGELOG.md diff --git a/tokio-channel/Cargo.toml b/tokio-channel/Cargo.toml new file mode 100644 index 00000000..d38ac375 --- /dev/null +++ b/tokio-channel/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "tokio-channel" + +# When releasing to crates.io: +# - Update html_root_url. +# - Update CHANGELOG.md. +# - Create "v0.1.x" git tag. +version = "0.1.0" +authors = ["Carl Lerche <me@carllerche.com>"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-channel/0.1.0" +description = """ +Channels for asynchronous communication using Tokio. +""" +categories = ["asynchronous"] + +[features] +# This feature comes with no promise of stability. Things will break with each +# patch release. Use at your own risk. +async-await-preview = [] + +[dependencies] +futures = "0.1.23" diff --git a/tokio-channel/LICENSE b/tokio-channel/LICENSE new file mode 100644 index 00000000..e0c7ffad --- /dev/null +++ b/tokio-channel/LICENSE @@ -0,0 +1,51 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. + +Copyright (c) 2016 futures-rs Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/tokio-channel/README.md b/tokio-channel/README.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tokio-channel/README.md diff --git a/tokio-channel/src/async_await.rs b/tokio-channel/src/async_await.rs new file mode 100644 index 00000000..ff501d68 --- /dev/null +++ b/tokio-channel/src/async_await.rs @@ -0,0 +1,10 @@ +use {oneshot, mpsc}; + +use std::marker::Unpin; + +impl<T> Unpin for oneshot::Sender<T> {} +impl<T> Unpin for oneshot::Receiver<T> {} + +impl<T> Unpin for mpsc::Sender<T> {} +impl<T> Unpin for mpsc::UnboundedSender<T> {} +impl<T> Unpin for mpsc::Receiver<T> {} diff --git a/tokio-channel/src/lib.rs b/tokio-channel/src/lib.rs new file mode 100644 index 00000000..ae385530 --- /dev/null +++ b/tokio-channel/src/lib.rs @@ -0,0 +1,22 @@ +#![doc(html_root_url = "https://docs.rs/tokio-channel/0.1.0")] +#![deny(missing_docs, warnings, missing_debug_implementations)] +#![cfg_attr(feature = "async-await-preview", feature( + pin, + ))] + +//! Asynchronous channels. +//! +//! This crate provides channels that can be used to communicate between +//! asynchronous tasks. + +extern crate futures; + +pub mod mpsc; +pub mod oneshot; + +mod lock; + +// ===== EXPERIMENTAL async / await support ===== + +#[cfg(feature = "async-await-preview")] +mod async_await; diff --git a/tokio-channel/src/lock.rs b/tokio-channel/src/lock.rs new file mode 100644 index 00000000..d2acf0a6 --- /dev/null +++ b/tokio-channel/src/lock.rs @@ -0,0 +1,105 @@ +//! A "mutex" which only supports `try_lock` +//! +//! As a futures library the eventual call to an event loop should be the only +//! thing that ever blocks, so this is assisted with a fast user-space +//! implementation of a lock that can only have a `try_lock` operation. + +use std::cell::UnsafeCell; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::AtomicBool; + +/// A "mutex" around a value, similar to `std::sync::Mutex<T>`. +/// +/// This lock only supports the `try_lock` operation, however, and does not +/// implement poisoning. +#[derive(Debug)] +pub struct Lock<T> { + locked: AtomicBool, + data: UnsafeCell<T>, +} + +/// Sentinel representing an acquired lock through which the data can be +/// accessed. +pub struct TryLock<'a, T: 'a> { + __ptr: &'a Lock<T>, +} + +// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are +// intended to mirror the standard library's corresponding impls for `Mutex<T>`. +// +// If a `T` is sendable across threads, so is the lock, and `T` must be sendable +// across threads to be `Sync` because it allows mutable access from multiple +// threads. +unsafe impl<T: Send> Send for Lock<T> {} +unsafe impl<T: Send> Sync for Lock<T> {} + +impl<T> Lock<T> { + /// Creates a new lock around the given value. + pub fn new(t: T) -> Lock<T> { + Lock { + locked: AtomicBool::new(false), + data: UnsafeCell::new(t), + } + } + + /// Attempts to acquire this lock, returning whether the lock was acquired or + /// not. + /// + /// If `Some` is returned then the data this lock protects can be accessed + /// through the sentinel. This sentinel allows both mutable and immutable + /// access. + /// + /// If `None` is returned then the lock is already locked, either elsewhere + /// on this thread or on another thread. + pub fn try_lock(&self) -> Option<TryLock<T>> { + if !self.locked.swap(true, SeqCst) { + Some(TryLock { __ptr: self }) + } else { + None + } + } +} + +impl<'a, T> Deref for TryLock<'a, T> { + type Target = T; + fn deref(&self) -> &T { + // The existence of `TryLock` represents that we own the lock, so we + // can safely access the data here. + unsafe { &*self.__ptr.data.get() } + } +} + +impl<'a, T> DerefMut for TryLock<'a, T> { + fn deref_mut(&mut self) -> &mut T { + // The existence of `TryLock` represents that we own the lock, so we + // can safely access the data here. + // + // Additionally, we're the *only* `TryLock` in existence so mutable + // access should be ok. + unsafe { &mut *self.__ptr.data.get() } + } +} + +impl<'a, T> Drop for TryLock<'a, T> { + fn drop(&mut self) { + self.__ptr.locked.store(false, SeqCst); + } +} + +#[cfg(test)] +mod tests { + use super::Lock; + + #[test] + fn smoke() { + let a = Lock::new(1); + let mut a1 = a.try_lock().unwrap(); + assert!(a.try_lock().is_none()); + assert_eq!(*a1, 1); + *a1 = 2; + drop(a1); + assert_eq!(*a.try_lock().unwrap(), 2); + assert_eq!(*a.try_lock().unwrap(), 2); + } +} diff --git a/tokio-channel/src/mpsc/mod.rs b/tokio-channel/src/mpsc/mod.rs new file mode 100644 index 00000000..21abd55d --- /dev/null +++ b/tokio-channel/src/mpsc/mod.rs @@ -0,0 +1,989 @@ +//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure. +//! +//! A channel can be used as a communication primitive between tasks running on +//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender` +//! handles. `Receiver` implements `Stream` and allows a task to read values +//! out of the channel. If there is no message to read from the channel, the +//! current task will be notified when a new value is sent. `Sender` implements +//! the `Sink` trait and allows a task to send messages into the channel. If +//! the channel is at capacity, then send will be rejected and the task will be +//! notified when additional capacity is available. +//! +//! # Disconnection +//! +//! When all `Sender` handles have been dropped, it is no longer possible to +//! send values into the channel. This is considered the termination event of +//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`. +//! +//! If the receiver handle is dropped, then messages can no longer be read out +//! of the channel. In this case, a `send` will result in an error. +//! +//! # Clean Shutdown +//! +//! If the `Receiver` is simply dropped, then it is possible for there to be +//! messages still in the channel that will not be processed. As such, it is +//! usually desirable to perform a "clean" shutdown. To do this, the receiver +//! will first call `close`, which will prevent any further messages to be sent +//! into the channel. Then, the receiver consumes the channel to completion, at +//! which point the receiver can be dropped. + +// At the core, the channel uses an atomic FIFO queue for message passing. This +// queue is used as the primary coordination primitive. In order to enforce +// capacity limits and handle back pressure, a secondary FIFO queue is used to +// send parked task handles. +// +// The general idea is that the channel is created with a `buffer` size of `n`. +// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" +// slot to hold a message. This allows `Sender` to know for a fact that a send +// will succeed *before* starting to do the actual work of sending the value. +// Since most of this work is lock-free, once the work starts, it is impossible +// to safely revert. +// +// If the sender is unable to process a send operation, then the current +// task is parked and the handle is sent on the parked task queue. +// +// Note that the implementation guarantees that the channel capacity will never +// exceed the configured limit, however there is no *strict* guarantee that the +// receiver will wake up a parked task *immediately* when a slot becomes +// available. However, it will almost always unpark a task when a slot becomes +// available and it is *guaranteed* that a sender will be unparked when the +// message that caused the sender to become parked is read out of the channel. +// +// The steps for sending a message are roughly: +// +// 1) Increment the channel message count +// 2) If the channel is at capacity, push the task handle onto the wait queue +// 3) Push the message onto the message queue. +// +// The steps for receiving a message are roughly: +// +// 1) Pop a message from the message queue +// 2) Pop a task handle from the wait queue +// 3) Decrement the channel message count. +// +// It's important for the order of operations on lock-free structures to happen +// in reverse order between the sender and receiver. This makes the message +// queue the primary coordination structure and establishes the necessary +// happens-before semantics required for the acquire / release semantics used +// by the queue structure. + + + +use mpsc::queue::{Queue, PopResult}; + +use futures::task::{self, Task}; +use futures::{Async, AsyncSink, Poll, StartSend, Sink, Stream}; + +use std::fmt; +use std::error::Error; +use std::any::Any; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::usize; + +mod queue; + +/// The transmission end of a channel which is used to send values. +/// +/// This is created by the `channel` method. +#[derive(Debug)] +pub struct Sender<T> { + // Channel state shared between the sender and receiver. + inner: Arc<Inner<T>>, + + // Handle to the task that is blocked on this sender. This handle is sent + // to the receiver half in order to be notified when the sender becomes + // unblocked. + sender_task: Arc<Mutex<SenderTask>>, + + // True if the sender might be blocked. This is an optimization to avoid + // having to lock the mutex most of the time. + maybe_parked: bool, +} + +/// The transmission end of a channel which is used to send values. +/// +/// This is created by the `unbounded` method. +#[derive(Debug)] +pub struct UnboundedSender<T>(Sender<T>); + +trait AssertKinds: Send + Sync + Clone {} +impl AssertKinds for UnboundedSender<u32> {} + + +/// The receiving end of a channel which implements the `Stream` trait. +/// +/// This is a concrete implementation of a stream which can be used to represent +/// a stream of values being computed elsewhere. This is created by the +/// `channel` method. +#[derive(Debug)] +pub struct Receiver<T> { + inner: Arc<Inner<T>>, +} + +/// Error type for sending, used when the receiving end of a channel is +/// dropped +#[derive(Clone, PartialEq, Eq)] +pub struct SendError<T>(T); + +/// Error type returned from `try_send` +#[derive(Clone, PartialEq, Eq)] +pub struct TrySendError<T> { + kind: TrySendErrorKind<T>, +} + +#[derive(Clone, PartialEq, Eq)] +enum TrySendErrorKind<T> { + Full(T), + Disconnected(T), +} + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("SendError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "send failed because receiver is gone") + } +} + +impl<T: Any> Error for SendError<T> +{ + fn description(&self) -> &str { + "send failed because receiver is gone" + } +} + +impl<T> SendError<T> { + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("TrySendError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + if self.is_full() { + write!(fmt, "send failed because channel is full") + } else { + write!(fmt, "send failed because receiver is gone") + } + } +} + +impl<T: Any> Error for TrySendError<T> { + fn description(&self) -> &str { + if self.is_full() { + "send failed because channel is full" + } else { + "send failed because receiver is gone" + } + } +} + +impl<T> TrySendError<T> { + /// Returns true if this error is a result of the channel being full + pub fn is_full(&self) -> bool { + use self::TrySendErrorKind::*; + + match self.kind { + Full(_) => true, + _ => false, + } + } + + /// Returns true if this error is a result of the receiver being dropped + pub fn is_disconnected(&self) -> bool { + use self::TrySendErrorKind::*; + + match self.kind { + Disconnected(_) => true, + _ => false, + } + } + + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + use self::TrySendErrorKind::*; + + match self.kind { + Full(v) | Disconnected(v) => v, + } + } +} + +#[derive(Debug)] +struct Inner<T> { + // Max buffer size of the channel. If `None` then the channel is unbounded. + buffer: Option<usize>, + + // Internal channel state. Consists of the number of messages stored in the + // channel as well as a flag signalling that the channel is closed. + state: AtomicUsize, + + // Atomic, FIFO queue used to send messages to the receiver + message_queue: Queue<Option<T>>, + + // Atomic, FIFO queue used to send parked task handles to the receiver. + parked_queue: Queue<Arc<Mutex<SenderTask>>>, + + // Number of senders in existence + num_senders: AtomicUsize, + + // Handle to the receiver's task. + recv_task: Mutex<ReceiverTask>, +} + +// Struct representation of `Inner::state`. +#[derive(Debug, Clone, Copy)] +struct State { + // `true` when the channel is open + is_open: bool, + + // Number of messages in the channel + num_messages: usize, +} + +#[derive(Debug)] +struct ReceiverTask { + unparked: bool, + task: Option<Task>, +} + +// Returned from Receiver::try_park() +enum TryPark { + Parked, + Closed, + NotEmpty, +} + +// The `is_open` flag is stored in the left-most bit of `Inner::state` +const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1); + +// When a new channel is created, it is created in the open state with no +// pending messages. +const INIT_STATE: usize = OPEN_MASK; + +// The maximum number of messages that a channel can track is `usize::MAX >> 1` +const MAX_CAPACITY: usize = !(OPEN_MASK); + +// The maximum requested buffer size must be less than the maximum capacity of +// a channel. This is because each sender gets a guaranteed slot. +const MAX_BUFFER: usize = MAX_CAPACITY >> 1; + +// Sent to the consumer to wake up blocked producers +#[derive(Debug)] +struct SenderTask { + task: Option<Task>, + is_parked: bool, +} + +impl SenderTask { + fn new() -> Self { + SenderTask { + task: None, + is_parked: false, + } + } + + fn notify(&mut self) { + self.is_parked = false; + + if let Some(task) = self.task.take() { + task.notify(); + } + } +} + +/// Creates an in-memory channel implementation of the `Stream` trait with +/// bounded capacity. +/// +/// This method creates a concrete implementation of the `Stream` trait which +/// can be used to send values across threads in a streaming fashion. This +/// channel is unique in that it implements back pressure to ensure that the +/// sender never outpaces the receiver. The channel capacity is equal to +/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot +/// in the channel capacity, and on top of that there are `buffer` "first come, +/// first serve" slots available to all senders. +/// +/// The `Receiver` returned implements the `Stream` trait and has access to any +/// number of the associated combinators for transforming the result. +pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { + // Check that the requested buffer size does not exceed the maximum buffer + // size permitted by the system. + assert!(buffer < MAX_BUFFER, "requested buffer size too large"); + channel2(Some(buffer)) +} + +/// Creates an in-memory channel implementation of the `Stream` trait with +/// unbounded capacity. +/// +/// This method creates a concrete implementation of the `Stream` trait which +/// can be used to send values across threads in a streaming fashion. A `send` +/// on this channel will always succeed as long as the receive half has not +/// been closed. If the receiver falls behind, messages will be buffered +/// internally. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) { + let (tx, rx) = channel2(None); + (UnboundedSender(tx), rx) +} + +fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) { + let inner = Arc::new(Inner { + buffer: buffer, + state: AtomicUsize::new(INIT_STATE), + message_queue: Queue::new(), + parked_queue: Queue::new(), + num_senders: AtomicUsize::new(1), + recv_task: Mutex::new(ReceiverTask { + unparked: false, + task: None, + }), + }); + + let tx = Sender { + inner: inner.clone(), + sender_task: Arc::new(Mutex::new(SenderTask::new())), + maybe_parked: false, + }; + + let rx = Receiver { + inner: inner, + }; + + (tx, rx) +} + +/* + * + * ===== impl Sender ===== + * + */ + +impl<T> Sender<T> { + /// Attempts to send a message on this `Sender<T>` without blocking. + /// + /// This function, unlike `start_send`, is safe to call whether it's being + /// called on a task or not. Note that this function, however, will *not* + /// attempt to block the current task if the message cannot be sent. + /// + /// It is not recommended to call this function from inside of a future, + /// only from an external thread where you've otherwise arranged to be + /// notified when the channel is no longer full. + pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { + // If the sender is currently blocked, reject the message + if !self.poll_unparked(false).is_ready() { + return Err(TrySendError { + kind: TrySendErrorKind::Full(msg), + }); + } + + // The channel has capacity to accept the message, so send it + self.do_send(Some(msg), false) + .map_err(|SendError(v)| { + TrySendError { + kind: TrySendErrorKind::Disconnected(v), + } + }) + } + + // Do the send without failing + // None means close + fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> { + // First, increment the number of messages contained by the channel. + // This operation will also atomically determine if the sender task + // should be parked. + // + // None is returned in the case that the channel has been closed by the + // receiver. This happens when `Receiver::close` is called or the + // receiver is dropped. + let park_self = match self.inc_num_messages(msg.is_none()) { + Some(park_self) => park_self, + None => { + // The receiver has closed the channel. Only abort if actually + // sending a message. It is important that the stream + // termination (None) is always sent. This technically means + // that it is possible for the queue to contain the following + // number of messages: + // + // num-senders + buffer + 1 + // + if let Some(msg) = msg { + return Err(SendError(msg)); + } else { + return Ok(()); + } + } + }; + + // If the channel has reached capacity, then the sender task needs to + // be parked. This will send the task handle on the parked task queue. + // + // However, when `do_send` is called while dropping the `Sender`, + // `task::current()` can't be called safely. In this case, in order to + // maintain internal consistency, a blank message is pushed onto the + // parked task queue. + if park_self { + self.park(do_park); + } + + self.queue_push_and_signal(msg); + + Ok(()) + } + + // Do the send without parking current task. + // + // To be called from unbounded sender. + fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> { + match self.inc_num_messages(false) { + Some(park_self) => assert!(!park_self), + None => return Err(SendError(msg)), + }; + + self.queue_push_and_signal(Some(msg)); + + Ok(()) + } + + // Push message to the queue and signal to the receiver + fn queue_push_and_signal(&self, msg: Option<T>) { + // Push the message onto the message queue + self.inner.message_queue.push(msg); + + // Signal to the receiver that a message has been enqueued. If the + // receiver is parked, this will unpark the task. + self.signal(); + } + + // Increment the number of queued messages. Returns if the sender should + // block. + fn inc_num_messages(&self, close: bool) -> Option<bool> { + let mut curr = self.inner.state.load(SeqCst); + + loop { + let mut state = decode_state(curr); + + // The receiver end closed the channel. + if !state.is_open { + return None; + } + + // This probably is never hit? Odds are the process will run out of + // memory first. It may be worth to return something else in this + // case? + assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \ + sending this messages would overflow the state"); + + state.num_messages += 1; + + // The channel is closed by all sender handles being dropped. + if close { + state.is_open = false; + } + + let next = encode_state(&state); + match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => { + // Block if the current number of pending messages has exceeded + // the configured buffer size + let park_self = match self.inner.buffer { + Some(buffer) => state.num_messages > buffer, + None => false, + }; + + return Some(park_self) + } + Err(actual) => curr = actual, + } + } + } + + // Signal to the receiver task that a message has been enqueued + fn signal(&self) { + // TODO + // This logic can probably be improved by guarding the lock with an + // atomic. + // + // Do this step first so that the lock is dropped when + // `unpark` is called + let task = { + let mut recv_task = self.inner.recv_task.lock().unwrap(); + + // If the receiver has already been unparked, then there is nothing + // more to do + if recv_task.unparked { + return; + } + + // Setting this flag enables the receiving end to detect that + // an unpark event happened in order to avoid unnecessarily + // parking. + recv_task.unparked = true; + recv_task.task.take() + }; + + if let Some(task) = task { + task.notify(); + } + } + + fn park(&mut self, can_park: bool) { + // TODO: clean up internal state if the task::current will fail + + let task = if can_park { + Some(task::current()) + } else { + None + }; + + { + let mut sender = self.sender_task.lock().unwrap(); + sender.task = task; + sender.is_parked = true; + } + + // Send handle over queue + let t = self.sender_task.clone(); + self.inner.parked_queue.push(t); + + // Check to make sure we weren't closed after we sent our task on the + // queue + let state = decode_state(self.inner.state.load(SeqCst)); + self.maybe_parked = state.is_open; + } + + /// Polls the channel to determine if there is guaranteed to be capacity to send at least one + /// item without waiting. + /// + /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns + /// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns + /// `Err(SendError(_))` if the receiver has been dropped. + /// + /// # Panics + /// + /// This method will panic if called from outside the context of a task or future. + pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> { + let state = decode_state(self.inner.state.load(SeqCst)); + if !state.is_open { + return Err(SendError(())); + } + + Ok(self.poll_unparked(true)) + } + + fn poll_unparked(&mut self, do_park: bool) -> Async<()> { + // First check the `maybe_parked` variable. This avoids acquiring the + // lock in most cases + if self.maybe_parked { + // Get a lock on the task handle + let mut task = self.sender_task.lock().unwrap(); + + if !task.is_parked { + self.maybe_parked = false; + return Async::Ready(()) + } + + // At this point, an unpark request is pending, so there will be an + // unpark sometime in the future. We just need to make sure that + // the correct task will be notified. + // + // Update the task in case the `Sender` has been moved to another + // task + task.task = if do_park { + Some(task::current()) + } else { + None + }; + + Async::NotReady + } else { + Async::Ready(()) + } + } +} + +impl<T> Sink for Sender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + // If the sender is currently blocked, reject the message before doing + // any work. + if !self.poll_unparked(true).is_ready() { + return Ok(AsyncSink::NotReady(msg)); + } + + // The channel has capacity to accept the message, so send it. + self.do_send(Some(msg), true)?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<T> UnboundedSender<T> { + /// Sends the provided message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + #[deprecated(note = "renamed to `unbounded_send`")] + #[doc(hidden)] + pub fn send(&self, msg: T) -> Result<(), SendError<T>> { + self.unbounded_send(msg) + } + + /// Sends the provided message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> { + self.0.do_send_nb(msg) + } +} + +impl<T> Sink for UnboundedSe |