From 2b909d6805990abf0bc2a5dea9e7267ff87df704 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 29 Oct 2019 15:11:31 -0700 Subject: sync: move into `tokio` crate (#1705) A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags. --- Cargo.toml | 1 - azure-pipelines.yml | 2 - ci/patch.toml | 2 +- tokio-sync/CHANGELOG.md | 76 -- tokio-sync/Cargo.toml | 39 - tokio-sync/LICENSE | 25 - tokio-sync/README.md | 13 - tokio-sync/benches/mpsc.rs | 536 --------- tokio-sync/benches/oneshot.rs | 239 ---- tokio-sync/src/barrier.rs | 134 --- tokio-sync/src/lib.rs | 43 - tokio-sync/src/loom.rs | 38 - tokio-sync/src/mpsc/block.rs | 386 ------- tokio-sync/src/mpsc/bounded.rs | 340 ------ tokio-sync/src/mpsc/chan.rs | 450 -------- tokio-sync/src/mpsc/list.rs | 347 ------ tokio-sync/src/mpsc/mod.rs | 61 -- tokio-sync/src/mpsc/unbounded.rs | 233 ---- tokio-sync/src/mutex.rs | 148 --- tokio-sync/src/oneshot.rs | 576 ---------- tokio-sync/src/semaphore.rs | 1142 -------------------- tokio-sync/src/task/atomic_waker.rs | 323 ------ tokio-sync/src/task/mod.rs | 5 - tokio-sync/src/watch.rs | 459 -------- tokio-sync/tests/atomic_waker.rs | 37 - tokio-sync/tests/barrier.rs | 93 -- tokio-sync/tests/errors.rs | 29 - tokio-sync/tests/fuzz_atomic_waker.rs | 53 - tokio-sync/tests/fuzz_list.rs | 71 -- tokio-sync/tests/fuzz_mpsc.rs | 40 - tokio-sync/tests/fuzz_oneshot.rs | 115 -- tokio-sync/tests/fuzz_semaphore.rs | 160 --- tokio-sync/tests/mpsc.rs | 453 -------- tokio-sync/tests/mutex.rs | 79 -- tokio-sync/tests/oneshot.rs | 228 ---- tokio-sync/tests/semaphore.rs | 153 --- tokio-sync/tests/watch.rs | 287 ----- tokio-test/Cargo.toml | 1 - tokio/Cargo.toml | 9 +- tokio/benches/mpsc.rs | 270 +++++ tokio/benches/oneshot.rs | 120 ++ tokio/benches/thread_pool.rs | 2 +- tokio/src/executor/blocking/mod.rs | 2 +- tokio/src/executor/thread_pool/shutdown.rs | 3 +- .../driver/reactor/dispatch/page/scheduled_io.rs | 3 +- tokio/src/net/unix/incoming.rs | 2 - tokio/src/net/unix/listener.rs | 1 - tokio/src/net/unix/mod.rs | 1 - tokio/src/signal/registry.rs | 2 +- tokio/src/signal/unix.rs | 3 +- tokio/src/signal/windows.rs | 5 +- tokio/src/sync.rs | 18 - tokio/src/sync/barrier.rs | 135 +++ tokio/src/sync/loom.rs | 48 + tokio/src/sync/mod.rs | 58 + tokio/src/sync/mpsc/block.rs | 387 +++++++ tokio/src/sync/mpsc/bounded.rs | 337 ++++++ tokio/src/sync/mpsc/chan.rs | 451 ++++++++ tokio/src/sync/mpsc/list.rs | 348 ++++++ tokio/src/sync/mpsc/mod.rs | 67 ++ tokio/src/sync/mpsc/unbounded.rs | 230 ++++ tokio/src/sync/mutex.rs | 149 +++ tokio/src/sync/oneshot.rs | 576 ++++++++++ tokio/src/sync/semaphore.rs | 1142 ++++++++++++++++++++ tokio/src/sync/task/atomic_waker.rs | 323 ++++++ tokio/src/sync/task/mod.rs | 4 + tokio/src/sync/tests/loom_atomic_waker.rs | 45 + tokio/src/sync/tests/loom_list.rs | 52 + tokio/src/sync/tests/loom_mpsc.rs | 23 + tokio/src/sync/tests/loom_oneshot.rs | 109 ++ tokio/src/sync/tests/loom_semaphore.rs | 151 +++ tokio/src/sync/tests/mod.rs | 7 + tokio/src/sync/watch.rs | 454 ++++++++ tokio/src/timer/timer/entry.rs | 3 +- tokio/tests/support/mock_pool.rs | 2 +- tokio/tests/sync_atomic_waker.rs | 38 + tokio/tests/sync_barrier.rs | 94 ++ tokio/tests/sync_errors.rs | 29 + tokio/tests/sync_mpsc.rs | 451 ++++++++ tokio/tests/sync_mutex.rs | 80 ++ tokio/tests/sync_oneshot.rs | 228 ++++ tokio/tests/sync_semaphore.rs | 153 +++ tokio/tests/sync_watch.rs | 264 +++++ tokio/tests/thread_pool.rs | 2 +- tokio/tests/timer_timeout.rs | 7 +- 85 files changed, 6842 insertions(+), 7463 deletions(-) delete mode 100644 tokio-sync/CHANGELOG.md delete mode 100644 tokio-sync/Cargo.toml delete mode 100644 tokio-sync/LICENSE delete mode 100644 tokio-sync/README.md delete mode 100644 tokio-sync/benches/mpsc.rs delete mode 100644 tokio-sync/benches/oneshot.rs delete mode 100644 tokio-sync/src/barrier.rs delete mode 100644 tokio-sync/src/lib.rs delete mode 100644 tokio-sync/src/loom.rs delete mode 100644 tokio-sync/src/mpsc/block.rs delete mode 100644 tokio-sync/src/mpsc/bounded.rs delete mode 100644 tokio-sync/src/mpsc/chan.rs delete mode 100644 tokio-sync/src/mpsc/list.rs delete mode 100644 tokio-sync/src/mpsc/mod.rs delete mode 100644 tokio-sync/src/mpsc/unbounded.rs delete mode 100644 tokio-sync/src/mutex.rs delete mode 100644 tokio-sync/src/oneshot.rs delete mode 100644 tokio-sync/src/semaphore.rs delete mode 100644 tokio-sync/src/task/atomic_waker.rs delete mode 100644 tokio-sync/src/task/mod.rs delete mode 100644 tokio-sync/src/watch.rs delete mode 100644 tokio-sync/tests/atomic_waker.rs delete mode 100644 tokio-sync/tests/barrier.rs delete mode 100644 tokio-sync/tests/errors.rs delete mode 100644 tokio-sync/tests/fuzz_atomic_waker.rs delete mode 100644 tokio-sync/tests/fuzz_list.rs delete mode 100644 tokio-sync/tests/fuzz_mpsc.rs delete mode 100644 tokio-sync/tests/fuzz_oneshot.rs delete mode 100644 tokio-sync/tests/fuzz_semaphore.rs delete mode 100644 tokio-sync/tests/mpsc.rs delete mode 100644 tokio-sync/tests/mutex.rs delete mode 100644 tokio-sync/tests/oneshot.rs delete mode 100644 tokio-sync/tests/semaphore.rs delete mode 100644 tokio-sync/tests/watch.rs create mode 100644 tokio/benches/mpsc.rs create mode 100644 tokio/benches/oneshot.rs delete mode 100644 tokio/src/sync.rs create mode 100644 tokio/src/sync/barrier.rs create mode 100644 tokio/src/sync/loom.rs create mode 100644 tokio/src/sync/mod.rs create mode 100644 tokio/src/sync/mpsc/block.rs create mode 100644 tokio/src/sync/mpsc/bounded.rs create mode 100644 tokio/src/sync/mpsc/chan.rs create mode 100644 tokio/src/sync/mpsc/list.rs create mode 100644 tokio/src/sync/mpsc/mod.rs create mode 100644 tokio/src/sync/mpsc/unbounded.rs create mode 100644 tokio/src/sync/mutex.rs create mode 100644 tokio/src/sync/oneshot.rs create mode 100644 tokio/src/sync/semaphore.rs create mode 100644 tokio/src/sync/task/atomic_waker.rs create mode 100644 tokio/src/sync/task/mod.rs create mode 100644 tokio/src/sync/tests/loom_atomic_waker.rs create mode 100644 tokio/src/sync/tests/loom_list.rs create mode 100644 tokio/src/sync/tests/loom_mpsc.rs create mode 100644 tokio/src/sync/tests/loom_oneshot.rs create mode 100644 tokio/src/sync/tests/loom_semaphore.rs create mode 100644 tokio/src/sync/tests/mod.rs create mode 100644 tokio/src/sync/watch.rs create mode 100644 tokio/tests/sync_atomic_waker.rs create mode 100644 tokio/tests/sync_barrier.rs create mode 100644 tokio/tests/sync_errors.rs create mode 100644 tokio/tests/sync_mpsc.rs create mode 100644 tokio/tests/sync_mutex.rs create mode 100644 tokio/tests/sync_oneshot.rs create mode 100644 tokio/tests/sync_semaphore.rs create mode 100644 tokio/tests/sync_watch.rs diff --git a/Cargo.toml b/Cargo.toml index 4a9e88a0..4a2a71ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ members = [ "tokio", "tokio-macros", - "tokio-sync", "tokio-test", "tokio-tls", "tokio-util", diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 4fb21761..ae2bd1bb 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -47,8 +47,6 @@ jobs: displayName: Test sub crates - rust: beta crates: - tokio-sync: - - async-traits tokio-macros: [] tokio-test: [] tokio-util: [] diff --git a/ci/patch.toml b/ci/patch.toml index 333c4dff..22311cf9 100644 --- a/ci/patch.toml +++ b/ci/patch.toml @@ -3,6 +3,6 @@ [patch.crates-io] tokio = { path = "tokio" } tokio-macros = { path = "tokio-macros" } -tokio-sync = { path = "tokio-sync" } +tokio-test = { path = "tokio-test" } tokio-tls = { path = "tokio-tls" } tokio-util = { path = "tokio-util" } diff --git a/tokio-sync/CHANGELOG.md b/tokio-sync/CHANGELOG.md deleted file mode 100644 index 44033503..00000000 --- a/tokio-sync/CHANGELOG.md +++ /dev/null @@ -1,76 +0,0 @@ -# 0.2.0-alpha.6 (September 30, 2019) - -- Move to `futures-*-preview 0.3.0-alpha.19` -- Move to `pin-project 0.4` - -# 0.2.0-alpha.5 (September 19, 2019) - -### Changed -- rename `Lock` -> `Mutex` and make it more like `std::sync::Mutex` (#1573). - -### Added -- `Barrier`, an async version of `std::sync::Barrier` (#1571). - -# 0.2.0-alpha.4 (August 29, 2019) - -- Track tokio release. - -# 0.2.0-alpha.3 (August 23, 2019) - -- Track `tokio` version number - -# 0.2.0-alpha.2 (August 17, 2019) - -### Changed -- Update `futures` dependency to 0.3.0-alpha.18. - -# 0.2.0-alpha.1 (August 8, 2019) - -### Changed -- Switch to `async`, `await`, and `std::future`. - -# 0.1.6 (June 4, 2019) - -### Added -- Add Sync impl for Lock (#1117). - -# 0.1.5 (April 22, 2019) - -### Added -- Add asynchronous mutual exclusion primitive (#964). - -# 0.1.4 (March 13, 2019) - -### Fixed -- Fix memory leak on channel drop (#917). - -### Added -- `std::error::Error` implementation for `oneshot`, `watch` error types (#967). - -# 0.1.3 (March 1, 2019) - -### Added -- `Watch`, a single value broadcast channel (#922). -- `std::error::Error` implementation for more `mpsc` types (#937). - -# 0.1.2 (February 20, 2019) - -### Fixes -- `mpsc` and `Semaphore` when releasing permits (#904). -- `oneshot` task handle leak (#911). - -### Changes -- Performance improvements in `AtomicTask` (#892). -- Improved assert message when creating a channel with bound of zero (#906). - -### Adds -- `AtomicTask::take_task` (#895). - -# 0.1.1 (February 1, 2019) - -### Fixes -- Panic when creating a channel with bound 0 (#879). - -# 0.1.0 (January 24, 2019) - -- Initial Release diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml deleted file mode 100644 index bb2fd607..00000000 --- a/tokio-sync/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -[package] -name = "tokio-sync" -# When releasing to crates.io: -# - Remove path dependencies -# - Update html_root_url. -# - Update doc url -# - Cargo.toml -# - Update CHANGELOG.md. -# - Create "v0.2.x" git tag. -version = "0.2.0-alpha.6" -edition = "2018" -authors = ["Tokio Contributors "] -license = "MIT" -repository = "https://github.com/tokio-rs/tokio" -homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-sync/0.2.0-alpha.6/tokio_sync" -description = """ -Synchronization utilities. -""" -categories = ["asynchronous"] - -[features] -async-traits = ["futures-sink-preview"] - -[dependencies] -fnv = "1.0.6" -futures-core-preview = { version = "=0.3.0-alpha.19" } -futures-sink-preview = { version = "=0.3.0-alpha.19", optional = true } -futures-util-preview = { version = "=0.3.0-alpha.19" } - -[dev-dependencies] -tokio = { version = "0.2.0-alpha.6", path = "../tokio" } -tokio-test = { version = "0.2.0-alpha.6", path = "../tokio-test" } - -env_logger = { version = "0.6", default-features = false } -loom = { version = "0.2.1", features = ["futures"] } - -[package.metadata.docs.rs] -all-features = true diff --git a/tokio-sync/LICENSE b/tokio-sync/LICENSE deleted file mode 100644 index cdb28b4b..00000000 --- a/tokio-sync/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2019 Tokio Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/tokio-sync/README.md b/tokio-sync/README.md deleted file mode 100644 index 6e209f69..00000000 --- a/tokio-sync/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# tokio-sync - -Synchronization utilities - -## License - -This project is licensed under the [MIT license](LICENSE). - -### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in Tokio by you, shall be licensed as MIT, without any additional -terms or conditions. diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs deleted file mode 100644 index 95d8e5c7..00000000 --- a/tokio-sync/benches/mpsc.rs +++ /dev/null @@ -1,536 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -type Medium = [usize; 64]; -type Large = [Medium; 64]; - -mod tokio { - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - use tokio_sync::mpsc::*; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::()); - }) - } - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::(1); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::(1); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn bounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::(1_000); - - for i in 0..1000 { - let _ = tx.try_send([[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn bounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn contended_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - // TODO make unbounded - let (tx, rx) = channel::(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} - -mod legacy { - use futures::sync::mpsc::*; - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::()); - }) - } - - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Ok(Async::Ready(Some(1))), rx.poll()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::(0); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::(0); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded::(); - - for i in 0..1000 { - let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn multi_thread_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} diff --git a/tokio-sync/benches/oneshot.rs b/tokio-sync/benches/oneshot.rs deleted file mode 100644 index b2f37805..00000000 --- a/tokio-sync/benches/oneshot.rs +++ /dev/null @@ -1,239 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -mod tokio { - use futures::{future, Async, Future}; - use test::Bencher; - use tokio_sync::oneshot; - - #[bench] - fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::()); - }) - } - - #[bench] - fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); - } - - #[bench] - fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); - } - - #[bench] - fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin(mut f: F) -> Result { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - } -} - -mod legacy { - use futures::sync::oneshot; - use futures::{future, Async, Future}; - use test::Bencher; - - #[bench] - fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::()); - }) - } - - #[bench] - fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); - } - - #[bench] - fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); - } - - #[bench] - fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin(mut f: F) -> Result { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - } -} diff --git a/tokio-sync/src/barrier.rs b/tokio-sync/src/barrier.rs deleted file mode 100644 index 6a409e26..00000000 --- a/tokio-sync/src/barrier.rs +++ /dev/null @@ -1,134 +0,0 @@ -use crate::watch; -use std::sync::Mutex; - -/// A barrier enables multiple threads to synchronize the beginning of some computation. -/// -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// use std::sync::Arc; -/// use tokio_sync::Barrier; -/// use futures_util::future::join_all; -/// -/// let mut handles = Vec::with_capacity(10); -/// let barrier = Arc::new(Barrier::new(10)); -/// for _ in 0..10 { -/// let c = barrier.clone(); -/// // The same messages will be printed together. -/// // You will NOT see any interleaving. -/// handles.push(async move { -/// println!("before wait"); -/// let wr = c.wait().await; -/// println!("after wait"); -/// wr -/// }); -/// } -/// // Will not resolve until all "before wait" messages have been printed -/// let wrs = join_all(handles).await; -/// // Exactly one barrier will resolve as the "leader" -/// assert_eq!(wrs.into_iter().filter(|wr| wr.is_leader()).count(), 1); -/// # } -/// ``` -#[derive(Debug)] -pub struct Barrier { - state: Mutex, - wait: watch::Receiver, - n: usize, -} - -#[derive(Debug)] -struct BarrierState { - waker: watch::Sender, - arrived: usize, - generation: usize, -} - -impl Barrier { - /// Creates a new barrier that can block a given number of threads. - /// - /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all - /// threads at once when the `n`th thread calls `wait`. - pub fn new(mut n: usize) -> Barrier { - let (waker, wait) = crate::watch::channel(0); - - if n == 0 { - // if n is 0, it's not clear what behavior the user wants. - // in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every - // .wait() immediately unblocks, so we adopt that here as well. - n = 1; - } - - Barrier { - state: Mutex::new(BarrierState { - waker, - arrived: 0, - generation: 1, - }), - n, - wait, - } - } - - /// Does not resolve until all tasks have rendezvoused here. - /// - /// Barriers are re-usable after all threads have rendezvoused once, and can - /// be used continuously. - /// - /// A single (arbitrary) future will receive a [`BarrierWaitResult`] that returns `true` from - /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other threads - /// will receive a result that will return `false` from `is_leader`. - pub async fn wait(&self) -> BarrierWaitResult { - // NOTE: we are taking a _synchronous_ lock here. - // It is okay to do so because the critical section is fast and never yields, so it cannot - // deadlock even if another future is concurrently holding the lock. - // It is _desireable_ to do so as synchronous Mutexes are, at least in theory, faster than - // the asynchronous counter-parts, so we should use them where possible [citation needed]. - // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across - // a yield point, and thus marks the returned future as !Send. - let generation = { - let mut state = self.state.lock().unwrap(); - let generation = state.generation; - state.arrived += 1; - if state.arrived == self.n { - // we are the leader for this generation - // wake everyone, increment the generation, and return - state - .waker - .broadcast(state.generation) - .expect("there is at least one receiver"); - state.arrived = 0; - state.generation += 1; - return BarrierWaitResult(true); - } - - generation - }; - - // we're going to have to wait for the last of the generation to arrive - let mut wait = self.wait.clone(); - - loop { - // note that the first time through the loop, this _will_ yield a generation - // immediately, since we cloned a receiver that has never seen any values. - if wait.recv().await.expect("sender hasn't been closed") >= generation { - break; - } - } - - BarrierWaitResult(false) - } -} - -/// A `BarrierWaitResult` is returned by `wait` when all threads in the `Barrier` have rendezvoused. -#[derive(Debug, Clone)] -pub struct BarrierWaitResult(bool); - -impl BarrierWaitResult { - /// Returns true if this thread from wait is the "leader thread". - /// - /// Only one thread will have `true` returned from their result, all other threads will have - /// `false` returned. - pub fn is_leader(&self) -> bool { - self.0 - } -} diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs deleted file mode 100644 index 6055f024..00000000 --- a/tokio-sync/src/lib.rs +++ /dev/null @@ -1,43 +0,0 @@ -#![doc(html_root_url = "https://docs.rs/tokio-sync/0.2.0-alpha.6")] -#![warn( - missing_debug_implementations, - missing_docs, - rust_2018_idioms, - unreachable_pub -)] -#![deny(intra_doc_link_resolution_failure)] -#![doc(test( - no_crate_inject, - attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) -))] - -//! Asynchronous synchronization primitives. -//! -//! This crate provides primitives for synchronizing asynchronous tasks. - -macro_rules! debug { - ($($t:tt)*) => { - if false { - println!($($t)*); - } - } -} - -macro_rules! if_fuzz { - ($($t:tt)*) => {{ - if false { $($t)* } - }} -} - -mod barrier; -mod loom; -pub mod mpsc; -mod mutex; -pub mod oneshot; -pub mod semaphore; -mod task; -pub mod watch; - -pub use barrier::{Barrier, BarrierWaitResult}; -pub use mutex::{Mutex, MutexGuard}; -pub use task::AtomicWaker; diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs deleted file mode 100644 index 564efc4f..00000000 --- a/tokio-sync/src/loom.rs +++ /dev/null @@ -1,38 +0,0 @@ -pub(crate) mod future { - pub(crate) use crate::task::AtomicWaker; -} - -pub(crate) mod sync { - pub(crate) use std::sync::atomic; - pub(crate) use std::sync::Arc; - - use std::cell::UnsafeCell; - - pub(crate) struct CausalCell(UnsafeCell); - - impl CausalCell { - pub(crate) fn new(data: T) -> CausalCell { - CausalCell(UnsafeCell::new(data)) - } - - pub(crate) fn with(&self, f: F) -> R - where - F: FnOnce(*const T) -> R, - { - f(self.0.get()) - } - - pub(crate) fn with_mut(&self, f: F) -> R - where - F: FnOnce(*mut T) -> R, - { - f(self.0.get()) - } - } -} - -pub(crate) mod thread { - pub(crate) fn yield_now() { - ::std::sync::atomic::spin_loop_hint(); - } -} diff --git a/tokio-sync/src/mpsc/block.rs b/tokio-sync/src/mpsc/block.rs deleted file mode 100644 index 7d7f2e53..00000000 --- a/tokio-sync/src/mpsc/block.rs +++ /dev/null @@ -1,386 +0,0 @@ -use crate::loom::{ - sync::atomic::{AtomicPtr, AtomicUsize}, - sync::CausalCell, - thread, -}; -use std::mem::MaybeUninit; -use std::ops; -use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release}; - -/// A block in a linked list. -/// -/// Each block in the list can hold up to `BLOCK_CAP` messages. -pub(crate) struct Block { - /// The start index of this block. - /// - /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`. - start_index: usize, - - /// The next block in the linked list. - next: AtomicPtr>, - - /// Bitfield tracking slots that are ready to have their values consumed. - ready_slots: AtomicUsize, - - /// The observed `tail_position` value *after* the block has been passed by - /// `block_tail`. - observed_tail_position: CausalCell, - - /// Array containing values pushed into the block. Values are stored in a - /// continuous array in order to improve cache line behavior when reading. - /// The values must be manually dropped. - values: Values, -} - -pub(crate) enum Read { - Value(T), - Closed, -} - -struct Values([CausalCell>; BLOCK_CAP]); - -use super::BLOCK_CAP; - -/// Masks an index to get the block identifier -const BLOCK_MASK: usize = !(BLOCK_CAP - 1); - -/// Masks an index to get the value offset in a block. -const SLOT_MASK: usize = BLOCK_CAP - 1; - -/// Flag tracking that a block has gone through the sender's release routine. -/// -/// When this is set, the receiver may consider freeing the block. -const RELEASED: usize = 1 << BLOCK_CAP; - -/// Flag tracking all senders dropped. -/// -/// When this flag is set, the send half of the channel has closed. -const TX_CLOSED: usize = RELEASED << 1; - -/// Mask covering all bits used to track slot readiness. -const READY_MASK: usize = RELEASED - 1; - -/// Returns the index of the first slot in the block referenced by `slot_index`. -#[inline(always)] -pub(crate) fn start_index(slot_index: usize) -> usize { - BLOCK_MASK & slot_index -} - -/// Returns the offset into the block referenced by `slot_index`. -#[inline(always)] -pub(crate) fn offset(slot_index: usize) -> usize { - SLOT_MASK & slot_index -} - -impl Block { - pub(crate) fn new(start_index: usize) -> Block { - Block { - // The absolute index in the channel of the first slot in the block. - start_index, - - // Pointer to the next block in the linked list. - next: AtomicPtr::new(ptr::null_mut()), - - ready_slots: AtomicUsize::new(0), - - observed_tail_position: CausalCell::new(0), - - // Value storage - values: unsafe { Values::uninitialized() }, - } - } - - /// Returns `true` if the block matches the given index - pub(crate) fn is_at_index(&self, index: usize) -> bool { - debug_assert!(offset(index) == 0); - self.start_index == index - } - - /// Returns the number of blocks between `self` and the block at the - /// specified index. - /// - /// `start_index` must represent a block *after* `self`. - pub(crate) fn distance(&self, other_index: usize) -> usize { - debug_assert!(offset(other_index) == 0); - other_index.wrapping_sub(self.start_index) / BLOCK_CAP - } - - /// Read the value at the given offset. - /// - /// Returns `None` if the slot is empty. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * No concurrent access to the slot. - pub(crate) unsafe fn read(&self, slot_index: usize) -> Option> { - let offset = offset(slot_index); - - let ready_bits = self.ready_slots.load(Acquire); - - if !is_ready(ready_bits, offset) { - if is_tx_closed(ready_bits) { - return Some(Read::Closed); - } - - return None; - } - - // Get the value - let value = self.values[offset].with(|ptr| ptr::read(ptr)); - - Some(Read::Value(value.assume_init())) - } - - /// Write a value to the block at the given offset. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * The slot is empty. - /// * No concurrent access to the slot. - pub(crate) unsafe fn write(&self, slot_index: usize, value: T) { - // Get the offset into the block - let slot_offset = offset(slot_index); - - self.values[slot_offset].with_mut(|ptr| { - ptr::write(ptr, MaybeUninit::new(value)); - }); - - // Release the value. After this point, the slot ref may no longer - // be used. It is possible for the receiver to free the memory at - // any point. - self.set_ready(slot_offset); - } - - /// Signal to the receiver that the sender half of the list is closed. - pub(crate) unsafe fn tx_close(&self) { - self.ready_slots.fetch_or(TX_CLOSED, Release); - } - - /// Reset the block to a blank state. This enables reusing blocks in the - /// channel. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * All slots are empty. - /// * The caller holds a unique pointer to the block. - pub(crate) unsafe fn reclaim(&mut self) { - self.start_index = 0; - self.next = AtomicPtr::new(ptr::null_mut()); - self.ready_slots = AtomicUsize::new(0); - } - - /// Release the block to the rx half for freeing. - /// - /// This function is called by the tx half once it can be guaranteed that no - /// more senders will attempt to access the block. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * The block will no longer be accessed by any sender. - pub(crate) unsafe fn tx_release(&self, tail_position: usize) { - // Track the observed tail_position. Any sender targetting a greater - // tail_position is guaranteed to not access this block. - self.observed_tail_position - .with_mut(|ptr| *ptr = tail_position); - - // Set the released bit, signalling to the receiver that it is safe to - // free the block's memory as soon as all slots **prior** to - // `observed_tail_position` have been filled. - self.ready_slots.fetch_or(RELEASED, Release); - } - - /// Mark a slot as ready - fn set_ready(&self, slot: usize) { - let mask = 1 << slot; - self.ready_slots.fetch_or(mask, Release); - } - - /// Returns `true` when all slots have their `ready` bits set. - /// - /// This indicates that the block is in its final state and will no longer - /// be mutated. - /// - /// # Implementation - /// - /// The implementation walks each slot checking the `ready` flag. It might - /// be that it would make more sense to coalesce ready flags as bits in a - /// single atomic cell. However, this could have negative impact on cache - /// behavior as there would be many more mutations to a single slot. - pub(crate) fn is_final(&self) -> bool { - self.ready_slots.load(Acquire) & READY_MASK == READY_MASK - } - - /// Returns the `observed_tail_position` value, if set - pub(crate) fn observed_tail_position(&self) -> Option { - if 0 == RELEASED & self.ready_slots.load(Acquire) { - None - } else { - Some(self.observed_tail_position.with(|ptr| unsafe { *ptr })) - } - } - - /// Load the next block - pub(crate) fn load_next(&self, ordering: Ordering) -> Option>> { - let ret = NonNull::new(self.next.load(ordering)); - - debug_assert!(unsafe { - ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP)) - .unwrap_or(true) - }); - - ret - } - - /// Push `block` as the next block in the link. - /// - /// Returns Ok if successful, otherwise, a pointer to the next block in - /// the list is returned. - /// - /// This requires that the next pointer is null. - /// - /// # Ordering - /// - /// This performs a compare-and-swap on `next` using AcqRel ordering. - /// - /// # Safety - /// - /// To maintain safety, the caller must ensure: - /// - /// * `block` is not freed until it has been removed from the list. - pub(crate) unsafe fn try_push( - &self, - block: &mut NonNull>, - ordering: Ordering, - ) -> Result<(), NonNull>> { - block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP); - - let next_ptr = self - .next - .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering); - - match NonNull::new(next_ptr) { - Some(next_ptr) => Err(next_ptr), - None => Ok(()), - } - } - - /// Grow the `Block` linked list by allocating and appending a new block. - /// - /// The next block in the linked list is returned. This may or may not be - /// the one allocated by the function call. - /// - /// # Implementation - /// - /// It is assumed that `self.next` is null. A new block is allocated with - /// `start_index` set to be the next block. A compare-and-swap is performed - /// with AcqRel memory ordering. If the compare-and-swap is successful, the - /// newly allocated block is released to other threads walking the block - /// linked list. If the compare-and-swap fails, the current thread acquires - /// the next block in the linked list, allowing the current thread to access - /// the slots. - pub(crate) fn grow(&self) -> NonNull> { - // Create the new block. It is assumed that the block will become the - // next one after `&self`. If this turns out to not be the case, - // `start_index` is updated accordingly. - let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP)); - - let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) }; - - // Attempt to store the block. The first compare-and-swap attempt is - // "unrolled" due to minor differences in logic - // - // `AcqRel` is used as the ordering **only** when attempting the - // compare-and-swap on self.next. - // - // If the compare-and-swap fails, then the actual value of the cell is - // returned from this function and accessed by the caller. Given this, - // the memory must be acquired. - // - // `Release` ensures that the newly allocated block is available to - // other threads acquiring the next pointer. - let next = NonNull::new(self.next.compare_and_swap( - ptr::null_mut(), - new_block.as_ptr(), - AcqRel, - )); - - let next = match next { - Some(next) => next, - None => { - // The compare-and-swap succeeded and the newly allocated block - // is successfully pushed. - return new_block; - } - }; - - // There already is a next block in the linked list. The newly allocated - // block could be dropped and the discovered next block returned; - // however, that would be wasteful. Instead, the linked list is walked - // by repeatedly attempting to compare-and-swap the pointer into the - // `next` register until the compare-and-swap succeed. - // - // Care is taken to update new_block's start_index field as appropriate. - - let mut curr = next; - - // TODO: Should this iteration be capped? - loop { - let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) }; - - curr = match actual { - Ok(_) => { - return next; - } - Err(curr) => curr, - }; - - // When running outside of loom, this calls `spin_loop_hint`. - thread::yield_now(); - } - } -} - -/// Returns `true` if the specificed slot has a value ready to be consumed. -fn is_ready(bits: usize, slot: usize) -> bool { - let mask = 1 << slot; - mask == mask & bits -} - -/// Returns `true` if the closed flag has been set. -fn is_tx_closed(bits: usize) -> bool { - TX_CLOSED == bits & TX_CLOSED -} - -impl Values { - unsafe fn uninitialized() -> Values { - let mut vals = MaybeUninit::uninit(); - - // When fuzzing, `CausalCell` needs to be initialized. - if_fuzz! { - let p = vals.as_mut_ptr() as *mut CausalCell>; - for i in 0..BLOCK_CAP { - p.add(i) - .write(CausalCell::new(MaybeUninit::uninit())); - } - } - - Values(vals.assume_init()) - } -} - -impl ops::Index for Values { - type Output = CausalCell>; - - fn index(&self, index: usize) -> &Self::Output { - self.0.index(index) - } -} diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs deleted file mode 100644 index 711173ae..00000000 --- a/tokio-sync/src/mpsc/bounded.rs +++ /dev/null @@ -1,340 +0,0 @@ -use super::chan; - -use std::fmt; -use std::task::{Context, Poll}; - -#[cfg(feature = "async-traits")] -use std::pin::Pin; - -/// Send values to the associated `Receiver`. -/// -/// Instances are created by the [`channel`](fn.channel.html) function. -pub struct Sender { - chan: chan::Tx, -} - -impl Clone for Sender { - fn clone(&self) -> Self { - Sender { - chan: self.chan.clone(), - } - } -} - -impl fmt::Debug for Sender { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Sender") - .field("chan", &self.chan) - .finish() - } -} - -/// Receive values from the associated `Sender`. -/// -/// Instances are created by the [`channel`](fn.channel.html) function. -pub struct Receiver { - /// The channel receiver - chan: chan::Rx, -} - -impl fmt::Debug for Receiver { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Receiver") - .field("chan", &self.chan) - .finish() - } -} - -/// Error returned by the `Sender`. -#[derive(Debug)] -pub struct SendError(()); - -/// Error returned by `Sender::try_send`. -#[derive(Debug)] -pub struct TrySendError { - kind: ErrorKind, - value: T, -} - -#[derive(Debug)] -enum ErrorKind { - Closed, - NoCapacity, -} - -/// Error returned by `Receiver`. -#[derive(Debug)] -pub struct RecvError(()); - -/// Create a bounded mpsc channel for communicating between asynchronous tasks, -/// returning the sender/receiver halves. -/// -/// All data sent on `Sender` will become available on `Receiver` in the same -/// order as it was sent. -/// -/// The `Sender` can be cloned to `send` to the same channel from multiple code -/// locations. Only one `Receiver` is supported. -/// -/// If the `Receiver` is disconnected while trying to `send`, the `send` method -/// will return a `SendError`. Similarly, if `Sender` is disconnected while -/// trying to `recv`, the `recv` method will return a `RecvError`. -/// -/// # Examples -/// -/// ```rust -/// use tokio::sync::mpsc; -/// -/// #[tokio::main] -/// async fn main() { -/// let (mut tx, mut rx) = mpsc::channel(100); -/// -/// tokio::spawn(async move { -/// for i in 0..10 { -/// if let Err(_) = tx.send(i).await { -/// println!("receiver dropped"); -/// return; -/// } -/// } -/// }); -/// -/// while let Some(i) = rx.recv().await { -/// println!("got = {}", i); -/// } -/// } -/// ``` -pub fn channel(buffer: usize) -> (Sender, Receiver) { - assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); - let semaphore = (crate::semaphore::Semaphore::new(buffer), buffer); - let (tx, rx) = chan::channel(semaphore); - - let tx = Sender::new(tx); - let rx = Receiver::new(rx); - - (tx, rx) -} - -/// Channel semaphore is a tuple of the semaphore implementation and a `usize` -/// representing the channel bound. -type Semaphore = (crate::semaphore::Semaphore, usize); - -impl Receiver { - pub(crate) fn new(chan: chan::Rx) -> Receiver { - Receiver { chan } - } - - /// Receive the next value for this receiver. - /// - /// `None` is returned when all `Sender` halves have dropped, indicating - /// that no further values can be sent on the channel. - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::mpsc; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(100); - /// - /// tokio::spawn(async move { - /// tx.send("hello").await.unwrap(); - /// }); - /// - /// assert_eq!(Some("hello"), rx.recv().await); - /// assert_eq!(None, rx.recv().await); - /// } - /// ``` - /// - /// Values are buffered: - /// - /// ``` - /// use tokio::sync::mpsc; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(100); - /// - /// tx.send("hello").await.unwrap(); - /// tx.send("world").await.unwrap(); - /// - /// assert_eq!(Some("hello"), rx.recv().await); - /// assert_eq!(Some("world"), rx.recv().await); - /// } - /// ``` - pub async fn recv(&mut self) -> Option { - use futures_util::future::poll_fn; - - poll_fn(|cx| self.poll_recv(cx)).await - } - - #[doc(hidden)] // TODO: remove - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.chan.recv(cx) - } - - /// Closes the receiving half of a channel, without dropping it. - /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. - pub fn close(&mut self) { - self.chan.close(); - } -} - -#[cfg(feature = "async-traits")] -impl futures_core::Stream for Receiver { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().poll_recv(cx) - } -} - -impl Sender { - pub(crate) fn new(chan: chan::Tx) -> Sender { - Sender { chan } - } - - #[doc(hidden)] // TODO: remove - pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.chan.poll_ready(cx).map_err(|_| SendError(())) - } - - /// Attempts to send a message on this `Sender`, returning the message - /// if there was an error. - pub fn try_send(&mut self, message: T) -> Result<(), TrySendError> { - self.chan.try_send(message)?; - Ok(()) - } - - /// Send a value, waiting until there is capacity. - /// - /// # Examples - /// - /// In the following example, each call to `send` will block until the - /// previously sent value was received. - /// - /// ```rust - /// use tokio::sync::mpsc; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(1); - /// - /// tokio::spawn(async move { - /// for i in 0..10 { - /// if let Err(_) = tx.send(i).await { - /// println!("receiver dropped"); - /// return; - /// } - /// } - /// }); - /// - /// while let Some(i) = rx.recv().await { - /// println!("got = {}", i); - /// } - /// } - /// ``` - pub async fn send(&mut self, value: T) -> Result<(), SendError> { - use futures_util::future::poll_fn; - - poll_fn(|cx| self.poll_ready(cx)).await?; - - self.try_send(value).map_err(|_| SendError(())) - } -} - -#[cfg(feature = "async-traits")] -impl futures_sink::Sink for Sender { - type Error = SendError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Sender::poll_ready(self.get_mut(), cx) - } - - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.as_mut().try_send(msg).map_err(|err| { - assert!(err.is_full(), "call `poll_ready` before sending"); - SendError(()) - }) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } -} - -// ===== impl SendError ===== - -impl fmt::Display for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for SendError {} - -// ===== impl TrySendError ===== - -impl TrySendError { - /// Get the inner value. - pub fn into_inner(self) -> T { - self.value - } - - /// Did the send fail because the channel has been closed? - pub fn is_closed(&self) -> bool { - if let ErrorKind::Closed = self.kind { - true - } else { - false - } - } - - /// Did the send fail because the channel was at capacity? - pub fn is_full(&self) -> bool { - if let ErrorKind::NoCapacity = self.kind { - true - } else { - false - } - } -} - -impl fmt::Display for TrySendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = match self.kind { - ErrorKind::Closed => "channel closed", - ErrorKind::NoCapacity => "no available capacity", - }; - write!(fmt, "{}", descr) - } -} - -impl ::std::error::Error for TrySendError {} - -impl From<(T, chan::TrySendError)> for TrySendError { - fn from((value, err): (T, chan::TrySendError)) -> TrySendError { - TrySendError { - value, - kind: match err { - chan::TrySendError::Closed => ErrorKind::Closed, - chan::TrySendError::NoPermits => ErrorKind::NoCapacity, - }, - } - } -} - -// ===== impl R