From 97c2c4203cd7c42960cac895987c43a17dff052e Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 13 Nov 2020 19:30:52 -0800 Subject: chore: automate running benchmarks (#3140) Uses Github actions to run benchmarks. --- benches/Cargo.toml | 8 +- benches/mpsc.rs | 175 ------------------------------------------- benches/rt_multi_threaded.rs | 151 +++++++++++++++++++++++++++++++++++++ benches/scheduler.rs | 151 ------------------------------------- benches/sync_mpsc.rs | 175 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 330 insertions(+), 330 deletions(-) delete mode 100644 benches/mpsc.rs create mode 100644 benches/rt_multi_threaded.rs delete mode 100644 benches/scheduler.rs create mode 100644 benches/sync_mpsc.rs (limited to 'benches') diff --git a/benches/Cargo.toml b/benches/Cargo.toml index cca0ece5..d0383f36 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -17,13 +17,13 @@ path = "spawn.rs" harness = false [[bench]] -name = "mpsc" -path = "mpsc.rs" +name = "sync_mpsc" +path = "sync_mpsc.rs" harness = false [[bench]] -name = "scheduler" -path = "scheduler.rs" +name = "rt_multi_threaded" +path = "rt_multi_threaded.rs" harness = false diff --git a/benches/mpsc.rs b/benches/mpsc.rs deleted file mode 100644 index 3f7e3fca..00000000 --- a/benches/mpsc.rs +++ /dev/null @@ -1,175 +0,0 @@ -use bencher::{black_box, Bencher}; -use tokio::sync::mpsc; - -type Medium = [usize; 64]; -type Large = [Medium; 64]; - -fn rt() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(6) - .build() - .unwrap() -} - -fn create_1_medium(b: &mut Bencher) { - b.iter(|| { - black_box(&mpsc::channel::(1)); - }); -} - -fn create_100_medium(b: &mut Bencher) { - b.iter(|| { - black_box(&mpsc::channel::(100)); - }); -} - -fn create_100_000_medium(b: &mut Bencher) { - b.iter(|| { - black_box(&mpsc::channel::(100_000)); - }); -} - -fn send_medium(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = mpsc::channel::(1000); - - let _ = tx.try_send([0; 64]); - - rx.try_recv().unwrap(); - }); -} - -fn send_large(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = mpsc::channel::(1000); - - let _ = tx.try_send([[0; 64]; 64]); - - rx.try_recv().unwrap(); - }); -} - -fn contention_bounded(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - for _ in 0..1_000 * 5 { - let _ = rx.recv().await; - } - }) - }); -} - -fn contention_bounded_full(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(100); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - for _ in 0..1_000 * 5 { - let _ = rx.recv().await; - } - }) - }); -} - -fn contention_unbounded(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::unbounded_channel::(); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).unwrap(); - } - }); - } - - for _ in 0..1_000 * 5 { - let _ = rx.recv().await; - } - }) - }); -} - -fn uncontented_bounded(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for i in 0..5000 { - tx.send(i).await.unwrap(); - } - - for _ in 0..5_000 { - let _ = rx.recv().await; - } - }) - }); -} - -fn uncontented_unbounded(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::unbounded_channel::(); - - for i in 0..5000 { - tx.send(i).unwrap(); - } - - for _ in 0..5_000 { - let _ = rx.recv().await; - } - }) - }); -} - -bencher::benchmark_group!( - create, - create_1_medium, - create_100_medium, - create_100_000_medium -); - -bencher::benchmark_group!(send, send_medium, send_large); - -bencher::benchmark_group!( - contention, - contention_bounded, - contention_bounded_full, - contention_unbounded, - uncontented_bounded, - uncontented_unbounded -); - -bencher::benchmark_main!(create, send, contention); diff --git a/benches/rt_multi_threaded.rs b/benches/rt_multi_threaded.rs new file mode 100644 index 00000000..68a6d6a4 --- /dev/null +++ b/benches/rt_multi_threaded.rs @@ -0,0 +1,151 @@ +//! Benchmark implementation details of the theaded scheduler. These benches are +//! intended to be used as a form of regression testing and not as a general +//! purpose benchmark demonstrating real-world performance. + +use tokio::runtime::{self, Runtime}; +use tokio::sync::oneshot; + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::{mpsc, Arc}; + +fn spawn_many(b: &mut Bencher) { + const NUM_SPAWN: usize = 10_000; + + let rt = rt(); + + let (tx, rx) = mpsc::sync_channel(1000); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + rem.store(NUM_SPAWN, Relaxed); + + rt.block_on(async { + for _ in 0..NUM_SPAWN { + let tx = tx.clone(); + let rem = rem.clone(); + + tokio::spawn(async move { + if 1 == rem.fetch_sub(1, Relaxed) { + tx.send(()).unwrap(); + } + }); + } + + let _ = rx.recv().unwrap(); + }); + }); +} + +fn yield_many(b: &mut Bencher) { + const NUM_YIELD: usize = 1_000; + const TASKS: usize = 200; + + let rt = rt(); + + let (tx, rx) = mpsc::sync_channel(TASKS); + + b.iter(move || { + for _ in 0..TASKS { + let tx = tx.clone(); + + rt.spawn(async move { + for _ in 0..NUM_YIELD { + tokio::task::yield_now().await; + } + + tx.send(()).unwrap(); + }); + } + + for _ in 0..TASKS { + let _ = rx.recv().unwrap(); + } + }); +} + +fn ping_pong(b: &mut Bencher) { + const NUM_PINGS: usize = 1_000; + + let rt = rt(); + + let (done_tx, done_rx) = mpsc::sync_channel(1000); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + let done_tx = done_tx.clone(); + let rem = rem.clone(); + rem.store(NUM_PINGS, Relaxed); + + rt.block_on(async { + tokio::spawn(async move { + for _ in 0..NUM_PINGS { + let rem = rem.clone(); + let done_tx = done_tx.clone(); + + tokio::spawn(async move { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + tokio::spawn(async move { + rx1.await.unwrap(); + tx2.send(()).unwrap(); + }); + + tx1.send(()).unwrap(); + rx2.await.unwrap(); + + if 1 == rem.fetch_sub(1, Relaxed) { + done_tx.send(()).unwrap(); + } + }); + } + }); + + done_rx.recv().unwrap(); + }); + }); +} + +fn chained_spawn(b: &mut Bencher) { + const ITER: usize = 1_000; + + let rt = rt(); + + fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + tokio::spawn(async move { + iter(done_tx, n - 1); + }); + } + } + + let (done_tx, done_rx) = mpsc::sync_channel(1000); + + b.iter(move || { + let done_tx = done_tx.clone(); + + rt.block_on(async { + tokio::spawn(async move { + iter(done_tx, ITER); + }); + + done_rx.recv().unwrap(); + }); + }); +} + +fn rt() -> Runtime { + runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap() +} + +benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,); + +benchmark_main!(scheduler); diff --git a/benches/scheduler.rs b/benches/scheduler.rs deleted file mode 100644 index 68a6d6a4..00000000 --- a/benches/scheduler.rs +++ /dev/null @@ -1,151 +0,0 @@ -//! Benchmark implementation details of the theaded scheduler. These benches are -//! intended to be used as a form of regression testing and not as a general -//! purpose benchmark demonstrating real-world performance. - -use tokio::runtime::{self, Runtime}; -use tokio::sync::oneshot; - -use bencher::{benchmark_group, benchmark_main, Bencher}; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::{mpsc, Arc}; - -fn spawn_many(b: &mut Bencher) { - const NUM_SPAWN: usize = 10_000; - - let rt = rt(); - - let (tx, rx) = mpsc::sync_channel(1000); - let rem = Arc::new(AtomicUsize::new(0)); - - b.iter(|| { - rem.store(NUM_SPAWN, Relaxed); - - rt.block_on(async { - for _ in 0..NUM_SPAWN { - let tx = tx.clone(); - let rem = rem.clone(); - - tokio::spawn(async move { - if 1 == rem.fetch_sub(1, Relaxed) { - tx.send(()).unwrap(); - } - }); - } - - let _ = rx.recv().unwrap(); - }); - }); -} - -fn yield_many(b: &mut Bencher) { - const NUM_YIELD: usize = 1_000; - const TASKS: usize = 200; - - let rt = rt(); - - let (tx, rx) = mpsc::sync_channel(TASKS); - - b.iter(move || { - for _ in 0..TASKS { - let tx = tx.clone(); - - rt.spawn(async move { - for _ in 0..NUM_YIELD { - tokio::task::yield_now().await; - } - - tx.send(()).unwrap(); - }); - } - - for _ in 0..TASKS { - let _ = rx.recv().unwrap(); - } - }); -} - -fn ping_pong(b: &mut Bencher) { - const NUM_PINGS: usize = 1_000; - - let rt = rt(); - - let (done_tx, done_rx) = mpsc::sync_channel(1000); - let rem = Arc::new(AtomicUsize::new(0)); - - b.iter(|| { - let done_tx = done_tx.clone(); - let rem = rem.clone(); - rem.store(NUM_PINGS, Relaxed); - - rt.block_on(async { - tokio::spawn(async move { - for _ in 0..NUM_PINGS { - let rem = rem.clone(); - let done_tx = done_tx.clone(); - - tokio::spawn(async move { - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - - tokio::spawn(async move { - rx1.await.unwrap(); - tx2.send(()).unwrap(); - }); - - tx1.send(()).unwrap(); - rx2.await.unwrap(); - - if 1 == rem.fetch_sub(1, Relaxed) { - done_tx.send(()).unwrap(); - } - }); - } - }); - - done_rx.recv().unwrap(); - }); - }); -} - -fn chained_spawn(b: &mut Bencher) { - const ITER: usize = 1_000; - - let rt = rt(); - - fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { - if n == 0 { - done_tx.send(()).unwrap(); - } else { - tokio::spawn(async move { - iter(done_tx, n - 1); - }); - } - } - - let (done_tx, done_rx) = mpsc::sync_channel(1000); - - b.iter(move || { - let done_tx = done_tx.clone(); - - rt.block_on(async { - tokio::spawn(async move { - iter(done_tx, ITER); - }); - - done_rx.recv().unwrap(); - }); - }); -} - -fn rt() -> Runtime { - runtime::Builder::new_multi_thread() - .worker_threads(4) - .enable_all() - .build() - .unwrap() -} - -benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,); - -benchmark_main!(scheduler); diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs new file mode 100644 index 00000000..3f7e3fca --- /dev/null +++ b/benches/sync_mpsc.rs @@ -0,0 +1,175 @@ +use bencher::{black_box, Bencher}; +use tokio::sync::mpsc; + +type Medium = [usize; 64]; +type Large = [Medium; 64]; + +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) + .build() + .unwrap() +} + +fn create_1_medium(b: &mut Bencher) { + b.iter(|| { + black_box(&mpsc::channel::(1)); + }); +} + +fn create_100_medium(b: &mut Bencher) { + b.iter(|| { + black_box(&mpsc::channel::(100)); + }); +} + +fn create_100_000_medium(b: &mut Bencher) { + b.iter(|| { + black_box(&mpsc::channel::(100_000)); + }); +} + +fn send_medium(b: &mut Bencher) { + b.iter(|| { + let (tx, mut rx) = mpsc::channel::(1000); + + let _ = tx.try_send([0; 64]); + + rx.try_recv().unwrap(); + }); +} + +fn send_large(b: &mut Bencher) { + b.iter(|| { + let (tx, mut rx) = mpsc::channel::(1000); + + let _ = tx.try_send([[0; 64]; 64]); + + rx.try_recv().unwrap(); + }); +} + +fn contention_bounded(b: &mut Bencher) { + let rt = rt(); + + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(1_000_000); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).await.unwrap(); + } + }); + } + + for _ in 0..1_000 * 5 { + let _ = rx.recv().await; + } + }) + }); +} + +fn contention_bounded_full(b: &mut Bencher) { + let rt = rt(); + + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(100); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).await.unwrap(); + } + }); + } + + for _ in 0..1_000 * 5 { + let _ = rx.recv().await; + } + }) + }); +} + +fn contention_unbounded(b: &mut Bencher) { + let rt = rt(); + + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::unbounded_channel::(); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).unwrap(); + } + }); + } + + for _ in 0..1_000 * 5 { + let _ = rx.recv().await; + } + }) + }); +} + +fn uncontented_bounded(b: &mut Bencher) { + let rt = rt(); + + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(1_000_000); + + for i in 0..5000 { + tx.send(i).await.unwrap(); + } + + for _ in 0..5_000 { + let _ = rx.recv().await; + } + }) + }); +} + +fn uncontented_unbounded(b: &mut Bencher) { + let rt = rt(); + + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::unbounded_channel::(); + + for i in 0..5000 { + tx.send(i).unwrap(); + } + + for _ in 0..5_000 { + let _ = rx.recv().await; + } + }) + }); +} + +bencher::benchmark_group!( + create, + create_1_medium, + create_100_medium, + create_100_000_medium +); + +bencher::benchmark_group!(send, send_medium, send_large); + +bencher::benchmark_group!( + contention, + contention_bounded, + contention_bounded_full, + contention_unbounded, + uncontented_bounded, + uncontented_unbounded +); + +bencher::benchmark_main!(create, send, contention); -- cgit v1.2.3