summaryrefslogtreecommitdiffstats
path: root/tokio-sync/benches
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/benches')
-rw-r--r--tokio-sync/benches/mpsc.rs536
-rw-r--r--tokio-sync/benches/oneshot.rs239
2 files changed, 0 insertions, 775 deletions
diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs
deleted file mode 100644
index 95d8e5c7..00000000
--- a/tokio-sync/benches/mpsc.rs
+++ /dev/null
@@ -1,536 +0,0 @@
-#![cfg(feature = "broken")]
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-type Medium = [usize; 64];
-type Large = [Medium; 64];
-
-mod tokio {
- use futures::{future, Async, Future, Sink, Stream};
- use std::thread;
- use test::{self, Bencher};
- use tokio_sync::mpsc::*;
-
- #[bench]
- fn bounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Medium>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded_channel::<super::Medium>());
- })
- }
- #[bench]
- fn bounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Large>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded_channel::<super::Large>());
- })
- }
-
- #[bench]
- fn send_one_message(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- // Send
- tx.try_send(1).unwrap();
-
- // Receive
- assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap());
- })
- }
-
- #[bench]
- fn send_one_message_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- // Send
- let _ = tx.try_send([[0; 64]; 64]);
-
- // Receive
- let _ = test::black_box(&rx.poll());
- })
- }
-
- #[bench]
- fn bounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = channel::<i32>(1_000);
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(1);
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_not_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(1);
- tx.try_send(1).unwrap();
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded_channel::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded_channel::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_uncontended_1(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- for i in 0..1000 {
- tx.try_send(i).unwrap();
- // No need to create a task, because poll is not going to park.
- assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
- }
- })
- }
-
- #[bench]
- fn bounded_uncontended_1_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- for i in 0..1000 {
- let _ = tx.try_send([[i; 64]; 64]);
- // No need to create a task, because poll is not going to park.
- let _ = test::black_box(&rx.poll());
- }
- })
- }
-
- #[bench]
- fn bounded_uncontended_2(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1000);
-
- for i in 0..1000 {
- tx.try_send(i).unwrap();
- }
-
- for i in 0..1000 {
- // No need to create a task, because poll is not going to park.
- assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
- }
- })
- }
-
- #[bench]
- fn contended_unbounded_tx(b: &mut Bencher) {
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..4 {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for mut tx in rx.iter() {
- for i in 0..1_000 {
- tx.try_send(i).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- // TODO make unbounded
- let (tx, rx) = channel::<i32>(1_000_000);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(4 * 1_000);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-
- #[bench]
- fn contended_bounded_tx(b: &mut Bencher) {
- const THREADS: usize = 4;
- const ITERS: usize = 100;
-
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..THREADS {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for tx in rx.iter() {
- let mut tx = tx.wait();
- for i in 0..ITERS {
- tx.send(i as i32).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(THREADS * ITERS);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-}
-
-mod legacy {
- use futures::sync::mpsc::*;
- use futures::{future, Async, Future, Sink, Stream};
- use std::thread;
- use test::{self, Bencher};
-
- #[bench]
- fn bounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Medium>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded::<super::Medium>());
- })
- }
-
- #[bench]
- fn bounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<super::Large>(1_000));
- })
- }
-
- #[bench]
- fn unbounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded::<super::Large>());
- })
- }
-
- #[bench]
- fn send_one_message(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- // Send
- tx.try_send(1).unwrap();
-
- // Receive
- assert_eq!(Ok(Async::Ready(Some(1))), rx.poll());
- })
- }
-
- #[bench]
- fn send_one_message_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<super::Large>(1_000);
-
- // Send
- let _ = tx.try_send([[0; 64]; 64]);
-
- // Receive
- let _ = test::black_box(&rx.poll());
- })
- }
-
- #[bench]
- fn bounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = channel::<i32>(1_000);
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(0);
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn bounded_tx_poll_not_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(0);
- tx.try_send(1).unwrap();
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
- }
-
- #[bench]
- fn unbounded_uncontended_1(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded();
-
- for i in 0..1000 {
- UnboundedSender::unbounded_send(&tx, i).expect("send");
- // No need to create a task, because poll is not going to park.
- assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
- }
- })
- }
-
- #[bench]
- fn unbounded_uncontended_1_large(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded::<super::Large>();
-
- for i in 0..1000 {
- let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]);
- // No need to create a task, because poll is not going to park.
- let _ = test::black_box(&rx.poll());
- }
- })
- }
-
- #[bench]
- fn unbounded_uncontended_2(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = unbounded();
-
- for i in 0..1000 {
- UnboundedSender::unbounded_send(&tx, i).expect("send");
- }
-
- for i in 0..1000 {
- // No need to create a task, because poll is not going to park.
- assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
- }
- })
- }
-
- #[bench]
- fn multi_thread_unbounded_tx(b: &mut Bencher) {
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..4 {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for mut tx in rx.iter() {
- for i in 0..1_000 {
- tx.try_send(i).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1_000_000);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(4 * 1_000);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-
- #[bench]
- fn contended_bounded_tx(b: &mut Bencher) {
- const THREADS: usize = 4;
- const ITERS: usize = 100;
-
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..THREADS {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for tx in rx.iter() {
- let mut tx = tx.wait();
- for i in 0..ITERS {
- tx.send(i as i32).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(THREADS * ITERS);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
- }
-}
diff --git a/tokio-sync/benches/oneshot.rs b/tokio-sync/benches/oneshot.rs
deleted file mode 100644
index b2f37805..00000000
--- a/tokio-sync/benches/oneshot.rs
+++ /dev/null
@@ -1,239 +0,0 @@
-#![cfg(feature = "broken")]
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-mod tokio {
- use futures::{future, Async, Future};
- use test::Bencher;
- use tokio_sync::oneshot;
-
- #[bench]
- fn new(b: &mut Bencher) {
- b.iter(|| {
- let _ = ::test::black_box(&oneshot::channel::<i32>());
- })
- }
-
- #[bench]
- fn same_thread_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- let _ = tx.send(1);
-
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
- });
- }
-
- #[bench]
- fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- future::lazy(|| {
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
-
- let _ = tx.send(1);
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- });
- }
-
- #[bench]
- fn multi_thread_send_recv(b: &mut Bencher) {
- const MAX: usize = 10_000_000;
-
- use std::thread;
-
- fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
- use futures::Async::Ready;
- loop {
- match f.poll() {
- Ok(Ready(v)) => return Ok(v),
- Ok(_) => {}
- Err(e) => return Err(e),
- }
- }
- }
-
- let mut ping_txs = vec![];
- let mut ping_rxs = vec![];
- let mut pong_txs = vec![];
- let mut pong_rxs = vec![];
-
- for _ in 0..MAX {
- let (tx, rx) = oneshot::channel::<()>();
-
- ping_txs.push(Some(tx));
- ping_rxs.push(Some(rx));
-
- let (tx, rx) = oneshot::channel::<()>();
-
- pong_txs.push(Some(tx));
- pong_rxs.push(Some(rx));
- }
-
- thread::spawn(move || {
- future::lazy(|| {
- for i in 0..MAX {
- let ping_rx = ping_rxs[i].take().unwrap();
- let pong_tx = pong_txs[i].take().unwrap();
-
- if spin(ping_rx).is_err() {
- return Ok(());
- }
-
- pong_tx.send(()).unwrap();
- }
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
- });
-
- future::lazy(|| {
- let mut i = 0;
-
- b.iter(|| {
- let ping_tx = ping_txs[i].take().unwrap();
- let pong_rx = pong_rxs[i].take().unwrap();
-
- ping_tx.send(()).unwrap();
- spin(pong_rx).unwrap();
-
- i += 1;
- });
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
- }
-}
-
-mod legacy {
- use futures::sync::oneshot;
- use futures::{future, Async, Future};
- use test::Bencher;
-
- #[bench]
- fn new(b: &mut Bencher) {
- b.iter(|| {
- let _ = ::test::black_box(&oneshot::channel::<i32>());
- })
- }
-
- #[bench]
- fn same_thread_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- let _ = tx.send(1);
-
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
- });
- }
-
- #[bench]
- fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- future::lazy(|| {
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
-
- let _ = tx.send(1);
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- });
- }
-
- #[bench]
- fn multi_thread_send_recv(b: &mut Bencher) {
- const MAX: usize = 10_000_000;
-
- use std::thread;
-
- fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
- use futures::Async::Ready;
- loop {
- match f.poll() {
- Ok(Ready(v)) => return Ok(v),
- Ok(_) => {}
- Err(e) => return Err(e),
- }
- }
- }
-
- let mut ping_txs = vec![];
- let mut ping_rxs = vec![];
- let mut pong_txs = vec![];
- let mut pong_rxs = vec![];
-
- for _ in 0..MAX {
- let (tx, rx) = oneshot::channel::<()>();
-
- ping_txs.push(Some(tx));
- ping_rxs.push(Some(rx));
-
- let (tx, rx) = oneshot::channel::<()>();
-
- pong_txs.push(Some(tx));
- pong_rxs.push(Some(rx));
- }
-
- thread::spawn(move || {
- future::lazy(|| {
- for i in 0..MAX {
- let ping_rx = ping_rxs[i].take().unwrap();
- let pong_tx = pong_txs[i].take().unwrap();
-
- if spin(ping_rx).is_err() {
- return Ok(());
- }
-
- pong_tx.send(()).unwrap();
- }
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
- });
-
- future::lazy(|| {
- let mut i = 0;
-
- b.iter(|| {
- let ping_tx = ping_txs[i].take().unwrap();
- let pong_rx = pong_rxs[i].take().unwrap();
-
- ping_tx.send(()).unwrap();
- spin(pong_rx).unwrap();
-
- i += 1;
- });
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
- }
-}