summaryrefslogtreecommitdiffstats
path: root/benches
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-05-14 10:27:36 -0700
committerGitHub <noreply@github.com>2019-05-14 10:27:36 -0700
commitcb4aea394e6851ae8cc45a68beeaf2c93cc9a0c0 (patch)
tree2158eab230c8717d3b35717e50f14fda6ca0edf1 /benches
parent79d88200500f6e6c9970e1ad26469276c1a2f71f (diff)
Update Tokio to Rust 2018 (#1082)
Diffstat (limited to 'benches')
-rw-r--r--benches/latency.rs115
-rw-r--r--benches/mio-ops.rs57
-rw-r--r--benches/tcp.rs261
3 files changed, 0 insertions, 433 deletions
diff --git a/benches/latency.rs b/benches/latency.rs
deleted file mode 100644
index d9ace03a..00000000
--- a/benches/latency.rs
+++ /dev/null
@@ -1,115 +0,0 @@
-#![feature(test)]
-#![deny(warnings)]
-
-extern crate test;
-#[macro_use]
-extern crate futures;
-extern crate tokio;
-
-use std::io;
-use std::net::SocketAddr;
-use std::thread;
-
-use futures::sync::mpsc;
-use futures::sync::oneshot;
-use futures::{Future, Poll, Sink, Stream};
-use test::Bencher;
-use tokio::net::UdpSocket;
-
-/// UDP echo server
-struct EchoServer {
- socket: UdpSocket,
- buf: Vec<u8>,
- to_send: Option<(usize, SocketAddr)>,
-}
-
-impl EchoServer {
- fn new(s: UdpSocket) -> Self {
- EchoServer {
- socket: s,
- to_send: None,
- buf: vec![0u8; 1600],
- }
- }
-}
-
-impl Future for EchoServer {
- type Item = ();
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<(), io::Error> {
- loop {
- if let Some(&(size, peer)) = self.to_send.as_ref() {
- try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
- self.to_send = None;
- }
- self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
- }
- }
-}
-
-#[bench]
-fn udp_echo_latency(b: &mut Bencher) {
- let any_addr = "127.0.0.1:0".to_string();
- let any_addr = any_addr.parse::<SocketAddr>().unwrap();
-
- let (stop_c, stop_p) = oneshot::channel::<()>();
- let (tx, rx) = oneshot::channel();
-
- let child = thread::spawn(move || {
- let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap();
- tx.send(socket.local_addr().unwrap()).unwrap();
-
- let server = EchoServer::new(socket);
- let server = server.select(stop_p.map_err(|_| panic!()));
- let server = server.map_err(|_| ());
- server.wait().unwrap();
- });
-
- let client = std::net::UdpSocket::bind(&any_addr).unwrap();
-
- let server_addr = rx.wait().unwrap();
- let mut buf = [0u8; 1000];
-
- // warmup phase; for some reason initial couple of
- // runs are much slower
- //
- // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
- for _ in 0..8 {
- client.send_to(&buf, &server_addr).unwrap();
- let _ = client.recv_from(&mut buf).unwrap();
- }
-
- b.iter(|| {
- client.send_to(&buf, &server_addr).unwrap();
- let _ = client.recv_from(&mut buf).unwrap();
- });
-
- stop_c.send(()).unwrap();
- child.join().unwrap();
-}
-
-#[bench]
-fn futures_channel_latency(b: &mut Bencher) {
- let (mut in_tx, in_rx) = mpsc::channel(32);
- let (out_tx, out_rx) = mpsc::channel::<_>(32);
-
- let child = thread::spawn(|| out_tx.send_all(in_rx.then(|r| r.unwrap())).wait());
- let mut rx_iter = out_rx.wait();
-
- // warmup phase; for some reason initial couple of runs are much slower
- //
- // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
- for _ in 0..8 {
- in_tx.start_send(Ok(1usize)).unwrap();
- let _ = rx_iter.next();
- }
-
- b.iter(|| {
- in_tx.start_send(Ok(1usize)).unwrap();
- let _ = rx_iter.next();
- });
-
- drop(in_tx);
- child.join().unwrap().unwrap();
-}
diff --git a/benches/mio-ops.rs b/benches/mio-ops.rs
deleted file mode 100644
index be4db7f7..00000000
--- a/benches/mio-ops.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-// Measure cost of different operations
-// to get a sense of performance tradeoffs
-#![feature(test)]
-#![deny(warnings)]
-
-extern crate mio;
-extern crate test;
-
-use test::Bencher;
-
-use mio::tcp::TcpListener;
-use mio::{PollOpt, Ready, Token};
-
-#[bench]
-fn mio_register_deregister(b: &mut Bencher) {
- let addr = "127.0.0.1:0".parse().unwrap();
- // Setup the server socket
- let sock = TcpListener::bind(&addr).unwrap();
- let poll = mio::Poll::new().unwrap();
-
- const CLIENT: Token = Token(1);
-
- b.iter(|| {
- poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge())
- .unwrap();
- poll.deregister(&sock).unwrap();
- });
-}
-
-#[bench]
-fn mio_reregister(b: &mut Bencher) {
- let addr = "127.0.0.1:0".parse().unwrap();
- // Setup the server socket
- let sock = TcpListener::bind(&addr).unwrap();
- let poll = mio::Poll::new().unwrap();
-
- const CLIENT: Token = Token(1);
- poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge())
- .unwrap();
-
- b.iter(|| {
- poll.reregister(&sock, CLIENT, Ready::readable(), PollOpt::edge())
- .unwrap();
- });
- poll.deregister(&sock).unwrap();
-}
-
-#[bench]
-fn mio_poll(b: &mut Bencher) {
- let poll = mio::Poll::new().unwrap();
- let timeout = std::time::Duration::new(0, 0);
- let mut events = mio::Events::with_capacity(1024);
-
- b.iter(|| {
- poll.poll(&mut events, Some(timeout)).unwrap();
- });
-}
diff --git a/benches/tcp.rs b/benches/tcp.rs
deleted file mode 100644
index 1872790d..00000000
--- a/benches/tcp.rs
+++ /dev/null
@@ -1,261 +0,0 @@
-#![feature(test)]
-#![deny(warnings)]
-
-extern crate futures;
-extern crate tokio;
-
-#[macro_use]
-extern crate tokio_io;
-
-pub extern crate test;
-
-mod prelude {
- pub use futures::*;
- pub use tokio::net::{TcpListener, TcpStream};
- pub use tokio::reactor::Reactor;
- pub use tokio_io::io::read_to_end;
-
- pub use std::io::{self, Read, Write};
- pub use std::thread;
- pub use std::time::Duration;
- pub use test::{self, Bencher};
-}
-
-mod connect_churn {
- use prelude::*;
-
- const NUM: usize = 300;
- const CONCURRENT: usize = 8;
-
- #[bench]
- fn one_thread(b: &mut Bencher) {
- let addr = "127.0.0.1:0".parse().unwrap();
-
- b.iter(move || {
- let listener = TcpListener::bind(&addr).unwrap();
- let addr = listener.local_addr().unwrap();
-
- // Spawn a single future that accepts & drops connections
- let serve_incomings = listener
- .incoming()
- .map_err(|e| panic!("server err: {:?}", e))
- .for_each(|_| Ok(()));
-
- let connects = stream::iter_result((0..NUM).map(|_| {
- Ok(TcpStream::connect(&addr).and_then(|sock| {
- sock.set_linger(Some(Duration::from_secs(0))).unwrap();
- read_to_end(sock, vec![])
- }))
- }));
-
- let connects_concurrent = connects
- .buffer_unordered(CONCURRENT)
- .map_err(|e| panic!("client err: {:?}", e))
- .for_each(|_| Ok(()));
-
- serve_incomings
- .select(connects_concurrent)
- .map(|_| ())
- .map_err(|_| ())
- .wait()
- .unwrap();
- });
- }
-
- fn n_workers(n: usize, b: &mut Bencher) {
- let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
- let (addr_tx, addr_rx) = sync::oneshot::channel();
-
- // Spawn reactor thread
- let server_thread = thread::spawn(move || {
- // Bind the TCP listener
- let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
-
- // Get the address being listened on.
- let addr = listener.local_addr().unwrap();
-
- // Send the remote & address back to the main thread
- addr_tx.send(addr).unwrap();
-
- // Spawn a single future that accepts & drops connections
- let serve_incomings = listener
- .incoming()
- .map_err(|e| panic!("server err: {:?}", e))
- .for_each(|_| Ok(()));
-
- // Run server
- serve_incomings
- .select(shutdown_rx)
- .map(|_| ())
- .map_err(|_| ())
- .wait()
- .unwrap();
- });
-
- // Get the bind addr of the server
- let addr = addr_rx.wait().unwrap();
-
- b.iter(move || {
- use std::sync::{Arc, Barrier};
-
- // Create a barrier to coordinate threads
- let barrier = Arc::new(Barrier::new(n + 1));
-
- // Spawn worker threads
- let threads: Vec<_> = (0..n)
- .map(|_| {
- let barrier = barrier.clone();
- let addr = addr.clone();
-
- thread::spawn(move || {
- let connects = stream::iter_result((0..(NUM / n)).map(|_| {
- Ok(TcpStream::connect(&addr)
- .map_err(|e| panic!("connect err: {:?}", e))
- .and_then(|sock| {
- sock.set_linger(Some(Duration::from_secs(0))).unwrap();
- read_to_end(sock, vec![])
- }))
- }));
-
- barrier.wait();
-
- connects
- .buffer_unordered(CONCURRENT)
- .map_err(|e| panic!("client err: {:?}", e))
- .for_each(|_| Ok(()))
- .wait()
- .unwrap();
- })
- })
- .collect();
-
- barrier.wait();
-
- for th in threads {
- th.join().unwrap();
- }
- });
-
- // Shutdown the server
- shutdown_tx.send(()).unwrap();
- server_thread.join().unwrap();
- }
-
- #[bench]
- fn two_threads(b: &mut Bencher) {
- n_workers(1, b);
- }
-
- #[bench]
- fn multi_threads(b: &mut Bencher) {
- n_workers(4, b);
- }
-}
-
-mod transfer {
- use prelude::*;
- use std::{cmp, mem};
-
- const MB: usize = 3 * 1024 * 1024;
-
- struct Drain {
- sock: TcpStream,
- chunk: usize,
- }
-
- impl Future for Drain {
- type Item = ();
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<(), io::Error> {
- let mut buf: [u8; 1024] = unsafe { mem::uninitialized() };
-
- loop {
- match try_nb!(self.sock.read(&mut buf[..self.chunk])) {
- 0 => return Ok(Async::Ready(())),
- _ => {}
- }
- }
- }
- }
-
- struct Transfer {
- sock: TcpStream,
- rem: usize,
- chunk: usize,
- }
-
- impl Future for Transfer {
- type Item = ();
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<(), io::Error> {
- while self.rem > 0 {
- let len = cmp::min(self.rem, self.chunk);
- let buf = &DATA[..len];
-
- let n = try_nb!(self.sock.write(&buf));
- self.rem -= n;
- }
-
- Ok(Async::Ready(()))
- }
- }
-
- static DATA: [u8; 1024] = [0; 1024];
-
- fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
- let addr = "127.0.0.1:0".parse().unwrap();
-
- b.iter(move || {
- let listener = TcpListener::bind(&addr).unwrap();
- let addr = listener.local_addr().unwrap();
-
- // Spawn a single future that accepts 1 connection, Drain it and drops
- let server = listener
- .incoming()
- .into_future() // take the first connection
- .map_err(|(e, _other_incomings)| e)
- .map(|(connection, _other_incomings)| connection.unwrap())
- .and_then(|sock| {
- sock.set_linger(Some(Duration::from_secs(0))).unwrap();
- let drain = Drain {
- sock: sock,
- chunk: read_size,
- };
- drain
- .map(|_| ())
- .map_err(|e| panic!("server error: {:?}", e))
- })
- .map_err(|e| panic!("server err: {:?}", e));
-
- let client = TcpStream::connect(&addr)
- .and_then(move |sock| Transfer {
- sock: sock,
- rem: MB,
- chunk: write_size,
- })
- .map_err(|e| panic!("client err: {:?}", e));
-
- server.join(client).wait().unwrap();
- });
- }
-
- mod small_chunks {
- use prelude::*;
-
- #[bench]
- fn one_thread(b: &mut Bencher) {
- super::one_thread(b, 32, 32);
- }
- }
-
- mod big_chunks {
- use prelude::*;
-
- #[bench]
- fn one_thread(b: &mut Bencher) {
- super::one_thread(b, 1_024, 1_024);
- }
- }
-}