diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-29 15:11:31 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-29 15:11:31 -0700 |
commit | 2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch) | |
tree | de255969c720c294af754b3840efabff3e6d69a0 /tokio | |
parent | c62ef2d232dea1535a8e22484fa2ca083f03e903 (diff) |
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The sync implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio')
46 files changed, 6826 insertions, 32 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 569a3fe7..a85f0c7a 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -63,7 +63,7 @@ signal = [ "net-driver", "signal-hook-registry" ] -sync = ["tokio-sync"] +sync = ["fnv"] tcp = ["io", "net-driver"] timer = ["crossbeam-utils", "slab"] udp = ["io", "net-driver"] @@ -81,6 +81,8 @@ process = [ ] [dependencies] +tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } + futures-core-preview = "=0.3.0-alpha.19" futures-sink-preview = "=0.3.0-alpha.19" futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "channel"] } @@ -89,6 +91,7 @@ futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "chann bytes = { version = "0.4", optional = true } crossbeam-channel = { version = "0.3.8", optional = true } crossbeam-utils = { version = "0.6.0", optional = true } +fnv = { version = "1.0.6", optional = true } iovec = { version = "0.1", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } @@ -97,8 +100,6 @@ num_cpus = { version = "1.8.0", optional = true } pin-project = { version = "0.4", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } -tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } -tokio-sync = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-sync", features = ["async-traits"] } [target.'cfg(unix)'.dependencies] crossbeam-queue = { version = "0.1.2", optional = true } @@ -124,7 +125,7 @@ flate2 = { version = "1", features = ["tokio"] } http = "0.1" httparse = "1.0" libc = "0.2" -loom = { version = "0.2.11", features = ["futures", "checkpoint"] } +loom = { version = "0.2.12", features = ["futures", "checkpoint"] } num_cpus = "1.0" rand = "0.7.2" serde = { version = "1.0", features = ["derive"] } diff --git a/tokio/benches/mpsc.rs b/tokio/benches/mpsc.rs new file mode 100644 index 00000000..0b97d55d --- /dev/null +++ b/tokio/benches/mpsc.rs @@ -0,0 +1,270 @@ +#![feature(test)] +#![warn(rust_2018_idioms)] + +extern crate test; + +use tokio::sync::mpsc::*; + +use futures::{future, Async, Future, Sink, Stream}; +use std::thread; +use test::Bencher; + +type Medium = [usize; 64]; +type Large = [Medium; 64]; + +#[bench] +fn bounded_new_medium(b: &mut Bencher) { + b.iter(|| { + let _ = test::black_box(&channel::<Medium>(1_000)); + }) +} + +#[bench] +fn unbounded_new_medium(b: &mut Bencher) { + b.iter(|| { + let _ = test::black_box(&unbounded_channel::<Medium>()); + }) +} +#[bench] +fn bounded_new_large(b: &mut Bencher) { + b.iter(|| { + let _ = test::black_box(&channel::<Large>(1_000)); + }) +} + +#[bench] +fn unbounded_new_large(b: &mut Bencher) { + b.iter(|| { + let _ = test::black_box(&unbounded_channel::<Large>()); + }) +} + +#[bench] +fn send_one_message(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel(1_000); + + // Send + tx.try_send(1).unwrap(); + + // Receive + assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap()); + }) +} + +#[bench] +fn send_one_message_large(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel::<Large>(1_000); + + // Send + let _ = tx.try_send([[0; 64]; 64]); + + // Receive + let _ = test::black_box(&rx.poll()); + }) +} + +#[bench] +fn bounded_rx_not_ready(b: &mut Bencher) { + let (_tx, mut rx) = channel::<i32>(1_000); + b.iter(|| { + future::lazy(|| { + assert!(rx.poll().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn bounded_tx_poll_ready(b: &mut Bencher) { + let (mut tx, _rx) = channel::<i32>(1); + b.iter(|| { + future::lazy(|| { + assert!(tx.poll_ready().unwrap().is_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn bounded_tx_poll_not_ready(b: &mut Bencher) { + let (mut tx, _rx) = channel::<i32>(1); + tx.try_send(1).unwrap(); + b.iter(|| { + future::lazy(|| { + assert!(tx.poll_ready().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn unbounded_rx_not_ready(b: &mut Bencher) { + let (_tx, mut rx) = unbounded_channel::<i32>(); + b.iter(|| { + future::lazy(|| { + assert!(rx.poll().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn unbounded_rx_not_ready_x5(b: &mut Bencher) { + let (_tx, mut rx) = unbounded_channel::<i32>(); + b.iter(|| { + future::lazy(|| { + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn bounded_uncontended_1(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel(1_000); + + for i in 0..1000 { + tx.try_send(i).unwrap(); + // No need to create a task, because poll is not going to park. + assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); + } + }) +} + +#[bench] +fn bounded_uncontended_1_large(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel::<Large>(1_000); + + for i in 0..1000 { + let _ = tx.try_send([[i; 64]; 64]); + // No need to create a task, because poll is not going to park. + let _ = test::black_box(&rx.poll()); + } + }) +} + +#[bench] +fn bounded_uncontended_2(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel(1000); + + for i in 0..1000 { + tx.try_send(i).unwrap(); + } + + for i in 0..1000 { + // No need to create a task, because poll is not going to park. + assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); + } + }) +} + +#[bench] +fn contended_unbounded_tx(b: &mut Bencher) { + let mut threads = vec![]; + let mut txs = vec![]; + + for _ in 0..4 { + let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); + txs.push(tx); + + threads.push(thread::spawn(move || { + for mut tx in rx.iter() { + for i in 0..1_000 { + tx.try_send(i).unwrap(); + } + } + })); + } + + b.iter(|| { + // TODO make unbounded + let (tx, rx) = channel::<i32>(1_000_000); + + for th in &txs { + th.send(tx.clone()).unwrap(); + } + + drop(tx); + + let rx = rx.wait().take(4 * 1_000); + + for v in rx { + let _ = test::black_box(v); + } + }); + + drop(txs); + + for th in threads { + th.join().unwrap(); + } +} + +#[bench] +fn contended_bounded_tx(b: &mut Bencher) { + const THREADS: usize = 4; + const ITERS: usize = 100; + + let mut threads = vec![]; + let mut txs = vec![]; + + for _ in 0..THREADS { + let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); + txs.push(tx); + + threads.push(thread::spawn(move || { + for tx in rx.iter() { + let mut tx = tx.wait(); + for i in 0..ITERS { + tx.send(i as i32).unwrap(); + } + } + })); + } + + b.iter(|| { + let (tx, rx) = channel::<i32>(1); + + for th in &txs { + th.send(tx.clone()).unwrap(); + } + + drop(tx); + + let rx = rx.wait().take(THREADS * ITERS); + + for v in rx { + let _ = test::black_box(v); + } + }); + + drop(txs); + + for th in threads { + th.join().unwrap(); + } +} diff --git a/tokio/benches/oneshot.rs b/tokio/benches/oneshot.rs new file mode 100644 index 00000000..a7f43c2f --- /dev/null +++ b/tokio/benches/oneshot.rs @@ -0,0 +1,120 @@ +#![feature(test)] +#![warn(rust_2018_idioms)] + +extern crate test; + +use tokio::sync::oneshot; + +use futures::{future, Async, Future}; +use test::Bencher; + +#[bench] +fn new(b: &mut Bencher) { + b.iter(|| { + let _ = ::test::black_box(&oneshot::channel::<i32>()); + }) +} + +#[bench] +fn same_thread_send_recv(b: &mut Bencher) { + b.iter(|| { + let (tx, mut rx) = oneshot::channel(); + + let _ = tx.send(1); + + assert_eq!(Async::Ready(1), rx.poll().unwrap()); + }); +} + +#[bench] +fn same_thread_recv_multi_send_recv(b: &mut Bencher) { + b.iter(|| { + let (tx, mut rx) = oneshot::channel(); + + future::lazy(|| { + let _ = rx.poll(); + let _ = rx.poll(); + let _ = rx.poll(); + let _ = rx.poll(); + + let _ = tx.send(1); + assert_eq!(Async::Ready(1), rx.poll().unwrap()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }); +} + +#[bench] +fn multi_thread_send_recv(b: &mut Bencher) { + const MAX: usize = 10_000_000; + + use std::thread; + + fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> { + use futures::Async::Ready; + loop { + match f.poll() { + Ok(Ready(v)) => return Ok(v), + Ok(_) => {} + Err(e) => return Err(e), + } + } + } + + let mut ping_txs = vec![]; + let mut ping_rxs = vec![]; + let mut pong_txs = vec![]; + let mut pong_rxs = vec![]; + + for _ in 0..MAX { + let (tx, rx) = oneshot::channel::<()>(); + + ping_txs.push(Some(tx)); + ping_rxs.push(Some(rx)); + + let (tx, rx) = oneshot::channel::<()>(); + + pong_txs.push(Some(tx)); + pong_rxs.push(Some(rx)); + } + + thread::spawn(move || { + future::lazy(|| { + for i in 0..MAX { + let ping_rx = ping_rxs[i].take().unwrap(); + let pong_tx = pong_txs[i].take().unwrap(); + + if spin(ping_rx).is_err() { + return Ok(()); + } + + pong_tx.send(()).unwrap(); + } + + Ok::<(), ()>(()) + }) + .wait() + .unwrap(); + }); + + future::lazy(|| { + let mut i = 0; + + b.iter(|| { + let ping_tx = ping_txs[i].take().unwrap(); + let pong_rx = pong_rxs[i].take().unwrap(); + + ping_tx.send(()).unwrap(); + spin(pong_rx).unwrap(); + + i += 1; + }); + + Ok::<(), ()>(()) + }) + .wait() + .unwrap(); +} diff --git a/tokio/benches/thread_pool.rs b/tokio/benches/thread_pool.rs index 3e1462f3..97b25f18 100644 --- a/tokio/benches/thread_pool.rs +++ b/tokio/benches/thread_pool.rs @@ -3,7 +3,7 @@ extern crate test; use tokio::executor::thread_pool::{Builder, Spawner}; -use tokio_sync::oneshot; +use tokio::sync::oneshot; use std::future::Future; use std::pin::Pin; diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs index 16faa03e..2ad573d8 100644 --- a/tokio/src/executor/blocking/mod.rs +++ b/tokio/src/executor/blocking/mod.rs @@ -3,7 +3,7 @@ use crate::executor::loom::sync::{Arc, Condvar, Mutex}; use crate::executor::loom::thread; #[cfg(feature = "blocking")] -use tokio_sync::oneshot; +use crate::sync::oneshot; use std::cell::Cell; use std::collections::VecDeque; diff --git a/tokio/src/executor/thread_pool/shutdown.rs b/tokio/src/executor/thread_pool/shutdown.rs index 40d8f04a..b7c4177f 100644 --- a/tokio/src/executor/thread_pool/shutdown.rs +++ b/tokio/src/executor/thread_pool/shutdown.rs @@ -4,8 +4,7 @@ //! dropped, the `Receiver` receives a notification. use crate::executor::loom::sync::Arc; - -use tokio_sync::oneshot; +use crate::sync::oneshot; #[derive(Debug, Clone)] pub(super) struct Sender { diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs index 34a07ea8..9cd99a86 100644 --- a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs +++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs @@ -3,8 +3,7 @@ use crate::loom::{ atomic::{AtomicUsize, Ordering}, CausalCell, }; - -use tokio_sync::AtomicWaker; +use crate::sync::AtomicWaker; #[derive(Debug)] pub(crate) struct ScheduledIo { diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs index 542b5e1d..a66f21da 100644 --- a/tokio/src/net/unix/incoming.rs +++ b/tokio/src/net/unix/incoming.rs @@ -1,5 +1,3 @@ -#![cfg(feature = "async-traits")] - use super::{UnixListener, UnixStream}; use futures_core::ready; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 3a36dc90..3cf8eff3 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -90,7 +90,6 @@ impl UnixListener { /// /// This method returns an implementation of the `Stream` trait which /// resolves to the sockets the are accepted on this listener. - #[cfg(feature = "async-traits")] pub fn incoming(self) -> super::Incoming { super::Incoming::new(self) } diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index 4447ca5c..977e3a0f 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -6,7 +6,6 @@ mod datagram; pub use self::datagram::UnixDatagram; mod incoming; -#[cfg(feature = "async-traits")] pub use self::incoming::Incoming; mod listener; diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 7d2fd7a9..e70d0495 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -1,6 +1,6 @@ use crate::signal::os::{OsExtraData, OsStorage}; -use tokio_sync::mpsc::Sender; +use crate::sync::mpsc::Sender; use lazy_static::lazy_static; use std::ops; diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 075e788c..87871503 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -8,8 +8,7 @@ use crate::io::AsyncRead; use crate::net::util::PollEvented; use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; - -use tokio_sync::mpsc::{channel, Receiver}; +use crate::sync::mpsc::{channel, Receiver}; use futures_core::stream::Stream; use libc::c_int; diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index abde334b..1e68d628 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -7,9 +7,8 @@ #![cfg(windows)] -use super::registry::{globals, EventId, EventInfo, Init, Storage}; - -use tokio_sync::mpsc::{channel, Receiver}; +use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage}; +use crate::sync::mpsc::{channel, Receiver}; use futures_core::stream::Stream; use std::convert::TryFrom; diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs new file mode 100644 index 00000000..1582120e --- /dev/null +++ b/tokio/src/sync/barrier.rs @@ -0,0 +1,135 @@ +use crate::sync::watch; + +use std::sync::Mutex; + +/// A barrier enables multiple threads to synchronize the beginning of some computation. +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio::sync::Barrier; +/// use std::sync::Arc; +/// use futures_util::future::join_all; +/// +/// let mut handles = Vec::with_capacity(10); +/// let barrier = Arc::new(Barrier::new(10)); +/// for _ in 0..10 { +/// let c = barrier.clone(); +/// // The same messages will be printed together. +/// // You will NOT see any interleaving. +/// handles.push(async move { +/// println!("before wait"); +/// let wr = c.wait().await; +/// println!("after wait"); +/// wr +/// }); +/// } +/// // Will not resolve until all "before wait" messages have been printed +/// let wrs = join_all(handles).await; +/// // Exactly one barrier will resolve as the "leader" +/// assert_eq!(wrs.into_iter().filter(|wr| wr.is_leader()).count(), 1); +/// # } +/// ``` +#[derive(Debug)] +pub struct Barrier { + state: Mutex<BarrierState>, + wait: watch::Receiver<usize>, + n: usize, +} + +#[derive(Debug)] +struct BarrierState { + waker: watch::Sender<usize>, + arrived: usize, + generation: usize, +} + +impl Barrier { + /// Creates a new barrier that can block a given number of threads. + /// + /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all + /// threads at once when the `n`th thread calls `wait`. + pub fn new(mut n: usize) -> Barrier { + let (waker, wait) = crate::sync::watch::channel(0); + + if n == 0 { + // if n is 0, it's not clear what behavior the user wants. + // in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every + // .wait() immediately unblocks, so we adopt that here as well. + n = 1; + } + + Barrier { + state: Mutex::new(BarrierState { + waker, + arrived: 0, + generation: 1, + }), + n, + wait, + } + } + + /// Does not resolve until all tasks have rendezvoused here. + /// + /// Barriers are re-usable after all threads have rendezvoused once, and can + /// be used continuously. + /// + /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from + /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads + /// will receive a result that will return `false` from `is_leader`. + pub async fn wait(&self) -> BarrierWaitResult { + // NOTE: we are taking a _synchronous_ lock here. + // It is okay to do so because the critical section is fast and never yields, so it cannot + // deadlock even if another future is concurrently holding the lock. + // It is _desireable_ to do so as synchronous Mutexes are, at least in theory, faster than + // the asynchronous counter-parts, so we should use them where possible [citation needed]. + // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across + // a yield point, and thus marks the returned future as !Send. + let generation = { + let mut state = self.state.lock().unwrap(); + let generation = state.generation; + state.arrived += 1; + if state.arrived == self.n { + // we are the leader for this generation + // wake everyone, increment the generation, and return + state + .waker + .broadcast(state.generation) + .expect("there is at least one receiver"); + state.arrived = 0; + state.generation += 1; + return BarrierWaitResult(true); + } + + generation + }; + + // we're going to have to wait for the last of the generation to arrive + let mut wait = self.wait.clone(); + + loop { + // note that the first time through the loop, this _will_ yield a generation + // immediately, since we cloned a receiver that has never seen any values. + if wait.recv().await.expect("sender hasn't been closed") >= generation { + break; + } + } + + BarrierWaitResult(false) + } +} + +/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused. +#[derive(Debug, Clone)] +pub struct BarrierWaitResult(bool); + +impl BarrierWaitResult { + /// Returns true if this thread from wait is the "leader thread". + /// + /// Only one thread will have `true` returned from their result, all other threads will have + /// `false` returned. + pub fn is_leader(&self) -> bool { + self.0 + } +} diff --git a/tokio/src/sync/loom.rs b/tokio/src/sync/loom.rs new file mode 100644 index 00000000..1b5a5c9d --- /dev/null +++ b/tokio/src/sync/loom.rs @@ -0,0 +1,48 @@ +#[cfg(not(all(test, loom)))] +mod imp { + pub(crate) mod future { + pub(crate) use crate::sync::task::AtomicWaker; + } + + pub(crate) mod sync { + pub(crate) use std::sync::atomic; + pub(crate) use std::sync::Arc; + + use std::cell::UnsafeCell; + + pub(crate) struct CausalCell<T>(UnsafeCell<T>); + + impl<T> CausalCell<T> { + pub(crate) fn new(data: T) -> CausalCell<T> { + CausalCell(UnsafeCell::new(data)) + } + + pub(crate) fn with<F, R>(&self, f: F) -> R + where + F: FnOnce(*const T) -> R, + { + f(self.0.get()) + } + + pub(crate) fn with_mut<F, R>(&self, f: F) -> R + where + F: FnOnce(*mut T) -> R, + { + f(self.0.get()) + } + } + } + + pub(crate) mod thread { |