diff options
Diffstat (limited to 'tokio-sync/benches')
-rw-r--r-- | tokio-sync/benches/mpsc.rs | 536 | ||||
-rw-r--r-- | tokio-sync/benches/oneshot.rs | 239 |
2 files changed, 0 insertions, 775 deletions
diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs deleted file mode 100644 index 95d8e5c7..00000000 --- a/tokio-sync/benches/mpsc.rs +++ /dev/null @@ -1,536 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -type Medium = [usize; 64]; -type Large = [Medium; 64]; - -mod tokio { - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - use tokio_sync::mpsc::*; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Medium>(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::<super::Medium>()); - }) - } - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Large>(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::<super::Large>()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::<i32>(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(1); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(1); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn bounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - for i in 0..1000 { - let _ = tx.try_send([[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn bounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn contended_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - // TODO make unbounded - let (tx, rx) = channel::<i32>(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} - -mod legacy { - use futures::sync::mpsc::*; - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Medium>(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::<super::Medium>()); - }) - } - - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Large>(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::<super::Large>()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Ok(Async::Ready(Some(1))), rx.poll()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::<i32>(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(0); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(0); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded::<super::Large>(); - - for i in 0..1000 { - let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn multi_thread_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} diff --git a/tokio-sync/benches/oneshot.rs b/tokio-sync/benches/oneshot.rs deleted file mode 100644 index b2f37805..00000000 --- a/tokio-sync/benches/oneshot.rs +++ /dev/null @@ -1,239 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -mod tokio { - use futures::{future, Async, Future}; - use test::Bencher; - use tokio_sync::oneshot; - - #[bench] - fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::<i32>()); - }) - } - - #[bench] - fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); - } - - #[bench] - fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); - } - - #[bench] - fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - } -} - -mod legacy { - use futures::sync::oneshot; - use futures::{future, Async, Future}; - use test::Bencher; - - #[bench] - fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::<i32>()); - }) - } - - #[bench] - fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); - } - - #[bench] - fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); - } - - #[bench] - fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - } -} |