summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml1
-rw-r--r--azure-pipelines.yml2
-rw-r--r--ci/patch.toml2
-rw-r--r--tokio-sync/CHANGELOG.md76
-rw-r--r--tokio-sync/Cargo.toml39
-rw-r--r--tokio-sync/LICENSE25
-rw-r--r--tokio-sync/README.md13
-rw-r--r--tokio-sync/benches/mpsc.rs536
-rw-r--r--tokio-sync/benches/oneshot.rs239
-rw-r--r--tokio-sync/src/lib.rs43
-rw-r--r--tokio-sync/src/loom.rs38
-rw-r--r--tokio-test/Cargo.toml1
-rw-r--r--tokio/Cargo.toml9
-rw-r--r--tokio/benches/mpsc.rs270
-rw-r--r--tokio/benches/oneshot.rs120
-rw-r--r--tokio/benches/thread_pool.rs2
-rw-r--r--tokio/src/executor/blocking/mod.rs2
-rw-r--r--tokio/src/executor/thread_pool/shutdown.rs3
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs3
-rw-r--r--tokio/src/net/unix/incoming.rs2
-rw-r--r--tokio/src/net/unix/listener.rs1
-rw-r--r--tokio/src/net/unix/mod.rs1
-rw-r--r--tokio/src/signal/registry.rs2
-rw-r--r--tokio/src/signal/unix.rs3
-rw-r--r--tokio/src/signal/windows.rs5
-rw-r--r--tokio/src/sync/barrier.rs (renamed from tokio-sync/src/barrier.rs)7
-rw-r--r--tokio/src/sync/loom.rs48
-rw-r--r--tokio/src/sync/mod.rs (renamed from tokio/src/sync.rs)46
-rw-r--r--tokio/src/sync/mpsc/block.rs (renamed from tokio-sync/src/mpsc/block.rs)5
-rw-r--r--tokio/src/sync/mpsc/bounded.rs (renamed from tokio-sync/src/mpsc/bounded.rs)13
-rw-r--r--tokio/src/sync/mpsc/chan.rs (renamed from tokio-sync/src/mpsc/chan.rs)11
-rw-r--r--tokio/src/sync/mpsc/list.rs (renamed from tokio-sync/src/mpsc/list.rs)5
-rw-r--r--tokio/src/sync/mpsc/mod.rs (renamed from tokio-sync/src/mpsc/mod.rs)18
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs (renamed from tokio-sync/src/mpsc/unbounded.rs)7
-rw-r--r--tokio/src/sync/mutex.rs (renamed from tokio-sync/src/mutex.rs)3
-rw-r--r--tokio/src/sync/oneshot.rs (renamed from tokio-sync/src/oneshot.rs)2
-rw-r--r--tokio/src/sync/semaphore.rs (renamed from tokio-sync/src/semaphore.rs)4
-rw-r--r--tokio/src/sync/task/atomic_waker.rs (renamed from tokio-sync/src/task/atomic_waker.rs)4
-rw-r--r--tokio/src/sync/task/mod.rs (renamed from tokio-sync/src/task/mod.rs)1
-rw-r--r--tokio/src/sync/tests/loom_atomic_waker.rs (renamed from tokio-sync/tests/fuzz_atomic_waker.rs)10
-rw-r--r--tokio/src/sync/tests/loom_list.rs (renamed from tokio-sync/tests/fuzz_list.rs)23
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs (renamed from tokio-sync/tests/fuzz_mpsc.rs)19
-rw-r--r--tokio/src/sync/tests/loom_oneshot.rs (renamed from tokio-sync/tests/fuzz_oneshot.rs)10
-rw-r--r--tokio/src/sync/tests/loom_semaphore.rs (renamed from tokio-sync/tests/fuzz_semaphore.rs)11
-rw-r--r--tokio/src/sync/tests/mod.rs7
-rw-r--r--tokio/src/sync/watch.rs (renamed from tokio-sync/src/watch.rs)7
-rw-r--r--tokio/src/timer/timer/entry.rs3
-rw-r--r--tokio/tests/support/mock_pool.rs2
-rw-r--r--tokio/tests/sync_atomic_waker.rs (renamed from tokio-sync/tests/atomic_waker.rs)5
-rw-r--r--tokio/tests/sync_barrier.rs (renamed from tokio-sync/tests/barrier.rs)3
-rw-r--r--tokio/tests/sync_errors.rs (renamed from tokio-sync/tests/errors.rs)6
-rw-r--r--tokio/tests/sync_mpsc.rs (renamed from tokio-sync/tests/mpsc.rs)4
-rw-r--r--tokio/tests/sync_mutex.rs (renamed from tokio-sync/tests/mutex.rs)5
-rw-r--r--tokio/tests/sync_oneshot.rs (renamed from tokio-sync/tests/oneshot.rs)2
-rw-r--r--tokio/tests/sync_semaphore.rs (renamed from tokio-sync/tests/semaphore.rs)2
-rw-r--r--tokio/tests/sync_watch.rs (renamed from tokio-sync/tests/watch.rs)25
-rw-r--r--tokio/tests/thread_pool.rs2
-rw-r--r--tokio/tests/timer_timeout.rs7
58 files changed, 572 insertions, 1193 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 4a9e88a0..4a2a71ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,7 +3,6 @@
members = [
"tokio",
"tokio-macros",
- "tokio-sync",
"tokio-test",
"tokio-tls",
"tokio-util",
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 4fb21761..ae2bd1bb 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -47,8 +47,6 @@ jobs:
displayName: Test sub crates -
rust: beta
crates:
- tokio-sync:
- - async-traits
tokio-macros: []
tokio-test: []
tokio-util: []
diff --git a/ci/patch.toml b/ci/patch.toml
index 333c4dff..22311cf9 100644
--- a/ci/patch.toml
+++ b/ci/patch.toml
@@ -3,6 +3,6 @@
[patch.crates-io]
tokio = { path = "tokio" }
tokio-macros = { path = "tokio-macros" }
-tokio-sync = { path = "tokio-sync" }
+tokio-test = { path = "tokio-test" }
tokio-tls = { path = "tokio-tls" }
tokio-util = { path = "tokio-util" }
diff --git a/tokio-sync/CHANGELOG.md b/tokio-sync/CHANGELOG.md
deleted file mode 100644
index 44033503..00000000
--- a/tokio-sync/CHANGELOG.md
+++ /dev/null
@@ -1,76 +0,0 @@
-# 0.2.0-alpha.6 (September 30, 2019)
-
-- Move to `futures-*-preview 0.3.0-alpha.19`
-- Move to `pin-project 0.4`
-
-# 0.2.0-alpha.5 (September 19, 2019)
-
-### Changed
-- rename `Lock` -> `Mutex` and make it more like `std::sync::Mutex` (#1573).
-
-### Added
-- `Barrier`, an async version of `std::sync::Barrier` (#1571).
-
-# 0.2.0-alpha.4 (August 29, 2019)
-
-- Track tokio release.
-
-# 0.2.0-alpha.3 (August 23, 2019)
-
-- Track `tokio` version number
-
-# 0.2.0-alpha.2 (August 17, 2019)
-
-### Changed
-- Update `futures` dependency to 0.3.0-alpha.18.
-
-# 0.2.0-alpha.1 (August 8, 2019)
-
-### Changed
-- Switch to `async`, `await`, and `std::future`.
-
-# 0.1.6 (June 4, 2019)
-
-### Added
-- Add Sync impl for Lock (#1117).
-
-# 0.1.5 (April 22, 2019)
-
-### Added
-- Add asynchronous mutual exclusion primitive (#964).
-
-# 0.1.4 (March 13, 2019)
-
-### Fixed
-- Fix memory leak on channel drop (#917).
-
-### Added
-- `std::error::Error` implementation for `oneshot`, `watch` error types (#967).
-
-# 0.1.3 (March 1, 2019)
-
-### Added
-- `Watch`, a single value broadcast channel (#922).
-- `std::error::Error` implementation for more `mpsc` types (#937).
-
-# 0.1.2 (February 20, 2019)
-
-### Fixes
-- `mpsc` and `Semaphore` when releasing permits (#904).
-- `oneshot` task handle leak (#911).
-
-### Changes
-- Performance improvements in `AtomicTask` (#892).
-- Improved assert message when creating a channel with bound of zero (#906).
-
-### Adds
-- `AtomicTask::take_task` (#895).
-
-# 0.1.1 (February 1, 2019)
-
-### Fixes
-- Panic when creating a channel with bound 0 (#879).
-
-# 0.1.0 (January 24, 2019)
-
-- Initial Release
diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml
deleted file mode 100644
index bb2fd607..00000000
--- a/tokio-sync/Cargo.toml
+++ /dev/null
@@ -1,39 +0,0 @@
-[package]
-name = "tokio-sync"
-# When releasing to crates.io:
-# - Remove path dependencies
-# - Update html_root_url.
-# - Update doc url
-# - Cargo.toml
-# - Update CHANGELOG.md.
-# - Create "v0.2.x" git tag.
-version = "0.2.0-alpha.6"
-edition = "2018"
-authors = ["Tokio Contributors <team@tokio.rs>"]
-license = "MIT"
-repository = "https://github.com/tokio-rs/tokio"
-homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-sync/0.2.0-alpha.6/tokio_sync"
-description = """
-Synchronization utilities.
-"""
-categories = ["asynchronous"]
-
-[features]
-async-traits = ["futures-sink-preview"]
-
-[dependencies]
-fnv = "1.0.6"
-futures-core-preview = { version = "=0.3.0-alpha.19" }
-futures-sink-preview = { version = "=0.3.0-alpha.19", optional = true }
-futures-util-preview = { version = "=0.3.0-alpha.19" }
-
-[dev-dependencies]
-tokio = { version = "0.2.0-alpha.6", path = "../tokio" }
-tokio-test = { version = "0.2.0-alpha.6", path = "../tokio-test" }
-
-env_logger = { version = "0.6", default-features = false }
-loom = { version = "0.2.1", features = ["futures"] }
-
-[package.metadata.docs.rs]
-all-features = true
diff --git a/tokio-sync/LICENSE b/tokio-sync/LICENSE
deleted file mode 100644
index cdb28b4b..00000000
--- a/tokio-sync/LICENSE
+++ /dev/null
@@ -1,25 +0,0 @@
-Copyright (c) 2019 Tokio Contributors
-
-Permission is hereby granted, free of charge, to any
-person obtaining a copy of this software and associated
-documentation files (the "Software"), to deal in the
-Software without restriction, including without
-limitation the rights to use, copy, modify, merge,
-publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software
-is furnished to do so, subject to the following
-conditions:
-
-The above copyright notice and this permission notice
-shall be included in all copies or substantial portions
-of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
-ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
-TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
-PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
-CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
-IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-DEALINGS IN THE SOFTWARE.
diff --git a/tokio-sync/README.md b/tokio-sync/README.md
deleted file mode 100644
index 6e209f69..00000000
--- a/tokio-sync/README.md
+++ /dev/null
@@ -1,13 +0,0 @@
-# tokio-sync
-
-Synchronization utilities
-
-## License
-
-This project is licensed under the [MIT license](LICENSE).
-
-### Contribution
-
-Unless you explicitly state otherwise, any contribution intentionally submitted
-for inclusion in Tokio by you, shall be licensed as MIT, without any additional
-terms or conditions.
diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs
deleted file mode 100644
index 95d8e5c7..00000000
--- a/tokio-sync/benches/mpsc.rs
+++ /dev/null
@@ -1,536 +0,0 @@
-#![cfg(feature = "broken")]
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-type Medium = [usize; 64];
-type Large = [Medium; 64];
-
-mod tokio {
- use futures::{future, Async, Future, Sink, Stream};
- use std::thread;
- use test::{self, Bencher};
- use tokio_sync::mpsc::*;
-
- #[bench]
- fn bounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Medium>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded_channel::<super::Medium>());
- })
- }
- #[bench]
- fn bounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Large>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded_channel::<super::Large>());
- })
- }
-
- #[bench]
- fn send_one_message(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- // Send
- tx.try_send(1).unwrap();
-
- // Receive
- assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap());
- })
- }
-
- #[bench]
- fn send_one_message_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- // Send
- let _ = tx.try_send([[0; 64]; 64]);
-
- // Receive
- let _ = test::black_box(&rx.poll());
- })
- }
-
- #[bench]
- fn bounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = channel::<i32>(1_000);
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(1);
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_not_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(1);
- tx.try_send(1).unwrap();
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded_channel::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded_channel::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_uncontended_1(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- for i in 0..1000 {
- tx.try_send(i).unwrap();
- // No need to create a task, because poll is not going to park.
- assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
- }
- })
- }
-
- #[bench]
- fn bounded_uncontended_1_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- for i in 0..1000 {
- let _ = tx.try_send([[i; 64]; 64]);
- // No need to create a task, because poll is not going to park.
- let _ = test::black_box(&rx.poll());
- }
- })
- }
-
- #[bench]
- fn bounded_uncontended_2(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1000);
-
- for i in 0..1000 {
- tx.try_send(i).unwrap();
- }
-
- for i in 0..1000 {
- // No need to create a task, because poll is not going to park.
- assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
- }
- })
- }
-
- #[bench]
- fn contended_unbounded_tx(b: &mut Bencher) {
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..4 {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for mut tx in rx.iter() {
- for i in 0..1_000 {
- tx.try_send(i).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- // TODO make unbounded
- let (tx, rx) = channel::<i32>(1_000_000);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(4 * 1_000);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-
- #[bench]
- fn contended_bounded_tx(b: &mut Bencher) {
- const THREADS: usize = 4;
- const ITERS: usize = 100;
-
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..THREADS {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for tx in rx.iter() {
- let mut tx = tx.wait();
- for i in 0..ITERS {
- tx.send(i as i32).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(THREADS * ITERS);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-}
-
-mod legacy {
- use futures::sync::mpsc::*;
- use futures::{future, Async, Future, Sink, Stream};
- use std::thread;
- use test::{self, Bencher};
-
- #[bench]
- fn bounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Medium>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded::<super::Medium>());
- })
- }
-
- #[bench]
- fn bounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Large>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded::<super::Large>());
- })
- }
-
- #[bench]
- fn send_one_message(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- // Send
- tx.try_send(1).unwrap();
-
- // Receive
- assert_eq!(Ok(Async::Ready(Some(1))), rx.poll());
- })
- }
-
- #[bench]
- fn send_one_message_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- // Send
- let _ = tx.try_send([[0; 64]; 64]);
-
- // Receive
- let _ = test::black_box(&rx.poll());
- })
- }
-
- #[bench]
- fn bounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = channel::<i32>(1_000);
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(0);
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_not_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(0);
- tx.try_send(1).unwrap();
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_uncontended_1(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded();
-
- for i in 0..1000 {
- UnboundedSender::unbounded_send(&tx, i).expect("send");
- // No need to create a task, because poll is not going to park.
- assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
- }
- })
- }
-
- #[bench]
- fn unbounded_uncontended_1_large(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded::<super::Large>();
-
- for i in 0..1000 {
- let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]);
- // No need to create a task, because poll is not going to park.
- let _ = test::black_box(&rx.poll());
- }
- })
- }
-
- #[bench]
- fn unbounded_uncontended_2(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded();
-
- for i in 0..1000 {
- UnboundedSender::unbounded_send(&tx, i).expect("send");
- }
-
- for i in 0..1000 {
- // No need to create a task, because poll is not going to park.
- assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
- }
- })
- }
-
- #[bench]
- fn multi_thread_unbounded_tx(b: &mut Bencher) {
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..4 {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for mut tx in rx.iter() {
- for i in 0..1_000 {
- tx.try_send(i).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1_000_000);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(4 * 1_000);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-
- #[bench]
- fn contended_bounded_tx(b: &mut Bencher) {
- const THREADS: usize = 4;
- const ITERS: usize = 100;
-
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..THREADS {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for tx in rx.iter() {
- let mut tx = tx.wait();
- for i