summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-01-22 11:37:26 -0800
committerGitHub <noreply@github.com>2019-01-22 11:37:26 -0800
commit13083153aad0750bfee5772d633c725b70f5d243 (patch)
treee16419be74a39cf60efc7b235db6991e81ccd6cf
parent91f20e33a41108be2b42a8593c70cbff68a23bd6 (diff)
Introduce tokio-sync crate containing synchronization primitives. (#839)
Introduce a tokio-sync crate containing useful synchronization primitives for programs written using Tokio. The initial release contains: * An mpsc channel * A oneshot channel * A semaphore implementation * An `AtomicTask` primitive. The `oneshot` and `mpsc` channels are new implementations providing improved performance characteristics. In some benchmarks, the new mpsc channel shows up to 7x improvement over the version provided by the `futures` crate. Unfortunately, the `oneshot` implementation only provides a slight performance improvement as it is mostly limited by the `futures` 0.1 task system. Once updated to the `std` version of `Future` (currently nightly only), much greater performance improvements should be achievable by `oneshot`. Additionally, he implementations provided here are checked using [Loom](http://github.com/carllerche/loom/), which provides greater confidence of correctness.
-rw-r--r--.appveyor.yml1
-rw-r--r--.travis.yml2
-rw-r--r--Cargo.toml5
-rw-r--r--src/lib.rs4
-rw-r--r--src/sync.rs16
-rw-r--r--tokio-channel/LICENSE51
-rw-r--r--tokio-channel/src/lib.rs14
-rw-r--r--tokio-channel/src/lock.rs105
-rw-r--r--tokio-channel/src/mpsc/mod.rs989
-rw-r--r--tokio-channel/src/mpsc/queue.rs151
-rw-r--r--tokio-channel/src/oneshot.rs426
-rw-r--r--tokio-channel/tests/mpsc-close.rs22
-rw-r--r--tokio-channel/tests/mpsc.rs481
-rw-r--r--tokio-channel/tests/oneshot.rs124
-rw-r--r--tokio-channel/tests/support/mod.rs16
-rw-r--r--tokio-sync/CHANGELOG.md (renamed from tokio-channel/CHANGELOG.md)0
-rw-r--r--tokio-sync/Cargo.toml (renamed from tokio-channel/Cargo.toml)15
-rw-r--r--tokio-sync/LICENSE25
-rw-r--r--tokio-sync/README.md (renamed from tokio-channel/README.md)0
-rw-r--r--tokio-sync/benches/mpsc.rs280
-rw-r--r--tokio-sync/benches/oneshot.rs213
-rw-r--r--tokio-sync/src/lib.rs29
-rw-r--r--tokio-sync/src/loom.rs36
-rw-r--r--tokio-sync/src/mpsc/block.rs395
-rw-r--r--tokio-sync/src/mpsc/bounded.rs243
-rw-r--r--tokio-sync/src/mpsc/chan.rs402
-rw-r--r--tokio-sync/src/mpsc/list.rs329
-rw-r--r--tokio-sync/src/mpsc/mod.rs79
-rw-r--r--tokio-sync/src/mpsc/rx.rs1
-rw-r--r--tokio-sync/src/mpsc/tx.rs0
-rw-r--r--tokio-sync/src/mpsc/unbounded.rs157
-rw-r--r--tokio-sync/src/oneshot.rs509
-rw-r--r--tokio-sync/src/semaphore.rs1116
-rw-r--r--tokio-sync/src/task/atomic_task.rs286
-rw-r--r--tokio-sync/src/task/mod.rs5
-rw-r--r--tokio-sync/tests/atomic_task.rs14
-rw-r--r--tokio-sync/tests/fuzz_atomic_task.rs54
-rw-r--r--tokio-sync/tests/fuzz_list.rs71
-rw-r--r--tokio-sync/tests/fuzz_oneshot.rs23
-rw-r--r--tokio-sync/tests/fuzz_semaphore.rs134
-rw-r--r--tokio-sync/tests/mpsc.rs288
-rw-r--r--tokio-sync/tests/oneshot.rs244
-rw-r--r--tokio-sync/tests/semaphore.rs174
43 files changed, 5145 insertions, 2384 deletions
diff --git a/.appveyor.yml b/.appveyor.yml
index 4674a388..d04ab464 100644
--- a/.appveyor.yml
+++ b/.appveyor.yml
@@ -11,6 +11,7 @@ install:
- rustup-init.exe -y --default-host %TARGET%
- set PATH=%PATH%;C:\Users\appveyor\.cargo\bin
- set RUST_BACKTRACE=1
+ - set LOOM_MAX_DURATION=10
- rustc -V
- cargo -V
diff --git a/.travis.yml b/.travis.yml
index d95ac7a9..8ac0308a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -101,6 +101,8 @@ script: |
cargo check --all --exclude tokio-tls --target $TARGET
cargo check --tests --all --exclude tokio-tls --target $TARGET
else
+ # Limit the execution time of loom tests.
+ export LOOM_MAX_DURATION=10
cargo test --all --no-fail-fast
cargo doc --all
fi
diff --git a/Cargo.toml b/Cargo.toml
index 97f3b128..cc729ed7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,6 @@ members = [
"./",
"tokio-async-await",
"tokio-buf",
- "tokio-channel",
"tokio-codec",
"tokio-current-thread",
"tokio-executor",
@@ -34,6 +33,7 @@ members = [
"tokio-io",
"tokio-reactor",
"tokio-signal",
+ "tokio-sync",
"tokio-threadpool",
"tokio-timer",
"tokio-tcp",
@@ -49,6 +49,7 @@ default = [
"io",
"reactor",
"rt-full",
+ "sync",
"tcp",
"timer",
"udp",
@@ -67,6 +68,7 @@ rt-full = [
"tokio-executor",
"tokio-threadpool",
]
+sync = ["tokio-sync"]
tcp = ["tokio-tcp"]
timer = ["tokio-timer"]
udp = ["tokio-udp"]
@@ -95,6 +97,7 @@ tokio-fs = { version = "0.1.3", path = "tokio-fs", optional = true }
tokio-io = { version = "0.1.6", path = "tokio-io", optional = true }
tokio-executor = { version = "0.1.5", path = "tokio-executor", optional = true }
tokio-reactor = { version = "0.1.1", path = "tokio-reactor", optional = true }
+tokio-sync = { version = "0.1.0", path = "tokio-sync", optional = true }
tokio-threadpool = { version = "0.1.8", path = "tokio-threadpool", optional = true }
tokio-tcp = { version = "0.1.0", path = "tokio-tcp", optional = true }
tokio-udp = { version = "0.1.0", path = "tokio-udp", optional = true }
diff --git a/src/lib.rs b/src/lib.rs
index a9ee31c2..1828675f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -100,6 +100,8 @@ extern crate tokio_fs;
extern crate tokio_reactor;
#[cfg(feature = "rt-full")]
extern crate tokio_threadpool;
+#[cfg(feature = "sync")]
+extern crate tokio_sync;
#[cfg(feature = "timer")]
extern crate tokio_timer;
#[cfg(feature = "tcp")]
@@ -126,6 +128,8 @@ pub mod net;
pub mod prelude;
#[cfg(feature = "reactor")]
pub mod reactor;
+#[cfg(feature = "sync")]
+pub mod sync;
#[cfg(feature = "timer")]
pub mod timer;
pub mod util;
diff --git a/src/sync.rs b/src/sync.rs
new file mode 100644
index 00000000..52b8787b
--- /dev/null
+++ b/src/sync.rs
@@ -0,0 +1,16 @@
+//! Future-aware synchronization
+//!
+//! This module is enabled with the **`sync`** feature flag.
+//!
+//! Tasks sometimes need to communicate with each other. This module contains
+//! two basic abstractions for doing so:
+//!
+//! - [oneshot](oneshot/index.html), a way of sending a single value
+//! from one task to another.
+//! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for
+//! sending values between tasks.
+
+pub use tokio_sync::{
+ mpsc,
+ oneshot,
+};
diff --git a/tokio-channel/LICENSE b/tokio-channel/LICENSE
deleted file mode 100644
index 37effb0f..00000000
--- a/tokio-channel/LICENSE
+++ /dev/null
@@ -1,51 +0,0 @@
-Copyright (c) 2019 Tokio Contributors
-
-Permission is hereby granted, free of charge, to any
-person obtaining a copy of this software and associated
-documentation files (the "Software"), to deal in the
-Software without restriction, including without
-limitation the rights to use, copy, modify, merge,
-publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software
-is furnished to do so, subject to the following
-conditions:
-
-The above copyright notice and this permission notice
-shall be included in all copies or substantial portions
-of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
-ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
-TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
-PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
-CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
-IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-DEALINGS IN THE SOFTWARE.
-
-Copyright (c) 2016 futures-rs Contributors
-
-Permission is hereby granted, free of charge, to any
-person obtaining a copy of this software and associated
-documentation files (the "Software"), to deal in the
-Software without restriction, including without
-limitation the rights to use, copy, modify, merge,
-publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software
-is furnished to do so, subject to the following
-conditions:
-
-The above copyright notice and this permission notice
-shall be included in all copies or substantial portions
-of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
-ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
-TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
-PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
-CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
-IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-DEALINGS IN THE SOFTWARE.
diff --git a/tokio-channel/src/lib.rs b/tokio-channel/src/lib.rs
deleted file mode 100644
index 9b1b7b52..00000000
--- a/tokio-channel/src/lib.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-#![doc(html_root_url = "https://docs.rs/tokio-channel/0.1.0")]
-#![deny(missing_docs, warnings, missing_debug_implementations)]
-
-//! Asynchronous channels.
-//!
-//! This crate provides channels that can be used to communicate between
-//! asynchronous tasks.
-
-extern crate futures;
-
-pub mod mpsc;
-pub mod oneshot;
-
-mod lock;
diff --git a/tokio-channel/src/lock.rs b/tokio-channel/src/lock.rs
deleted file mode 100644
index d2acf0a6..00000000
--- a/tokio-channel/src/lock.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-//! A "mutex" which only supports `try_lock`
-//!
-//! As a futures library the eventual call to an event loop should be the only
-//! thing that ever blocks, so this is assisted with a fast user-space
-//! implementation of a lock that can only have a `try_lock` operation.
-
-use std::cell::UnsafeCell;
-use std::ops::{Deref, DerefMut};
-use std::sync::atomic::Ordering::SeqCst;
-use std::sync::atomic::AtomicBool;
-
-/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
-///
-/// This lock only supports the `try_lock` operation, however, and does not
-/// implement poisoning.
-#[derive(Debug)]
-pub struct Lock<T> {
- locked: AtomicBool,
- data: UnsafeCell<T>,
-}
-
-/// Sentinel representing an acquired lock through which the data can be
-/// accessed.
-pub struct TryLock<'a, T: 'a> {
- __ptr: &'a Lock<T>,
-}
-
-// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are
-// intended to mirror the standard library's corresponding impls for `Mutex<T>`.
-//
-// If a `T` is sendable across threads, so is the lock, and `T` must be sendable
-// across threads to be `Sync` because it allows mutable access from multiple
-// threads.
-unsafe impl<T: Send> Send for Lock<T> {}
-unsafe impl<T: Send> Sync for Lock<T> {}
-
-impl<T> Lock<T> {
- /// Creates a new lock around the given value.
- pub fn new(t: T) -> Lock<T> {
- Lock {
- locked: AtomicBool::new(false),
- data: UnsafeCell::new(t),
- }
- }
-
- /// Attempts to acquire this lock, returning whether the lock was acquired or
- /// not.
- ///
- /// If `Some` is returned then the data this lock protects can be accessed
- /// through the sentinel. This sentinel allows both mutable and immutable
- /// access.
- ///
- /// If `None` is returned then the lock is already locked, either elsewhere
- /// on this thread or on another thread.
- pub fn try_lock(&self) -> Option<TryLock<T>> {
- if !self.locked.swap(true, SeqCst) {
- Some(TryLock { __ptr: self })
- } else {
- None
- }
- }
-}
-
-impl<'a, T> Deref for TryLock<'a, T> {
- type Target = T;
- fn deref(&self) -> &T {
- // The existence of `TryLock` represents that we own the lock, so we
- // can safely access the data here.
- unsafe { &*self.__ptr.data.get() }
- }
-}
-
-impl<'a, T> DerefMut for TryLock<'a, T> {
- fn deref_mut(&mut self) -> &mut T {
- // The existence of `TryLock` represents that we own the lock, so we
- // can safely access the data here.
- //
- // Additionally, we're the *only* `TryLock` in existence so mutable
- // access should be ok.
- unsafe { &mut *self.__ptr.data.get() }
- }
-}
-
-impl<'a, T> Drop for TryLock<'a, T> {
- fn drop(&mut self) {
- self.__ptr.locked.store(false, SeqCst);
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::Lock;
-
- #[test]
- fn smoke() {
- let a = Lock::new(1);
- let mut a1 = a.try_lock().unwrap();
- assert!(a.try_lock().is_none());
- assert_eq!(*a1, 1);
- *a1 = 2;
- drop(a1);
- assert_eq!(*a.try_lock().unwrap(), 2);
- assert_eq!(*a.try_lock().unwrap(), 2);
- }
-}
diff --git a/tokio-channel/src/mpsc/mod.rs b/tokio-channel/src/mpsc/mod.rs
deleted file mode 100644
index 21abd55d..00000000
--- a/tokio-channel/src/mpsc/mod.rs
+++ /dev/null
@@ -1,989 +0,0 @@
-//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure.
-//!
-//! A channel can be used as a communication primitive between tasks running on
-//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender`
-//! handles. `Receiver` implements `Stream` and allows a task to read values
-//! out of the channel. If there is no message to read from the channel, the
-//! current task will be notified when a new value is sent. `Sender` implements
-//! the `Sink` trait and allows a task to send messages into the channel. If
-//! the channel is at capacity, then send will be rejected and the task will be
-//! notified when additional capacity is available.
-//!
-//! # Disconnection
-//!
-//! When all `Sender` handles have been dropped, it is no longer possible to
-//! send values into the channel. This is considered the termination event of
-//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`.
-//!
-//! If the receiver handle is dropped, then messages can no longer be read out
-//! of the channel. In this case, a `send` will result in an error.
-//!
-//! # Clean Shutdown
-//!
-//! If the `Receiver` is simply dropped, then it is possible for there to be
-//! messages still in the channel that will not be processed. As such, it is
-//! usually desirable to perform a "clean" shutdown. To do this, the receiver
-//! will first call `close`, which will prevent any further messages to be sent
-//! into the channel. Then, the receiver consumes the channel to completion, at
-//! which point the receiver can be dropped.
-
-// At the core, the channel uses an atomic FIFO queue for message passing. This
-// queue is used as the primary coordination primitive. In order to enforce
-// capacity limits and handle back pressure, a secondary FIFO queue is used to
-// send parked task handles.
-//
-// The general idea is that the channel is created with a `buffer` size of `n`.
-// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
-// slot to hold a message. This allows `Sender` to know for a fact that a send
-// will succeed *before* starting to do the actual work of sending the value.
-// Since most of this work is lock-free, once the work starts, it is impossible
-// to safely revert.
-//
-// If the sender is unable to process a send operation, then the current
-// task is parked and the handle is sent on the parked task queue.
-//
-// Note that the implementation guarantees that the channel capacity will never
-// exceed the configured limit, however there is no *strict* guarantee that the
-// receiver will wake up a parked task *immediately* when a slot becomes
-// available. However, it will almost always unpark a task when a slot becomes
-// available and it is *guaranteed* that a sender will be unparked when the
-// message that caused the sender to become parked is read out of the channel.
-//
-// The steps for sending a message are roughly:
-//
-// 1) Increment the channel message count
-// 2) If the channel is at capacity, push the task handle onto the wait queue
-// 3) Push the message onto the message queue.
-//
-// The steps for receiving a message are roughly:
-//
-// 1) Pop a message from the message queue
-// 2) Pop a task handle from the wait queue
-// 3) Decrement the channel message count.
-//
-// It's important for the order of operations on lock-free structures to happen
-// in reverse order between the sender and receiver. This makes the message
-// queue the primary coordination structure and establishes the necessary
-// happens-before semantics required for the acquire / release semantics used
-// by the queue structure.
-
-
-
-use mpsc::queue::{Queue, PopResult};
-
-use futures::task::{self, Task};
-use futures::{Async, AsyncSink, Poll, StartSend, Sink, Stream};
-
-use std::fmt;
-use std::error::Error;
-use std::any::Any;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::SeqCst;
-use std::sync::{Arc, Mutex};
-use std::thread;
-use std::usize;
-
-mod queue;
-
-/// The transmission end of a channel which is used to send values.
-///
-/// This is created by the `channel` method.
-#[derive(Debug)]
-pub struct Sender<T> {
- // Channel state shared between the sender and receiver.
- inner: Arc<Inner<T>>,
-
- // Handle to the task that is blocked on this sender. This handle is sent
- // to the receiver half in order to be notified when the sender becomes
- // unblocked.
- sender_task: Arc<Mutex<SenderTask>>,
-
- // True if the sender might be blocked. This is an optimization to avoid
- // having to lock the mutex most of the time.
- maybe_parked: bool,
-}
-
-/// The transmission end of a channel which is used to send values.
-///
-/// This is created by the `unbounded` method.
-#[derive(Debug)]
-pub struct UnboundedSender<T>(Sender<T>);
-
-trait AssertKinds: Send + Sync + Clone {}
-impl AssertKinds for UnboundedSender<u32> {}
-
-
-/// The receiving end of a channel which implements the `Stream` trait.
-///
-/// This is a concrete implementation of a stream which can be used to represent
-/// a stream of values being computed elsewhere. This is created by the
-/// `channel` method.
-#[derive(Debug)]
-pub struct Receiver<T> {
- inner: Arc<Inner<T>>,
-}
-
-/// Error type for sending, used when the receiving end of a channel is
-/// dropped
-#[derive(Clone, PartialEq, Eq)]
-pub struct SendError<T>(T);
-
-/// Error type returned from `try_send`
-#[derive(Clone, PartialEq, Eq)]
-pub struct TrySendError<T> {
- kind: TrySendErrorKind<T>,
-}
-
-#[derive(Clone, PartialEq, Eq)]
-enum TrySendErrorKind<T> {
- Full(T),
- Disconnected(T),
-}
-
-impl<T> fmt::Debug for SendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.debug_tuple("SendError")
- .field(&"...")
- .finish()
- }
-}
-
-impl<T> fmt::Display for SendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- write!(fmt, "send failed because receiver is gone")
- }
-}
-
-impl<T: Any> Error for SendError<T>
-{
- fn description(&self) -> &str {
- "send failed because receiver is gone"
- }
-}
-
-impl<T> SendError<T> {
- /// Returns the message that was attempted to be sent but failed.
- pub fn into_inner(self) -> T {
- self.0
- }
-}
-
-impl<T> fmt::Debug for TrySendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.debug_tuple("TrySendError")
- .field(&"...")
- .finish()
- }
-}
-
-impl<T> fmt::Display for TrySendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- if self.is_full() {
- write!(fmt, "send failed because channel is full")
- } else {
- write!(fmt, "send failed because receiver is gone")
- }
- }
-}
-
-impl<T: Any> Error for TrySendError<T> {
- fn description(&self) -> &str {
- if self.is_full() {
- "send failed because channel is full"
- } else {
- "send failed because receiver is gone"
- }
- }
-}
-
-impl<T> TrySendError<T> {
- /// Returns true if this error is a result of the channel being full
- pub fn is_full(&self) -> bool {
- use self::TrySendErrorKind::*;
-
- match self.kind {
- Full(_) => true,
- _ => false,
- }
- }
-
- /// Returns true if this error is a result of the receiver being dropped
- pub fn is_disconnected(&self) -> bool {
- use self::TrySendErrorKind::*;
-
- match self.kind {
- Disconnected(_) => true,
- _ => false,
- }
- }
-
- /// Returns the message that was attempted to be sent but failed.
- pub fn into_inner(self) -> T {
- use self::TrySendErrorKind::*;
-
- match self.kind {
- Full(v) | Disconnected(v) => v,
- }
- }
-}
-
-#[derive(Debug)]
-struct Inner<T> {
- // Max buffer size of the channel. If `None` then the channel is unbounded.
- buffer: Option<usize>,
-
- // Internal channel state. Consists of the number of messages stored in the
- // channel as well as a flag signalling that the channel is closed.
- state: AtomicUsize,
-
- // Atomic, FIFO queue used to send messages to the receiver
- message_queue: Queue<Option<T>>,
-
- // Atomic, FIFO queue used to send parked task handles to the receiver.
- parked_queue: Queue<Arc<Mutex<SenderTask>>>,
-
- // Number of senders in existence
- num_senders: AtomicUsize,
-
- // Handle to the receiver's task.
- recv_task: Mutex<ReceiverTask>,
-}
-
-// Struct representation of `Inner::state`.
-#[derive(Debug, Clone, Copy)]
-struct State {
- // `true` when the channel is open
- is_open: bool,
-
- // Number of messages in the channel
- num_messages: usize,
-}
-
-#[derive(Debug)]
-struct ReceiverTask {
- unparked: bool,
- task: Option<Task>,
-}
-
-// Returned from Receiver::try_park()
-enum TryPark {
- Parked,
- Closed,
- NotEmpty,
-}
-
-// The `is_open` flag is stored in the left-most bit of `Inner::state`
-const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
-
-// When a new channel is created, it is created in the open state with no
-// pending messages.
-const INIT_STATE: usize = OPEN_MASK;
-
-// The maximum number of messages that a channel can track is `usize::MAX >> 1`
-const MAX_CAPACITY: usize = !(OPEN_MASK);
-
-// The maximum requested buffer size must be less than the maximum capacity of
-// a channel. This is because each sender gets a guaranteed slot.
-const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
-
-// Sent to the consumer to wake up blocked producers
-#[derive(Debug)]
-struct SenderTask {
- task: Option<Task>,
- is_parked: bool,
-}
-
-impl SenderTask {
- fn new() -> Self {
- SenderTask {
- task: None,
- is_parked: false,
- }
- }
-
- fn notify(&mut self) {
- self.is_parked = false;
-
- if let Some(task) = self.task.take() {
- task.notify();
- }
- }
-}
-
-/// Creates an in-memory channel implementation of the `Stream` trait with
-/// bounded capacity.
-///
-/// This method creates a concrete implementation of the `Stream` trait which
-/// can be used to send values across threads in a streaming fashion. This
-/// channel is unique in that it implements back pressure to ensure that the
-/// sender never outpaces the receiver. The channel capacity is equal to
-/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot
-/// in the channel capacity, and on top of that there are `buffer` "first come,
-/// first serve" slots available to all senders.
-///
-/// The `Receiver` returned implements the `Stream` trait and has access to any
-/// number of the associated combinators for transforming the result.
-pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
- // Check that the requested buffer size does not exceed the maximum buffer
- // size permitted by the system.
- assert!(buffer < MAX_BUFFER, "requested buffer size too large");
- channel2(Some(buffer))
-}
-
-/// Creates an in-memory channel implementation of the `Stream` trait with
-/// unbounded capacity.
-///
-/// This method creates a concrete implementation of the `Stream` trait which
-/// can be used to send values across threads in a streaming fashion. A `send`
-/// on this channel will always succeed as long as the receive half has not
-/// been closed. If the receiver falls behind, messages will be buffered
-/// internally.
-///
-/// **Note** that the amount of available system memory is an implicit bound to
-/// the channel. Using an `unbounded` channel has the ability of causing the
-/// process to run out of memory. In this case, the process will be aborted.
-pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) {
- let (tx, rx) = channel2(None);
- (UnboundedSender(tx), rx)
-}
-
-fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
- let inner = Arc::new(Inner {
- buffer: buffer,
- state: AtomicUsize::new(INIT_STATE),
- message_queue: Queue::new(),
- parked_queue: Queue::new(),
- num_senders: AtomicUsize::new(1),
- recv_task: Mutex::new(ReceiverTask {
- unparked: false,
- task: None,
- }),
- });
-
- let tx = Sender {
- inner: inner.clone(),
- sender_task: Arc::new(Mutex::new(SenderTask::new())),
- maybe_parked: false,
- };
-
- let rx = Receiver {
- inner: inner,
- };
-
- (tx, rx)
-}
-
-/*
- *
- * ===== impl Sender =====
- *
- */
-
-impl<T> Sender<T> {
- /// Attempts to send a message on this `Sender<T>` without blocking.
- ///
- /// This function, unlike `start_send`, is safe to call whether it's being
- /// called on a task or not. Note that this function, however, will *not*
- /// attempt to block the current task if the message cannot be sent.
- ///
- /// It is not recommended to call this function from inside of a future,
- /// only from an external thread where you've otherwise arranged to be
- /// notified when the channel is no longer full.
- pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
- // If the sender is currently blocked, reject the message
- if !self.poll_unparked(false).is_ready() {
- return Err(TrySendError {
- kind: TrySendErrorKind::Full(msg),
- });
- }
-
- // The channel has capacity to accept the message, so send it
- self.do_send(Some(msg), false)
- .map_err(|SendError(v)| {
- TrySendError {
- kind: TrySendErrorKind::Disconnected(v),
- }
- })
- }
-
- // Do the send without failing
- // None means close
- fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
- // First, increment the number of messages contained by the channel.
- // This operation will also atomically determine if the sender task
- // should be parked.
- //
- // None is returned in the case that the channel has been closed by the
- // receiver. This happens when `Receiver::close` is called or the
- // receiver is dropped.
- let park_self = match self.inc_num_messages(msg.is_none()) {
- Some(