diff options
author | Carl Lerche <me@carllerche.com> | 2019-01-22 11:37:26 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-22 11:37:26 -0800 |
commit | 13083153aad0750bfee5772d633c725b70f5d243 (patch) | |
tree | e16419be74a39cf60efc7b235db6991e81ccd6cf | |
parent | 91f20e33a41108be2b42a8593c70cbff68a23bd6 (diff) |
Introduce tokio-sync crate containing synchronization primitives. (#839)
Introduce a tokio-sync crate containing useful synchronization primitives for programs
written using Tokio.
The initial release contains:
* An mpsc channel
* A oneshot channel
* A semaphore implementation
* An `AtomicTask` primitive.
The `oneshot` and `mpsc` channels are new implementations providing improved
performance characteristics. In some benchmarks, the new mpsc channel shows
up to 7x improvement over the version provided by the `futures` crate. Unfortunately,
the `oneshot` implementation only provides a slight performance improvement as it
is mostly limited by the `futures` 0.1 task system. Once updated to the `std` version
of `Future` (currently nightly only), much greater performance improvements should
be achievable by `oneshot`.
Additionally, he implementations provided here are checked using
[Loom](http://github.com/carllerche/loom/), which provides greater confidence of
correctness.
43 files changed, 5145 insertions, 2384 deletions
diff --git a/.appveyor.yml b/.appveyor.yml index 4674a388..d04ab464 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -11,6 +11,7 @@ install: - rustup-init.exe -y --default-host %TARGET% - set PATH=%PATH%;C:\Users\appveyor\.cargo\bin - set RUST_BACKTRACE=1 + - set LOOM_MAX_DURATION=10 - rustc -V - cargo -V diff --git a/.travis.yml b/.travis.yml index d95ac7a9..8ac0308a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -101,6 +101,8 @@ script: | cargo check --all --exclude tokio-tls --target $TARGET cargo check --tests --all --exclude tokio-tls --target $TARGET else + # Limit the execution time of loom tests. + export LOOM_MAX_DURATION=10 cargo test --all --no-fail-fast cargo doc --all fi @@ -26,7 +26,6 @@ members = [ "./", "tokio-async-await", "tokio-buf", - "tokio-channel", "tokio-codec", "tokio-current-thread", "tokio-executor", @@ -34,6 +33,7 @@ members = [ "tokio-io", "tokio-reactor", "tokio-signal", + "tokio-sync", "tokio-threadpool", "tokio-timer", "tokio-tcp", @@ -49,6 +49,7 @@ default = [ "io", "reactor", "rt-full", + "sync", "tcp", "timer", "udp", @@ -67,6 +68,7 @@ rt-full = [ "tokio-executor", "tokio-threadpool", ] +sync = ["tokio-sync"] tcp = ["tokio-tcp"] timer = ["tokio-timer"] udp = ["tokio-udp"] @@ -95,6 +97,7 @@ tokio-fs = { version = "0.1.3", path = "tokio-fs", optional = true } tokio-io = { version = "0.1.6", path = "tokio-io", optional = true } tokio-executor = { version = "0.1.5", path = "tokio-executor", optional = true } tokio-reactor = { version = "0.1.1", path = "tokio-reactor", optional = true } +tokio-sync = { version = "0.1.0", path = "tokio-sync", optional = true } tokio-threadpool = { version = "0.1.8", path = "tokio-threadpool", optional = true } tokio-tcp = { version = "0.1.0", path = "tokio-tcp", optional = true } tokio-udp = { version = "0.1.0", path = "tokio-udp", optional = true } @@ -100,6 +100,8 @@ extern crate tokio_fs; extern crate tokio_reactor; #[cfg(feature = "rt-full")] extern crate tokio_threadpool; +#[cfg(feature = "sync")] +extern crate tokio_sync; #[cfg(feature = "timer")] extern crate tokio_timer; #[cfg(feature = "tcp")] @@ -126,6 +128,8 @@ pub mod net; pub mod prelude; #[cfg(feature = "reactor")] pub mod reactor; +#[cfg(feature = "sync")] +pub mod sync; #[cfg(feature = "timer")] pub mod timer; pub mod util; diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 00000000..52b8787b --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,16 @@ +//! Future-aware synchronization +//! +//! This module is enabled with the **`sync`** feature flag. +//! +//! Tasks sometimes need to communicate with each other. This module contains +//! two basic abstractions for doing so: +//! +//! - [oneshot](oneshot/index.html), a way of sending a single value +//! from one task to another. +//! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for +//! sending values between tasks. + +pub use tokio_sync::{ + mpsc, + oneshot, +}; diff --git a/tokio-channel/LICENSE b/tokio-channel/LICENSE deleted file mode 100644 index 37effb0f..00000000 --- a/tokio-channel/LICENSE +++ /dev/null @@ -1,51 +0,0 @@ -Copyright (c) 2019 Tokio Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. - -Copyright (c) 2016 futures-rs Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/tokio-channel/src/lib.rs b/tokio-channel/src/lib.rs deleted file mode 100644 index 9b1b7b52..00000000 --- a/tokio-channel/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -#![doc(html_root_url = "https://docs.rs/tokio-channel/0.1.0")] -#![deny(missing_docs, warnings, missing_debug_implementations)] - -//! Asynchronous channels. -//! -//! This crate provides channels that can be used to communicate between -//! asynchronous tasks. - -extern crate futures; - -pub mod mpsc; -pub mod oneshot; - -mod lock; diff --git a/tokio-channel/src/lock.rs b/tokio-channel/src/lock.rs deleted file mode 100644 index d2acf0a6..00000000 --- a/tokio-channel/src/lock.rs +++ /dev/null @@ -1,105 +0,0 @@ -//! A "mutex" which only supports `try_lock` -//! -//! As a futures library the eventual call to an event loop should be the only -//! thing that ever blocks, so this is assisted with a fast user-space -//! implementation of a lock that can only have a `try_lock` operation. - -use std::cell::UnsafeCell; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::atomic::AtomicBool; - -/// A "mutex" around a value, similar to `std::sync::Mutex<T>`. -/// -/// This lock only supports the `try_lock` operation, however, and does not -/// implement poisoning. -#[derive(Debug)] -pub struct Lock<T> { - locked: AtomicBool, - data: UnsafeCell<T>, -} - -/// Sentinel representing an acquired lock through which the data can be -/// accessed. -pub struct TryLock<'a, T: 'a> { - __ptr: &'a Lock<T>, -} - -// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are -// intended to mirror the standard library's corresponding impls for `Mutex<T>`. -// -// If a `T` is sendable across threads, so is the lock, and `T` must be sendable -// across threads to be `Sync` because it allows mutable access from multiple -// threads. -unsafe impl<T: Send> Send for Lock<T> {} -unsafe impl<T: Send> Sync for Lock<T> {} - -impl<T> Lock<T> { - /// Creates a new lock around the given value. - pub fn new(t: T) -> Lock<T> { - Lock { - locked: AtomicBool::new(false), - data: UnsafeCell::new(t), - } - } - - /// Attempts to acquire this lock, returning whether the lock was acquired or - /// not. - /// - /// If `Some` is returned then the data this lock protects can be accessed - /// through the sentinel. This sentinel allows both mutable and immutable - /// access. - /// - /// If `None` is returned then the lock is already locked, either elsewhere - /// on this thread or on another thread. - pub fn try_lock(&self) -> Option<TryLock<T>> { - if !self.locked.swap(true, SeqCst) { - Some(TryLock { __ptr: self }) - } else { - None - } - } -} - -impl<'a, T> Deref for TryLock<'a, T> { - type Target = T; - fn deref(&self) -> &T { - // The existence of `TryLock` represents that we own the lock, so we - // can safely access the data here. - unsafe { &*self.__ptr.data.get() } - } -} - -impl<'a, T> DerefMut for TryLock<'a, T> { - fn deref_mut(&mut self) -> &mut T { - // The existence of `TryLock` represents that we own the lock, so we - // can safely access the data here. - // - // Additionally, we're the *only* `TryLock` in existence so mutable - // access should be ok. - unsafe { &mut *self.__ptr.data.get() } - } -} - -impl<'a, T> Drop for TryLock<'a, T> { - fn drop(&mut self) { - self.__ptr.locked.store(false, SeqCst); - } -} - -#[cfg(test)] -mod tests { - use super::Lock; - - #[test] - fn smoke() { - let a = Lock::new(1); - let mut a1 = a.try_lock().unwrap(); - assert!(a.try_lock().is_none()); - assert_eq!(*a1, 1); - *a1 = 2; - drop(a1); - assert_eq!(*a.try_lock().unwrap(), 2); - assert_eq!(*a.try_lock().unwrap(), 2); - } -} diff --git a/tokio-channel/src/mpsc/mod.rs b/tokio-channel/src/mpsc/mod.rs deleted file mode 100644 index 21abd55d..00000000 --- a/tokio-channel/src/mpsc/mod.rs +++ /dev/null @@ -1,989 +0,0 @@ -//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure. -//! -//! A channel can be used as a communication primitive between tasks running on -//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender` -//! handles. `Receiver` implements `Stream` and allows a task to read values -//! out of the channel. If there is no message to read from the channel, the -//! current task will be notified when a new value is sent. `Sender` implements -//! the `Sink` trait and allows a task to send messages into the channel. If -//! the channel is at capacity, then send will be rejected and the task will be -//! notified when additional capacity is available. -//! -//! # Disconnection -//! -//! When all `Sender` handles have been dropped, it is no longer possible to -//! send values into the channel. This is considered the termination event of -//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`. -//! -//! If the receiver handle is dropped, then messages can no longer be read out -//! of the channel. In this case, a `send` will result in an error. -//! -//! # Clean Shutdown -//! -//! If the `Receiver` is simply dropped, then it is possible for there to be -//! messages still in the channel that will not be processed. As such, it is -//! usually desirable to perform a "clean" shutdown. To do this, the receiver -//! will first call `close`, which will prevent any further messages to be sent -//! into the channel. Then, the receiver consumes the channel to completion, at -//! which point the receiver can be dropped. - -// At the core, the channel uses an atomic FIFO queue for message passing. This -// queue is used as the primary coordination primitive. In order to enforce -// capacity limits and handle back pressure, a secondary FIFO queue is used to -// send parked task handles. -// -// The general idea is that the channel is created with a `buffer` size of `n`. -// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" -// slot to hold a message. This allows `Sender` to know for a fact that a send -// will succeed *before* starting to do the actual work of sending the value. -// Since most of this work is lock-free, once the work starts, it is impossible -// to safely revert. -// -// If the sender is unable to process a send operation, then the current -// task is parked and the handle is sent on the parked task queue. -// -// Note that the implementation guarantees that the channel capacity will never -// exceed the configured limit, however there is no *strict* guarantee that the -// receiver will wake up a parked task *immediately* when a slot becomes -// available. However, it will almost always unpark a task when a slot becomes -// available and it is *guaranteed* that a sender will be unparked when the -// message that caused the sender to become parked is read out of the channel. -// -// The steps for sending a message are roughly: -// -// 1) Increment the channel message count -// 2) If the channel is at capacity, push the task handle onto the wait queue -// 3) Push the message onto the message queue. -// -// The steps for receiving a message are roughly: -// -// 1) Pop a message from the message queue -// 2) Pop a task handle from the wait queue -// 3) Decrement the channel message count. -// -// It's important for the order of operations on lock-free structures to happen -// in reverse order between the sender and receiver. This makes the message -// queue the primary coordination structure and establishes the necessary -// happens-before semantics required for the acquire / release semantics used -// by the queue structure. - - - -use mpsc::queue::{Queue, PopResult}; - -use futures::task::{self, Task}; -use futures::{Async, AsyncSink, Poll, StartSend, Sink, Stream}; - -use std::fmt; -use std::error::Error; -use std::any::Any; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::usize; - -mod queue; - -/// The transmission end of a channel which is used to send values. -/// -/// This is created by the `channel` method. -#[derive(Debug)] -pub struct Sender<T> { - // Channel state shared between the sender and receiver. - inner: Arc<Inner<T>>, - - // Handle to the task that is blocked on this sender. This handle is sent - // to the receiver half in order to be notified when the sender becomes - // unblocked. - sender_task: Arc<Mutex<SenderTask>>, - - // True if the sender might be blocked. This is an optimization to avoid - // having to lock the mutex most of the time. - maybe_parked: bool, -} - -/// The transmission end of a channel which is used to send values. -/// -/// This is created by the `unbounded` method. -#[derive(Debug)] -pub struct UnboundedSender<T>(Sender<T>); - -trait AssertKinds: Send + Sync + Clone {} -impl AssertKinds for UnboundedSender<u32> {} - - -/// The receiving end of a channel which implements the `Stream` trait. -/// -/// This is a concrete implementation of a stream which can be used to represent -/// a stream of values being computed elsewhere. This is created by the -/// `channel` method. -#[derive(Debug)] -pub struct Receiver<T> { - inner: Arc<Inner<T>>, -} - -/// Error type for sending, used when the receiving end of a channel is -/// dropped -#[derive(Clone, PartialEq, Eq)] -pub struct SendError<T>(T); - -/// Error type returned from `try_send` -#[derive(Clone, PartialEq, Eq)] -pub struct TrySendError<T> { - kind: TrySendErrorKind<T>, -} - -#[derive(Clone, PartialEq, Eq)] -enum TrySendErrorKind<T> { - Full(T), - Disconnected(T), -} - -impl<T> fmt::Debug for SendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_tuple("SendError") - .field(&"...") - .finish() - } -} - -impl<T> fmt::Display for SendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "send failed because receiver is gone") - } -} - -impl<T: Any> Error for SendError<T> -{ - fn description(&self) -> &str { - "send failed because receiver is gone" - } -} - -impl<T> SendError<T> { - /// Returns the message that was attempted to be sent but failed. - pub fn into_inner(self) -> T { - self.0 - } -} - -impl<T> fmt::Debug for TrySendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_tuple("TrySendError") - .field(&"...") - .finish() - } -} - -impl<T> fmt::Display for TrySendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - if self.is_full() { - write!(fmt, "send failed because channel is full") - } else { - write!(fmt, "send failed because receiver is gone") - } - } -} - -impl<T: Any> Error for TrySendError<T> { - fn description(&self) -> &str { - if self.is_full() { - "send failed because channel is full" - } else { - "send failed because receiver is gone" - } - } -} - -impl<T> TrySendError<T> { - /// Returns true if this error is a result of the channel being full - pub fn is_full(&self) -> bool { - use self::TrySendErrorKind::*; - - match self.kind { - Full(_) => true, - _ => false, - } - } - - /// Returns true if this error is a result of the receiver being dropped - pub fn is_disconnected(&self) -> bool { - use self::TrySendErrorKind::*; - - match self.kind { - Disconnected(_) => true, - _ => false, - } - } - - /// Returns the message that was attempted to be sent but failed. - pub fn into_inner(self) -> T { - use self::TrySendErrorKind::*; - - match self.kind { - Full(v) | Disconnected(v) => v, - } - } -} - -#[derive(Debug)] -struct Inner<T> { - // Max buffer size of the channel. If `None` then the channel is unbounded. - buffer: Option<usize>, - - // Internal channel state. Consists of the number of messages stored in the - // channel as well as a flag signalling that the channel is closed. - state: AtomicUsize, - - // Atomic, FIFO queue used to send messages to the receiver - message_queue: Queue<Option<T>>, - - // Atomic, FIFO queue used to send parked task handles to the receiver. - parked_queue: Queue<Arc<Mutex<SenderTask>>>, - - // Number of senders in existence - num_senders: AtomicUsize, - - // Handle to the receiver's task. - recv_task: Mutex<ReceiverTask>, -} - -// Struct representation of `Inner::state`. -#[derive(Debug, Clone, Copy)] -struct State { - // `true` when the channel is open - is_open: bool, - - // Number of messages in the channel - num_messages: usize, -} - -#[derive(Debug)] -struct ReceiverTask { - unparked: bool, - task: Option<Task>, -} - -// Returned from Receiver::try_park() -enum TryPark { - Parked, - Closed, - NotEmpty, -} - -// The `is_open` flag is stored in the left-most bit of `Inner::state` -const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1); - -// When a new channel is created, it is created in the open state with no -// pending messages. -const INIT_STATE: usize = OPEN_MASK; - -// The maximum number of messages that a channel can track is `usize::MAX >> 1` -const MAX_CAPACITY: usize = !(OPEN_MASK); - -// The maximum requested buffer size must be less than the maximum capacity of -// a channel. This is because each sender gets a guaranteed slot. -const MAX_BUFFER: usize = MAX_CAPACITY >> 1; - -// Sent to the consumer to wake up blocked producers -#[derive(Debug)] -struct SenderTask { - task: Option<Task>, - is_parked: bool, -} - -impl SenderTask { - fn new() -> Self { - SenderTask { - task: None, - is_parked: false, - } - } - - fn notify(&mut self) { - self.is_parked = false; - - if let Some(task) = self.task.take() { - task.notify(); - } - } -} - -/// Creates an in-memory channel implementation of the `Stream` trait with -/// bounded capacity. -/// -/// This method creates a concrete implementation of the `Stream` trait which -/// can be used to send values across threads in a streaming fashion. This -/// channel is unique in that it implements back pressure to ensure that the -/// sender never outpaces the receiver. The channel capacity is equal to -/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot -/// in the channel capacity, and on top of that there are `buffer` "first come, -/// first serve" slots available to all senders. -/// -/// The `Receiver` returned implements the `Stream` trait and has access to any -/// number of the associated combinators for transforming the result. -pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { - // Check that the requested buffer size does not exceed the maximum buffer - // size permitted by the system. - assert!(buffer < MAX_BUFFER, "requested buffer size too large"); - channel2(Some(buffer)) -} - -/// Creates an in-memory channel implementation of the `Stream` trait with -/// unbounded capacity. -/// -/// This method creates a concrete implementation of the `Stream` trait which -/// can be used to send values across threads in a streaming fashion. A `send` -/// on this channel will always succeed as long as the receive half has not -/// been closed. If the receiver falls behind, messages will be buffered -/// internally. -/// -/// **Note** that the amount of available system memory is an implicit bound to -/// the channel. Using an `unbounded` channel has the ability of causing the -/// process to run out of memory. In this case, the process will be aborted. -pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) { - let (tx, rx) = channel2(None); - (UnboundedSender(tx), rx) -} - -fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) { - let inner = Arc::new(Inner { - buffer: buffer, - state: AtomicUsize::new(INIT_STATE), - message_queue: Queue::new(), - parked_queue: Queue::new(), - num_senders: AtomicUsize::new(1), - recv_task: Mutex::new(ReceiverTask { - unparked: false, - task: None, - }), - }); - - let tx = Sender { - inner: inner.clone(), - sender_task: Arc::new(Mutex::new(SenderTask::new())), - maybe_parked: false, - }; - - let rx = Receiver { - inner: inner, - }; - - (tx, rx) -} - -/* - * - * ===== impl Sender ===== - * - */ - -impl<T> Sender<T> { - /// Attempts to send a message on this `Sender<T>` without blocking. - /// - /// This function, unlike `start_send`, is safe to call whether it's being - /// called on a task or not. Note that this function, however, will *not* - /// attempt to block the current task if the message cannot be sent. - /// - /// It is not recommended to call this function from inside of a future, - /// only from an external thread where you've otherwise arranged to be - /// notified when the channel is no longer full. - pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { - // If the sender is currently blocked, reject the message - if !self.poll_unparked(false).is_ready() { - return Err(TrySendError { - kind: TrySendErrorKind::Full(msg), - }); - } - - // The channel has capacity to accept the message, so send it - self.do_send(Some(msg), false) - .map_err(|SendError(v)| { - TrySendError { - kind: TrySendErrorKind::Disconnected(v), - } - }) - } - - // Do the send without failing - // None means close - fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> { - // First, increment the number of messages contained by the channel. - // This operation will also atomically determine if the sender task - // should be parked. - // - // None is returned in the case that the channel has been closed by the - // receiver. This happens when `Receiver::close` is called or the - // receiver is dropped. - let park_self = match self.inc_num_messages(msg.is_none()) { - Some( |