summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/Cargo.toml9
-rw-r--r--tokio/benches/mpsc.rs270
-rw-r--r--tokio/benches/oneshot.rs120
-rw-r--r--tokio/benches/thread_pool.rs2
-rw-r--r--tokio/src/executor/blocking/mod.rs2
-rw-r--r--tokio/src/executor/thread_pool/shutdown.rs3
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs3
-rw-r--r--tokio/src/net/unix/incoming.rs2
-rw-r--r--tokio/src/net/unix/listener.rs1
-rw-r--r--tokio/src/net/unix/mod.rs1
-rw-r--r--tokio/src/signal/registry.rs2
-rw-r--r--tokio/src/signal/unix.rs3
-rw-r--r--tokio/src/signal/windows.rs5
-rw-r--r--tokio/src/sync/barrier.rs135
-rw-r--r--tokio/src/sync/loom.rs48
-rw-r--r--tokio/src/sync/mod.rs (renamed from tokio/src/sync.rs)46
-rw-r--r--tokio/src/sync/mpsc/block.rs387
-rw-r--r--tokio/src/sync/mpsc/bounded.rs337
-rw-r--r--tokio/src/sync/mpsc/chan.rs451
-rw-r--r--tokio/src/sync/mpsc/list.rs348
-rw-r--r--tokio/src/sync/mpsc/mod.rs67
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs230
-rw-r--r--tokio/src/sync/mutex.rs149
-rw-r--r--tokio/src/sync/oneshot.rs576
-rw-r--r--tokio/src/sync/semaphore.rs1142
-rw-r--r--tokio/src/sync/task/atomic_waker.rs323
-rw-r--r--tokio/src/sync/task/mod.rs4
-rw-r--r--tokio/src/sync/tests/loom_atomic_waker.rs45
-rw-r--r--tokio/src/sync/tests/loom_list.rs52
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs23
-rw-r--r--tokio/src/sync/tests/loom_oneshot.rs109
-rw-r--r--tokio/src/sync/tests/loom_semaphore.rs151
-rw-r--r--tokio/src/sync/tests/mod.rs7
-rw-r--r--tokio/src/sync/watch.rs454
-rw-r--r--tokio/src/timer/timer/entry.rs3
-rw-r--r--tokio/tests/support/mock_pool.rs2
-rw-r--r--tokio/tests/sync_atomic_waker.rs38
-rw-r--r--tokio/tests/sync_barrier.rs94
-rw-r--r--tokio/tests/sync_errors.rs29
-rw-r--r--tokio/tests/sync_mpsc.rs451
-rw-r--r--tokio/tests/sync_mutex.rs80
-rw-r--r--tokio/tests/sync_oneshot.rs228
-rw-r--r--tokio/tests/sync_semaphore.rs153
-rw-r--r--tokio/tests/sync_watch.rs264
-rw-r--r--tokio/tests/thread_pool.rs2
-rw-r--r--tokio/tests/timer_timeout.rs7
46 files changed, 6826 insertions, 32 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 569a3fe7..a85f0c7a 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -63,7 +63,7 @@ signal = [
"net-driver",
"signal-hook-registry"
]
-sync = ["tokio-sync"]
+sync = ["fnv"]
tcp = ["io", "net-driver"]
timer = ["crossbeam-utils", "slab"]
udp = ["io", "net-driver"]
@@ -81,6 +81,8 @@ process = [
]
[dependencies]
+tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }
+
futures-core-preview = "=0.3.0-alpha.19"
futures-sink-preview = "=0.3.0-alpha.19"
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "channel"] }
@@ -89,6 +91,7 @@ futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "chann
bytes = { version = "0.4", optional = true }
crossbeam-channel = { version = "0.3.8", optional = true }
crossbeam-utils = { version = "0.6.0", optional = true }
+fnv = { version = "1.0.6", optional = true }
iovec = { version = "0.1", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
@@ -97,8 +100,6 @@ num_cpus = { version = "1.8.0", optional = true }
pin-project = { version = "0.4", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }
-tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }
-tokio-sync = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-sync", features = ["async-traits"] }
[target.'cfg(unix)'.dependencies]
crossbeam-queue = { version = "0.1.2", optional = true }
@@ -124,7 +125,7 @@ flate2 = { version = "1", features = ["tokio"] }
http = "0.1"
httparse = "1.0"
libc = "0.2"
-loom = { version = "0.2.11", features = ["futures", "checkpoint"] }
+loom = { version = "0.2.12", features = ["futures", "checkpoint"] }
num_cpus = "1.0"
rand = "0.7.2"
serde = { version = "1.0", features = ["derive"] }
diff --git a/tokio/benches/mpsc.rs b/tokio/benches/mpsc.rs
new file mode 100644
index 00000000..0b97d55d
--- /dev/null
+++ b/tokio/benches/mpsc.rs
@@ -0,0 +1,270 @@
+#![feature(test)]
+#![warn(rust_2018_idioms)]
+
+extern crate test;
+
+use tokio::sync::mpsc::*;
+
+use futures::{future, Async, Future, Sink, Stream};
+use std::thread;
+use test::Bencher;
+
+type Medium = [usize; 64];
+type Large = [Medium; 64];
+
+#[bench]
+fn bounded_new_medium(b: &mut Bencher) {
+ b.iter(|| {
+ let _ = test::black_box(&channel::<Medium>(1_000));
+ })
+}
+
+#[bench]
+fn unbounded_new_medium(b: &mut Bencher) {
+ b.iter(|| {
+ let _ = test::black_box(&unbounded_channel::<Medium>());
+ })
+}
+#[bench]
+fn bounded_new_large(b: &mut Bencher) {
+ b.iter(|| {
+ let _ = test::black_box(&channel::<Large>(1_000));
+ })
+}
+
+#[bench]
+fn unbounded_new_large(b: &mut Bencher) {
+ b.iter(|| {
+ let _ = test::black_box(&unbounded_channel::<Large>());
+ })
+}
+
+#[bench]
+fn send_one_message(b: &mut Bencher) {
+ b.iter(|| {
+ let (mut tx, mut rx) = channel(1_000);
+
+ // Send
+ tx.try_send(1).unwrap();
+
+ // Receive
+ assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap());
+ })
+}
+
+#[bench]
+fn send_one_message_large(b: &mut Bencher) {
+ b.iter(|| {
+ let (mut tx, mut rx) = channel::<Large>(1_000);
+
+ // Send
+ let _ = tx.try_send([[0; 64]; 64]);
+
+ // Receive
+ let _ = test::black_box(&rx.poll());
+ })
+}
+
+#[bench]
+fn bounded_rx_not_ready(b: &mut Bencher) {
+ let (_tx, mut rx) = channel::<i32>(1_000);
+ b.iter(|| {
+ future::lazy(|| {
+ assert!(rx.poll().unwrap().is_not_ready());
+
+ Ok::<_, ()>(())
+ })
+ .wait()
+ .unwrap();
+ })
+}
+
+#[bench]
+fn bounded_tx_poll_ready(b: &mut Bencher) {
+ let (mut tx, _rx) = channel::<i32>(1);
+ b.iter(|| {
+ future::lazy(|| {
+ assert!(tx.poll_ready().unwrap().is_ready());
+
+ Ok::<_, ()>(())
+ })
+ .wait()
+ .unwrap();
+ })
+}
+
+#[bench]
+fn bounded_tx_poll_not_ready(b: &mut Bencher) {
+ let (mut tx, _rx) = channel::<i32>(1);
+ tx.try_send(1).unwrap();
+ b.iter(|| {
+ future::lazy(|| {
+ assert!(tx.poll_ready().unwrap().is_not_ready());
+
+ Ok::<_, ()>(())
+ })
+ .wait()
+ .unwrap();
+ })
+}
+
+#[bench]
+fn unbounded_rx_not_ready(b: &mut Bencher) {
+ let (_tx, mut rx) = unbounded_channel::<i32>();
+ b.iter(|| {
+ future::lazy(|| {
+ assert!(rx.poll().unwrap().is_not_ready());
+
+ Ok::<_, ()>(())
+ })
+ .wait()
+ .unwrap();
+ })
+}
+
+#[bench]
+fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
+ let (_tx, mut rx) = unbounded_channel::<i32>();
+ b.iter(|| {
+ future::lazy(|| {
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+
+ Ok::<_, ()>(())
+ })
+ .wait()
+ .unwrap();
+ })
+}
+
+#[bench]
+fn bounded_uncontended_1(b: &mut Bencher) {
+ b.iter(|| {
+ let (mut tx, mut rx) = channel(1_000);
+
+ for i in 0..1000 {
+ tx.try_send(i).unwrap();
+ // No need to create a task, because poll is not going to park.
+ assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
+ }
+ })
+}
+
+#[bench]
+fn bounded_uncontended_1_large(b: &mut Bencher) {
+ b.iter(|| {
+ let (mut tx, mut rx) = channel::<Large>(1_000);
+
+ for i in 0..1000 {
+ let _ = tx.try_send([[i; 64]; 64]);
+ // No need to create a task, because poll is not going to park.
+ let _ = test::black_box(&rx.poll());
+ }
+ })
+}
+
+#[bench]
+fn bounded_uncontended_2(b: &mut Bencher) {
+ b.iter(|| {
+ let (mut tx, mut rx) = channel(1000);
+
+ for i in 0..1000 {
+ tx.try_send(i).unwrap();
+ }
+
+ for i in 0..1000 {
+ // No need to create a task, because poll is not going to park.
+ assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
+ }
+ })
+}
+
+#[bench]
+fn contended_unbounded_tx(b: &mut Bencher) {
+ let mut threads = vec![];
+ let mut txs = vec![];
+
+ for _ in 0..4 {
+ let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
+ txs.push(tx);
+
+ threads.push(thread::spawn(move || {
+ for mut tx in rx.iter() {
+ for i in 0..1_000 {
+ tx.try_send(i).unwrap();
+ }
+ }
+ }));
+ }
+
+ b.iter(|| {
+ // TODO make unbounded
+ let (tx, rx) = channel::<i32>(1_000_000);
+
+ for th in &txs {
+ th.send(tx.clone()).unwrap();
+ }
+
+ drop(tx);
+
+ let rx = rx.wait().take(4 * 1_000);
+
+ for v in rx {
+ let _ = test::black_box(v);
+ }
+ });
+
+ drop(txs);
+
+ for th in threads {
+ th.join().unwrap();
+ }
+}
+
+#[bench]
+fn contended_bounded_tx(b: &mut Bencher) {
+ const THREADS: usize = 4;
+ const ITERS: usize = 100;
+
+ let mut threads = vec![];
+ let mut txs = vec![];
+
+ for _ in 0..THREADS {
+ let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
+ txs.push(tx);
+
+ threads.push(thread::spawn(move || {
+ for tx in rx.iter() {
+ let mut tx = tx.wait();
+ for i in 0..ITERS {
+ tx.send(i as i32).unwrap();
+ }
+ }
+ }));
+ }
+
+ b.iter(|| {
+ let (tx, rx) = channel::<i32>(1);
+
+ for th in &txs {
+ th.send(tx.clone()).unwrap();
+ }
+
+ drop(tx);
+
+ let rx = rx.wait().take(THREADS * ITERS);
+
+ for v in rx {
+ let _ = test::black_box(v);
+ }
+ });
+
+ drop(txs);
+
+ for th in threads {
+ th.join().unwrap();
+ }
+}
diff --git a/tokio/benches/oneshot.rs b/tokio/benches/oneshot.rs
new file mode 100644
index 00000000..a7f43c2f
--- /dev/null
+++ b/tokio/benches/oneshot.rs
@@ -0,0 +1,120 @@
+#![feature(test)]
+#![warn(rust_2018_idioms)]
+
+extern crate test;
+
+use tokio::sync::oneshot;
+
+use futures::{future, Async, Future};
+use test::Bencher;
+
+#[bench]
+fn new(b: &mut Bencher) {
+ b.iter(|| {
+ let _ = ::test::black_box(&oneshot::channel::<i32>());
+ })
+}
+
+#[bench]
+fn same_thread_send_recv(b: &mut Bencher) {
+ b.iter(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ let _ = tx.send(1);
+
+ assert_eq!(Async::Ready(1), rx.poll().unwrap());
+ });
+}
+
+#[bench]
+fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
+ b.iter(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ future::lazy(|| {
+ let _ = rx.poll();
+ let _ = rx.poll();
+ let _ = rx.poll();
+ let _ = rx.poll();
+
+ let _ = tx.send(1);
+ assert_eq!(Async::Ready(1), rx.poll().unwrap());
+
+ Ok::<_, ()>(())
+ })
+ .wait()
+ .unwrap();
+ });
+}
+
+#[bench]
+fn multi_thread_send_recv(b: &mut Bencher) {
+ const MAX: usize = 10_000_000;
+
+ use std::thread;
+
+ fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
+ use futures::Async::Ready;
+ loop {
+ match f.poll() {
+ Ok(Ready(v)) => return Ok(v),
+ Ok(_) => {}
+ Err(e) => return Err(e),
+ }
+ }
+ }
+
+ let mut ping_txs = vec![];
+ let mut ping_rxs = vec![];
+ let mut pong_txs = vec![];
+ let mut pong_rxs = vec![];
+
+ for _ in 0..MAX {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ ping_txs.push(Some(tx));
+ ping_rxs.push(Some(rx));
+
+ let (tx, rx) = oneshot::channel::<()>();
+
+ pong_txs.push(Some(tx));
+ pong_rxs.push(Some(rx));
+ }
+
+ thread::spawn(move || {
+ future::lazy(|| {
+ for i in 0..MAX {
+ let ping_rx = ping_rxs[i].take().unwrap();
+ let pong_tx = pong_txs[i].take().unwrap();
+
+ if spin(ping_rx).is_err() {
+ return Ok(());
+ }
+
+ pong_tx.send(()).unwrap();
+ }
+
+ Ok::<(), ()>(())
+ })
+ .wait()
+ .unwrap();
+ });
+
+ future::lazy(|| {
+ let mut i = 0;
+
+ b.iter(|| {
+ let ping_tx = ping_txs[i].take().unwrap();
+ let pong_rx = pong_rxs[i].take().unwrap();
+
+ ping_tx.send(()).unwrap();
+ spin(pong_rx).unwrap();
+
+ i += 1;
+ });
+
+ Ok::<(), ()>(())
+ })
+ .wait()
+ .unwrap();
+}
diff --git a/tokio/benches/thread_pool.rs b/tokio/benches/thread_pool.rs
index 3e1462f3..97b25f18 100644
--- a/tokio/benches/thread_pool.rs
+++ b/tokio/benches/thread_pool.rs
@@ -3,7 +3,7 @@
extern crate test;
use tokio::executor::thread_pool::{Builder, Spawner};
-use tokio_sync::oneshot;
+use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs
index 16faa03e..2ad573d8 100644
--- a/tokio/src/executor/blocking/mod.rs
+++ b/tokio/src/executor/blocking/mod.rs
@@ -3,7 +3,7 @@
use crate::executor::loom::sync::{Arc, Condvar, Mutex};
use crate::executor::loom::thread;
#[cfg(feature = "blocking")]
-use tokio_sync::oneshot;
+use crate::sync::oneshot;
use std::cell::Cell;
use std::collections::VecDeque;
diff --git a/tokio/src/executor/thread_pool/shutdown.rs b/tokio/src/executor/thread_pool/shutdown.rs
index 40d8f04a..b7c4177f 100644
--- a/tokio/src/executor/thread_pool/shutdown.rs
+++ b/tokio/src/executor/thread_pool/shutdown.rs
@@ -4,8 +4,7 @@
//! dropped, the `Receiver` receives a notification.
use crate::executor::loom::sync::Arc;
-
-use tokio_sync::oneshot;
+use crate::sync::oneshot;
#[derive(Debug, Clone)]
pub(super) struct Sender {
diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
index 34a07ea8..9cd99a86 100644
--- a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
+++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs
@@ -3,8 +3,7 @@ use crate::loom::{
atomic::{AtomicUsize, Ordering},
CausalCell,
};
-
-use tokio_sync::AtomicWaker;
+use crate::sync::AtomicWaker;
#[derive(Debug)]
pub(crate) struct ScheduledIo {
diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs
index 542b5e1d..a66f21da 100644
--- a/tokio/src/net/unix/incoming.rs
+++ b/tokio/src/net/unix/incoming.rs
@@ -1,5 +1,3 @@
-#![cfg(feature = "async-traits")]
-
use super::{UnixListener, UnixStream};
use futures_core::ready;
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index 3a36dc90..3cf8eff3 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -90,7 +90,6 @@ impl UnixListener {
///
/// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener.
- #[cfg(feature = "async-traits")]
pub fn incoming(self) -> super::Incoming {
super::Incoming::new(self)
}
diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs
index 4447ca5c..977e3a0f 100644
--- a/tokio/src/net/unix/mod.rs
+++ b/tokio/src/net/unix/mod.rs
@@ -6,7 +6,6 @@ mod datagram;
pub use self::datagram::UnixDatagram;
mod incoming;
-#[cfg(feature = "async-traits")]
pub use self::incoming::Incoming;
mod listener;
diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs
index 7d2fd7a9..e70d0495 100644
--- a/tokio/src/signal/registry.rs
+++ b/tokio/src/signal/registry.rs
@@ -1,6 +1,6 @@
use crate::signal::os::{OsExtraData, OsStorage};
-use tokio_sync::mpsc::Sender;
+use crate::sync::mpsc::Sender;
use lazy_static::lazy_static;
use std::ops;
diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs
index 075e788c..87871503 100644
--- a/tokio/src/signal/unix.rs
+++ b/tokio/src/signal/unix.rs
@@ -8,8 +8,7 @@
use crate::io::AsyncRead;
use crate::net::util::PollEvented;
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
-
-use tokio_sync::mpsc::{channel, Receiver};
+use crate::sync::mpsc::{channel, Receiver};
use futures_core::stream::Stream;
use libc::c_int;
diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs
index abde334b..1e68d628 100644
--- a/tokio/src/signal/windows.rs
+++ b/tokio/src/signal/windows.rs
@@ -7,9 +7,8 @@
#![cfg(windows)]
-use super::registry::{globals, EventId, EventInfo, Init, Storage};
-
-use tokio_sync::mpsc::{channel, Receiver};
+use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage};
+use crate::sync::mpsc::{channel, Receiver};
use futures_core::stream::Stream;
use std::convert::TryFrom;
diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs
new file mode 100644
index 00000000..1582120e
--- /dev/null
+++ b/tokio/src/sync/barrier.rs
@@ -0,0 +1,135 @@
+use crate::sync::watch;
+
+use std::sync::Mutex;
+
+/// A barrier enables multiple threads to synchronize the beginning of some computation.
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio::sync::Barrier;
+/// use std::sync::Arc;
+/// use futures_util::future::join_all;
+///
+/// let mut handles = Vec::with_capacity(10);
+/// let barrier = Arc::new(Barrier::new(10));
+/// for _ in 0..10 {
+/// let c = barrier.clone();
+/// // The same messages will be printed together.
+/// // You will NOT see any interleaving.
+/// handles.push(async move {
+/// println!("before wait");
+/// let wr = c.wait().await;
+/// println!("after wait");
+/// wr
+/// });
+/// }
+/// // Will not resolve until all "before wait" messages have been printed
+/// let wrs = join_all(handles).await;
+/// // Exactly one barrier will resolve as the "leader"
+/// assert_eq!(wrs.into_iter().filter(|wr| wr.is_leader()).count(), 1);
+/// # }
+/// ```
+#[derive(Debug)]
+pub struct Barrier {
+ state: Mutex<BarrierState>,
+ wait: watch::Receiver<usize>,
+ n: usize,
+}
+
+#[derive(Debug)]
+struct BarrierState {
+ waker: watch::Sender<usize>,
+ arrived: usize,
+ generation: usize,
+}
+
+impl Barrier {
+ /// Creates a new barrier that can block a given number of threads.
+ ///
+ /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all
+ /// threads at once when the `n`th thread calls `wait`.
+ pub fn new(mut n: usize) -> Barrier {
+ let (waker, wait) = crate::sync::watch::channel(0);
+
+ if n == 0 {
+ // if n is 0, it's not clear what behavior the user wants.
+ // in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every
+ // .wait() immediately unblocks, so we adopt that here as well.
+ n = 1;
+ }
+
+ Barrier {
+ state: Mutex::new(BarrierState {
+ waker,
+ arrived: 0,
+ generation: 1,
+ }),
+ n,
+ wait,
+ }
+ }
+
+ /// Does not resolve until all tasks have rendezvoused here.
+ ///
+ /// Barriers are re-usable after all threads have rendezvoused once, and can
+ /// be used continuously.
+ ///
+ /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from
+ /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads
+ /// will receive a result that will return `false` from `is_leader`.
+ pub async fn wait(&self) -> BarrierWaitResult {
+ // NOTE: we are taking a _synchronous_ lock here.
+ // It is okay to do so because the critical section is fast and never yields, so it cannot
+ // deadlock even if another future is concurrently holding the lock.
+ // It is _desireable_ to do so as synchronous Mutexes are, at least in theory, faster than
+ // the asynchronous counter-parts, so we should use them where possible [citation needed].
+ // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across
+ // a yield point, and thus marks the returned future as !Send.
+ let generation = {
+ let mut state = self.state.lock().unwrap();
+ let generation = state.generation;
+ state.arrived += 1;
+ if state.arrived == self.n {
+ // we are the leader for this generation
+ // wake everyone, increment the generation, and return
+ state
+ .waker
+ .broadcast(state.generation)
+ .expect("there is at least one receiver");
+ state.arrived = 0;
+ state.generation += 1;
+ return BarrierWaitResult(true);
+ }
+
+ generation
+ };
+
+ // we're going to have to wait for the last of the generation to arrive
+ let mut wait = self.wait.clone();
+
+ loop {
+ // note that the first time through the loop, this _will_ yield a generation
+ // immediately, since we cloned a receiver that has never seen any values.
+ if wait.recv().await.expect("sender hasn't been closed") >= generation {
+ break;
+ }
+ }
+
+ BarrierWaitResult(false)
+ }
+}
+
+/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused.
+#[derive(Debug, Clone)]
+pub struct BarrierWaitResult(bool);
+
+impl BarrierWaitResult {
+ /// Returns true if this thread from wait is the "leader thread".
+ ///
+ /// Only one thread will have `true` returned from their result, all other threads will have
+ /// `false` returned.
+ pub fn is_leader(&self) -> bool {
+ self.0
+ }
+}
diff --git a/tokio/src/sync/loom.rs b/tokio/src/sync/loom.rs
new file mode 100644
index 00000000..1b5a5c9d
--- /dev/null
+++ b/tokio/src/sync/loom.rs
@@ -0,0 +1,48 @@
+#[cfg(not(all(test, loom)))]
+mod imp {
+ pub(crate) mod future {
+ pub(crate) use crate::sync::task::AtomicWaker;
+ }
+
+ pub(crate) mod sync {
+ pub(crate) use std::sync::atomic;
+ pub(crate) use std::sync::Arc;
+
+ use std::cell::UnsafeCell;
+
+ pub(crate) struct CausalCell<T>(UnsafeCell<T>);
+
+ impl<T> CausalCell<T> {
+ pub(crate) fn new(data: T) -> CausalCell<T> {
+ CausalCell(UnsafeCell::new(data))
+ }
+
+ pub(crate) fn with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*const T) -> R,
+ {
+ f(self.0.get())
+ }
+
+ pub(crate) fn with_mut<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(*mut T) -> R,
+ {
+ f(self.0.get())
+ }
+ }
+ }
+
+ pub(crate) mod thread {