summaryrefslogtreecommitdiffstats
path: root/tokio-sync
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync')
-rw-r--r--tokio-sync/CHANGELOG.md76
-rw-r--r--tokio-sync/Cargo.toml39
-rw-r--r--tokio-sync/LICENSE25
-rw-r--r--tokio-sync/README.md13
-rw-r--r--tokio-sync/benches/mpsc.rs536
-rw-r--r--tokio-sync/benches/oneshot.rs239
-rw-r--r--tokio-sync/src/barrier.rs134
-rw-r--r--tokio-sync/src/lib.rs43
-rw-r--r--tokio-sync/src/loom.rs38
-rw-r--r--tokio-sync/src/mpsc/block.rs386
-rw-r--r--tokio-sync/src/mpsc/bounded.rs340
-rw-r--r--tokio-sync/src/mpsc/chan.rs450
-rw-r--r--tokio-sync/src/mpsc/list.rs347
-rw-r--r--tokio-sync/src/mpsc/mod.rs61
-rw-r--r--tokio-sync/src/mpsc/unbounded.rs233
-rw-r--r--tokio-sync/src/mutex.rs148
-rw-r--r--tokio-sync/src/oneshot.rs576
-rw-r--r--tokio-sync/src/semaphore.rs1142
-rw-r--r--tokio-sync/src/task/atomic_waker.rs323
-rw-r--r--tokio-sync/src/task/mod.rs5
-rw-r--r--tokio-sync/src/watch.rs459
-rw-r--r--tokio-sync/tests/atomic_waker.rs37
-rw-r--r--tokio-sync/tests/barrier.rs93
-rw-r--r--tokio-sync/tests/errors.rs29
-rw-r--r--tokio-sync/tests/fuzz_atomic_waker.rs53
-rw-r--r--tokio-sync/tests/fuzz_list.rs71
-rw-r--r--tokio-sync/tests/fuzz_mpsc.rs40
-rw-r--r--tokio-sync/tests/fuzz_oneshot.rs115
-rw-r--r--tokio-sync/tests/fuzz_semaphore.rs160
-rw-r--r--tokio-sync/tests/mpsc.rs453
-rw-r--r--tokio-sync/tests/mutex.rs79
-rw-r--r--tokio-sync/tests/oneshot.rs228
-rw-r--r--tokio-sync/tests/semaphore.rs153
-rw-r--r--tokio-sync/tests/watch.rs287
34 files changed, 0 insertions, 7411 deletions
diff --git a/tokio-sync/CHANGELOG.md b/tokio-sync/CHANGELOG.md
deleted file mode 100644
index 44033503..00000000
--- a/tokio-sync/CHANGELOG.md
+++ /dev/null
@@ -1,76 +0,0 @@
-# 0.2.0-alpha.6 (September 30, 2019)
-
-- Move to `futures-*-preview 0.3.0-alpha.19`
-- Move to `pin-project 0.4`
-
-# 0.2.0-alpha.5 (September 19, 2019)
-
-### Changed
-- rename `Lock` -> `Mutex` and make it more like `std::sync::Mutex` (#1573).
-
-### Added
-- `Barrier`, an async version of `std::sync::Barrier` (#1571).
-
-# 0.2.0-alpha.4 (August 29, 2019)
-
-- Track tokio release.
-
-# 0.2.0-alpha.3 (August 23, 2019)
-
-- Track `tokio` version number
-
-# 0.2.0-alpha.2 (August 17, 2019)
-
-### Changed
-- Update `futures` dependency to 0.3.0-alpha.18.
-
-# 0.2.0-alpha.1 (August 8, 2019)
-
-### Changed
-- Switch to `async`, `await`, and `std::future`.
-
-# 0.1.6 (June 4, 2019)
-
-### Added
-- Add Sync impl for Lock (#1117).
-
-# 0.1.5 (April 22, 2019)
-
-### Added
-- Add asynchronous mutual exclusion primitive (#964).
-
-# 0.1.4 (March 13, 2019)
-
-### Fixed
-- Fix memory leak on channel drop (#917).
-
-### Added
-- `std::error::Error` implementation for `oneshot`, `watch` error types (#967).
-
-# 0.1.3 (March 1, 2019)
-
-### Added
-- `Watch`, a single value broadcast channel (#922).
-- `std::error::Error` implementation for more `mpsc` types (#937).
-
-# 0.1.2 (February 20, 2019)
-
-### Fixes
-- `mpsc` and `Semaphore` when releasing permits (#904).
-- `oneshot` task handle leak (#911).
-
-### Changes
-- Performance improvements in `AtomicTask` (#892).
-- Improved assert message when creating a channel with bound of zero (#906).
-
-### Adds
-- `AtomicTask::take_task` (#895).
-
-# 0.1.1 (February 1, 2019)
-
-### Fixes
-- Panic when creating a channel with bound 0 (#879).
-
-# 0.1.0 (January 24, 2019)
-
-- Initial Release
diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml
deleted file mode 100644
index bb2fd607..00000000
--- a/tokio-sync/Cargo.toml
+++ /dev/null
@@ -1,39 +0,0 @@
-[package]
-name = "tokio-sync"
-# When releasing to crates.io:
-# - Remove path dependencies
-# - Update html_root_url.
-# - Update doc url
-# - Cargo.toml
-# - Update CHANGELOG.md.
-# - Create "v0.2.x" git tag.
-version = "0.2.0-alpha.6"
-edition = "2018"
-authors = ["Tokio Contributors <team@tokio.rs>"]
-license = "MIT"
-repository = "https://github.com/tokio-rs/tokio"
-homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-sync/0.2.0-alpha.6/tokio_sync"
-description = """
-Synchronization utilities.
-"""
-categories = ["asynchronous"]
-
-[features]
-async-traits = ["futures-sink-preview"]
-
-[dependencies]
-fnv = "1.0.6"
-futures-core-preview = { version = "=0.3.0-alpha.19" }
-futures-sink-preview = { version = "=0.3.0-alpha.19", optional = true }
-futures-util-preview = { version = "=0.3.0-alpha.19" }
-
-[dev-dependencies]
-tokio = { version = "0.2.0-alpha.6", path = "../tokio" }
-tokio-test = { version = "0.2.0-alpha.6", path = "../tokio-test" }
-
-env_logger = { version = "0.6", default-features = false }
-loom = { version = "0.2.1", features = ["futures"] }
-
-[package.metadata.docs.rs]
-all-features = true
diff --git a/tokio-sync/LICENSE b/tokio-sync/LICENSE
deleted file mode 100644
index cdb28b4b..00000000
--- a/tokio-sync/LICENSE
+++ /dev/null
@@ -1,25 +0,0 @@
-Copyright (c) 2019 Tokio Contributors
-
-Permission is hereby granted, free of charge, to any
-person obtaining a copy of this software and associated
-documentation files (the "Software"), to deal in the
-Software without restriction, including without
-limitation the rights to use, copy, modify, merge,
-publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software
-is furnished to do so, subject to the following
-conditions:
-
-The above copyright notice and this permission notice
-shall be included in all copies or substantial portions
-of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
-ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
-TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
-PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
-CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
-IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-DEALINGS IN THE SOFTWARE.
diff --git a/tokio-sync/README.md b/tokio-sync/README.md
deleted file mode 100644
index 6e209f69..00000000
--- a/tokio-sync/README.md
+++ /dev/null
@@ -1,13 +0,0 @@
-# tokio-sync
-
-Synchronization utilities
-
-## License
-
-This project is licensed under the [MIT license](LICENSE).
-
-### Contribution
-
-Unless you explicitly state otherwise, any contribution intentionally submitted
-for inclusion in Tokio by you, shall be licensed as MIT, without any additional
-terms or conditions.
diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs
deleted file mode 100644
index 95d8e5c7..00000000
--- a/tokio-sync/benches/mpsc.rs
+++ /dev/null
@@ -1,536 +0,0 @@
-#![cfg(feature = "broken")]
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-type Medium = [usize; 64];
-type Large = [Medium; 64];
-
-mod tokio {
- use futures::{future, Async, Future, Sink, Stream};
- use std::thread;
- use test::{self, Bencher};
- use tokio_sync::mpsc::*;
-
- #[bench]
- fn bounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Medium>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded_channel::<super::Medium>());
- })
- }
- #[bench]
- fn bounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Large>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded_channel::<super::Large>());
- })
- }
-
- #[bench]
- fn send_one_message(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- // Send
- tx.try_send(1).unwrap();
-
- // Receive
- assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap());
- })
- }
-
- #[bench]
- fn send_one_message_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- // Send
- let _ = tx.try_send([[0; 64]; 64]);
-
- // Receive
- let _ = test::black_box(&rx.poll());
- })
- }
-
- #[bench]
- fn bounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = channel::<i32>(1_000);
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(1);
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_not_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(1);
- tx.try_send(1).unwrap();
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded_channel::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded_channel::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_uncontended_1(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- for i in 0..1000 {
- tx.try_send(i).unwrap();
- // No need to create a task, because poll is not going to park.
- assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
- }
- })
- }
-
- #[bench]
- fn bounded_uncontended_1_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- for i in 0..1000 {
- let _ = tx.try_send([[i; 64]; 64]);
- // No need to create a task, because poll is not going to park.
- let _ = test::black_box(&rx.poll());
- }
- })
- }
-
- #[bench]
- fn bounded_uncontended_2(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1000);
-
- for i in 0..1000 {
- tx.try_send(i).unwrap();
- }
-
- for i in 0..1000 {
- // No need to create a task, because poll is not going to park.
- assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
- }
- })
- }
-
- #[bench]
- fn contended_unbounded_tx(b: &mut Bencher) {
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..4 {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for mut tx in rx.iter() {
- for i in 0..1_000 {
- tx.try_send(i).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- // TODO make unbounded
- let (tx, rx) = channel::<i32>(1_000_000);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(4 * 1_000);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-
- #[bench]
- fn contended_bounded_tx(b: &mut Bencher) {
- const THREADS: usize = 4;
- const ITERS: usize = 100;
-
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..THREADS {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for tx in rx.iter() {
- let mut tx = tx.wait();
- for i in 0..ITERS {
- tx.send(i as i32).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(THREADS * ITERS);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-}
-
-mod legacy {
- use futures::sync::mpsc::*;
- use futures::{future, Async, Future, Sink, Stream};
- use std::thread;
- use test::{self, Bencher};
-
- #[bench]
- fn bounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Medium>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded::<super::Medium>());
- })
- }
-
- #[bench]
- fn bounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Large>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded::<super::Large>());
- })
- }
-
- #[bench]
- fn send_one_message(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- // Send
- tx.try_send(1).unwrap();
-
- // Receive
- assert_eq!(Ok(Async::Ready(Some(1))), rx.poll());
- })
- }
-
- #[bench]
- fn send_one_message_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- // Send
- let _ = tx.try_send([[0; 64]; 64]);
-
- // Receive
- let _ = test::black_box(&rx.poll());
- })
- }
-
- #[bench]
- fn bounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = channel::<i32>(1_000);
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(0);
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_not_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(0);
- tx.try_send(1).unwrap();
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_uncontended_1(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded();
-
- for i in 0..1000 {
- UnboundedSender::unbounded_send(&tx, i).expect("send");
- // No need to create a task, because poll is not going to park.
- assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
- }
- })
- }
-
- #[bench]
- fn unbounded_uncontended_1_large(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded::<super::Large>();
-
- for i in 0..1000 {
- let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]);
- // No need to create a task, because poll is not going to park.
- let _ = test::black_box(&rx.poll());
- }
- })
- }
-
- #[bench]
- fn unbounded_uncontended_2(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded();
-
- for i in 0..1000 {
- UnboundedSender::unbounded_send(&tx, i).expect("send");
- }
-
- for i in 0..1000 {
- // No need to create a task, because poll is not going to park.
- assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
- }
- })
- }
-
- #[bench]
- fn multi_thread_unbounded_tx(b: &mut Bencher) {
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..4 {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for mut tx in rx.iter() {
- for i in 0..1_000 {
- tx.try_send(i).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1_000_000);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(4 * 1_000);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-
- #[bench]
- fn contended_bounded_tx(b: &mut Bencher) {
- const THREADS: usize = 4;
- const ITERS: usize = 100;
-
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..THREADS {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for tx in rx.iter() {
- let mut tx = tx.wait();
- for i in 0..ITERS {
- tx.send(i as i32).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(THREADS * ITERS);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-}
diff --git a/tokio-sync/benches/oneshot.rs b/tokio-sync/benches/oneshot.rs
deleted file mode 100644
index b2f37805..00000000
--- a/tokio-sync/benches/oneshot.rs
+++ /dev/null
@@ -1,239 +0,0 @@
-#![cfg(feature = "broken")]
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-mod tokio {
- use futures::{future, Async, Future};
- use test::Bencher;
- use tokio_sync::oneshot;
-
- #[bench]
- fn new(b: &mut Bencher) {
- b.iter(|| {
- let _ = ::test::black_box(&oneshot::channel::<i32>());
- })
- }
-
- #[bench]
- fn same_thread_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- let _ = tx.send(1);
-
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
- });
- }
-
- #[bench]
- fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- future::lazy(|| {
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
-
- let _ = tx.send(1);
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- });
- }
-
- #[bench]
- fn multi_thread_send_recv(b: &mut Bencher) {
- const MAX: usize = 10_000_000;
-
- use std::thread;
-
- fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
- use futures::Async::Ready;
- loop {
- match f.poll() {
- Ok(Ready(v)) => return Ok(v),
- Ok(_) => {}
- Err(e) => return Err(e),
- }
- }
- }
-
- let mut ping_txs = vec![];
- let mut ping_rxs = vec![];
- let mut pong_txs = vec![];
- let mut pong_rxs = vec![];
-
- for _ in 0..MAX {
- let (tx, rx) = oneshot::channel::<()>();
-
- ping_txs.push(Some(tx));
- ping_rxs.push(Some(rx));
-
- let (tx, rx) = oneshot::channel::<()>();
-
- pong_txs.push(Some(tx));
- pong_rxs.push(Some(rx));
- }
-
- thread::spawn(move || {
- future::lazy(|| {
- for i in 0..MAX {
- let ping_rx = ping_rxs[i].take().unwrap();
- let pong_tx = pong_txs[i].take().unwrap();
-
- if spin(ping_rx).is_err() {
- return Ok(());
- }
-
- pong_tx.send(()).unwrap();
- }
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
- });
-
- future::lazy(|| {
- let mut i = 0;
-
- b.iter(|| {
- let ping_tx = ping_txs[i].take().unwrap();
- let pong_rx = pong_rxs[i].take().unwrap();
-
- ping_tx.send(()).unwrap();
- spin(pong_rx).unwrap();
-
- i += 1;
- });
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
- }
-}
-
-mod legacy {
- use futures::sync::oneshot;
- use futures::{future, Async, Future};
- use test::Bencher;
-
- #[bench]
- fn new(b: &mut Bencher) {
- b.iter(|| {
- let _ = ::test::black_box(&oneshot::channel::<i32>());
- })
- }
-
- #[bench]
- fn same_thread_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- let _ = tx.send(1);
-
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
- });
- }
-
- #[bench]
- fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- future::lazy(|| {
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
-
- let _ = tx.send(1);
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- });
- }
-
- #[bench]
- fn multi_thread_send_recv(b: &mut Bencher) {
- const MAX: usize = 10_000_000;
-
- use std::thread;
-
- fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
- use futures::Async::Ready;
- loop {
- match f.poll() {
- Ok(Ready(v)) => return Ok(v),
- Ok(_) => {}
- Err(e) => return Err(e),
- }
- }
- }
-
- let mut ping_txs = vec![];
- let mut ping_rxs = vec![];
- let mut pong_txs = vec![];
- let mut pong_rxs = vec![];
-
- for _ in 0..MAX {
- let (tx, rx) = oneshot::channel::<()>();
-
- ping_txs.push(Some(tx));
- ping_rxs.push(Some(rx));
-
- let (tx, rx) = oneshot::channel::<()>();
-
- pong_txs.push(Some(tx));
- pong_rxs.push(Some(rx));
- }
-
- thread::spawn(move || {
- future::lazy(|| {
- for i in 0..MAX {
- let ping_rx = ping_rxs[i].take().unwrap();
- let pong_tx = pong_txs[i].take().unwrap();
-
- if spin(ping_rx).is_err() {
- return Ok(());
- }
-
- pong_tx.send(()).unwrap();
- }
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
- });
-
- future::lazy(|| {
- let mut i = 0;
-
- b.iter(|| {
- let ping_tx = ping_txs[i].take().unwrap();
- let pong_rx = pong_rxs[i].take().unwrap();
-
- ping_tx.send(()).unwrap();
- spin(pong_rx).unwrap();
-
- i += 1;
- });
-
- Ok::<(), ()>(())