diff options
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | azure-pipelines.yml | 2 | ||||
-rw-r--r-- | ci/patch.toml | 2 | ||||
-rw-r--r-- | tokio-sync/CHANGELOG.md | 76 | ||||
-rw-r--r-- | tokio-sync/Cargo.toml | 39 | ||||
-rw-r--r-- | tokio-sync/LICENSE | 25 | ||||
-rw-r--r-- | tokio-sync/README.md | 13 | ||||
-rw-r--r-- | tokio-sync/benches/mpsc.rs | 536 | ||||
-rw-r--r-- | tokio-sync/benches/oneshot.rs | 239 | ||||
-rw-r--r-- | tokio-sync/src/lib.rs | 43 | ||||
-rw-r--r-- | tokio-sync/src/loom.rs | 38 | ||||
-rw-r--r-- | tokio-test/Cargo.toml | 1 | ||||
-rw-r--r-- | tokio/Cargo.toml | 9 | ||||
-rw-r--r-- | tokio/benches/mpsc.rs | 270 | ||||
-rw-r--r-- | tokio/benches/oneshot.rs | 120 | ||||
-rw-r--r-- | tokio/benches/thread_pool.rs | 2 | ||||
-rw-r--r-- | tokio/src/executor/blocking/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/shutdown.rs | 3 | ||||
-rw-r--r-- | tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs | 3 | ||||
-rw-r--r-- | tokio/src/net/unix/incoming.rs | 2 | ||||
-rw-r--r-- | tokio/src/net/unix/listener.rs | 1 | ||||
-rw-r--r-- | tokio/src/net/unix/mod.rs | 1 | ||||
-rw-r--r-- | tokio/src/signal/registry.rs | 2 | ||||
-rw-r--r-- | tokio/src/signal/unix.rs | 3 | ||||
-rw-r--r-- | tokio/src/signal/windows.rs | 5 | ||||
-rw-r--r-- | tokio/src/sync/barrier.rs (renamed from tokio-sync/src/barrier.rs) | 7 | ||||
-rw-r--r-- | tokio/src/sync/loom.rs | 48 | ||||
-rw-r--r-- | tokio/src/sync/mod.rs (renamed from tokio/src/sync.rs) | 46 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/block.rs (renamed from tokio-sync/src/mpsc/block.rs) | 5 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs (renamed from tokio-sync/src/mpsc/bounded.rs) | 13 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs (renamed from tokio-sync/src/mpsc/chan.rs) | 11 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/list.rs (renamed from tokio-sync/src/mpsc/list.rs) | 5 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/mod.rs (renamed from tokio-sync/src/mpsc/mod.rs) | 18 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs (renamed from tokio-sync/src/mpsc/unbounded.rs) | 7 | ||||
-rw-r--r-- | tokio/src/sync/mutex.rs (renamed from tokio-sync/src/mutex.rs) | 3 | ||||
-rw-r--r-- | tokio/src/sync/oneshot.rs (renamed from tokio-sync/src/oneshot.rs) | 2 | ||||
-rw-r--r-- | tokio/src/sync/semaphore.rs (renamed from tokio-sync/src/semaphore.rs) | 4 | ||||
-rw-r--r-- | tokio/src/sync/task/atomic_waker.rs (renamed from tokio-sync/src/task/atomic_waker.rs) | 4 | ||||
-rw-r--r-- | tokio/src/sync/task/mod.rs (renamed from tokio-sync/src/task/mod.rs) | 1 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_atomic_waker.rs (renamed from tokio-sync/tests/fuzz_atomic_waker.rs) | 10 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_list.rs (renamed from tokio-sync/tests/fuzz_list.rs) | 23 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_mpsc.rs (renamed from tokio-sync/tests/fuzz_mpsc.rs) | 19 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_oneshot.rs (renamed from tokio-sync/tests/fuzz_oneshot.rs) | 10 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_semaphore.rs (renamed from tokio-sync/tests/fuzz_semaphore.rs) | 11 | ||||
-rw-r--r-- | tokio/src/sync/tests/mod.rs | 7 | ||||
-rw-r--r-- | tokio/src/sync/watch.rs (renamed from tokio-sync/src/watch.rs) | 7 | ||||
-rw-r--r-- | tokio/src/timer/timer/entry.rs | 3 | ||||
-rw-r--r-- | tokio/tests/support/mock_pool.rs | 2 | ||||
-rw-r--r-- | tokio/tests/sync_atomic_waker.rs (renamed from tokio-sync/tests/atomic_waker.rs) | 5 | ||||
-rw-r--r-- | tokio/tests/sync_barrier.rs (renamed from tokio-sync/tests/barrier.rs) | 3 | ||||
-rw-r--r-- | tokio/tests/sync_errors.rs (renamed from tokio-sync/tests/errors.rs) | 6 | ||||
-rw-r--r-- | tokio/tests/sync_mpsc.rs (renamed from tokio-sync/tests/mpsc.rs) | 4 | ||||
-rw-r--r-- | tokio/tests/sync_mutex.rs (renamed from tokio-sync/tests/mutex.rs) | 5 | ||||
-rw-r--r-- | tokio/tests/sync_oneshot.rs (renamed from tokio-sync/tests/oneshot.rs) | 2 | ||||
-rw-r--r-- | tokio/tests/sync_semaphore.rs (renamed from tokio-sync/tests/semaphore.rs) | 2 | ||||
-rw-r--r-- | tokio/tests/sync_watch.rs (renamed from tokio-sync/tests/watch.rs) | 25 | ||||
-rw-r--r-- | tokio/tests/thread_pool.rs | 2 | ||||
-rw-r--r-- | tokio/tests/timer_timeout.rs | 7 |
58 files changed, 572 insertions, 1193 deletions
@@ -3,7 +3,6 @@ members = [ "tokio", "tokio-macros", - "tokio-sync", "tokio-test", "tokio-tls", "tokio-util", diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 4fb21761..ae2bd1bb 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -47,8 +47,6 @@ jobs: displayName: Test sub crates - rust: beta crates: - tokio-sync: - - async-traits tokio-macros: [] tokio-test: [] tokio-util: [] diff --git a/ci/patch.toml b/ci/patch.toml index 333c4dff..22311cf9 100644 --- a/ci/patch.toml +++ b/ci/patch.toml @@ -3,6 +3,6 @@ [patch.crates-io] tokio = { path = "tokio" } tokio-macros = { path = "tokio-macros" } -tokio-sync = { path = "tokio-sync" } +tokio-test = { path = "tokio-test" } tokio-tls = { path = "tokio-tls" } tokio-util = { path = "tokio-util" } diff --git a/tokio-sync/CHANGELOG.md b/tokio-sync/CHANGELOG.md deleted file mode 100644 index 44033503..00000000 --- a/tokio-sync/CHANGELOG.md +++ /dev/null @@ -1,76 +0,0 @@ -# 0.2.0-alpha.6 (September 30, 2019) - -- Move to `futures-*-preview 0.3.0-alpha.19` -- Move to `pin-project 0.4` - -# 0.2.0-alpha.5 (September 19, 2019) - -### Changed -- rename `Lock` -> `Mutex` and make it more like `std::sync::Mutex` (#1573). - -### Added -- `Barrier`, an async version of `std::sync::Barrier` (#1571). - -# 0.2.0-alpha.4 (August 29, 2019) - -- Track tokio release. - -# 0.2.0-alpha.3 (August 23, 2019) - -- Track `tokio` version number - -# 0.2.0-alpha.2 (August 17, 2019) - -### Changed -- Update `futures` dependency to 0.3.0-alpha.18. - -# 0.2.0-alpha.1 (August 8, 2019) - -### Changed -- Switch to `async`, `await`, and `std::future`. - -# 0.1.6 (June 4, 2019) - -### Added -- Add Sync impl for Lock (#1117). - -# 0.1.5 (April 22, 2019) - -### Added -- Add asynchronous mutual exclusion primitive (#964). - -# 0.1.4 (March 13, 2019) - -### Fixed -- Fix memory leak on channel drop (#917). - -### Added -- `std::error::Error` implementation for `oneshot`, `watch` error types (#967). - -# 0.1.3 (March 1, 2019) - -### Added -- `Watch`, a single value broadcast channel (#922). -- `std::error::Error` implementation for more `mpsc` types (#937). - -# 0.1.2 (February 20, 2019) - -### Fixes -- `mpsc` and `Semaphore` when releasing permits (#904). -- `oneshot` task handle leak (#911). - -### Changes -- Performance improvements in `AtomicTask` (#892). -- Improved assert message when creating a channel with bound of zero (#906). - -### Adds -- `AtomicTask::take_task` (#895). - -# 0.1.1 (February 1, 2019) - -### Fixes -- Panic when creating a channel with bound 0 (#879). - -# 0.1.0 (January 24, 2019) - -- Initial Release diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml deleted file mode 100644 index bb2fd607..00000000 --- a/tokio-sync/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -[package] -name = "tokio-sync" -# When releasing to crates.io: -# - Remove path dependencies -# - Update html_root_url. -# - Update doc url -# - Cargo.toml -# - Update CHANGELOG.md. -# - Create "v0.2.x" git tag. -version = "0.2.0-alpha.6" -edition = "2018" -authors = ["Tokio Contributors <team@tokio.rs>"] -license = "MIT" -repository = "https://github.com/tokio-rs/tokio" -homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-sync/0.2.0-alpha.6/tokio_sync" -description = """ -Synchronization utilities. -""" -categories = ["asynchronous"] - -[features] -async-traits = ["futures-sink-preview"] - -[dependencies] -fnv = "1.0.6" -futures-core-preview = { version = "=0.3.0-alpha.19" } -futures-sink-preview = { version = "=0.3.0-alpha.19", optional = true } -futures-util-preview = { version = "=0.3.0-alpha.19" } - -[dev-dependencies] -tokio = { version = "0.2.0-alpha.6", path = "../tokio" } -tokio-test = { version = "0.2.0-alpha.6", path = "../tokio-test" } - -env_logger = { version = "0.6", default-features = false } -loom = { version = "0.2.1", features = ["futures"] } - -[package.metadata.docs.rs] -all-features = true diff --git a/tokio-sync/LICENSE b/tokio-sync/LICENSE deleted file mode 100644 index cdb28b4b..00000000 --- a/tokio-sync/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2019 Tokio Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/tokio-sync/README.md b/tokio-sync/README.md deleted file mode 100644 index 6e209f69..00000000 --- a/tokio-sync/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# tokio-sync - -Synchronization utilities - -## License - -This project is licensed under the [MIT license](LICENSE). - -### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in Tokio by you, shall be licensed as MIT, without any additional -terms or conditions. diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs deleted file mode 100644 index 95d8e5c7..00000000 --- a/tokio-sync/benches/mpsc.rs +++ /dev/null @@ -1,536 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -type Medium = [usize; 64]; -type Large = [Medium; 64]; - -mod tokio { - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - use tokio_sync::mpsc::*; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Medium>(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::<super::Medium>()); - }) - } - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Large>(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::<super::Large>()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::<i32>(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(1); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(1); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn bounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - for i in 0..1000 { - let _ = tx.try_send([[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn bounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn contended_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - // TODO make unbounded - let (tx, rx) = channel::<i32>(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} - -mod legacy { - use futures::sync::mpsc::*; - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Medium>(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::<super::Medium>()); - }) - } - - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Large>(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::<super::Large>()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Ok(Async::Ready(Some(1))), rx.poll()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::<i32>(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(0); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(0); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded::<super::Large>(); - - for i in 0..1000 { - let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn multi_thread_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i |