diff options
author | Carl Lerche <me@carllerche.com> | 2019-05-14 10:27:36 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-14 10:27:36 -0700 |
commit | cb4aea394e6851ae8cc45a68beeaf2c93cc9a0c0 (patch) | |
tree | 2158eab230c8717d3b35717e50f14fda6ca0edf1 /benches | |
parent | 79d88200500f6e6c9970e1ad26469276c1a2f71f (diff) |
Update Tokio to Rust 2018 (#1082)
Diffstat (limited to 'benches')
-rw-r--r-- | benches/latency.rs | 115 | ||||
-rw-r--r-- | benches/mio-ops.rs | 57 | ||||
-rw-r--r-- | benches/tcp.rs | 261 |
3 files changed, 0 insertions, 433 deletions
diff --git a/benches/latency.rs b/benches/latency.rs deleted file mode 100644 index d9ace03a..00000000 --- a/benches/latency.rs +++ /dev/null @@ -1,115 +0,0 @@ -#![feature(test)] -#![deny(warnings)] - -extern crate test; -#[macro_use] -extern crate futures; -extern crate tokio; - -use std::io; -use std::net::SocketAddr; -use std::thread; - -use futures::sync::mpsc; -use futures::sync::oneshot; -use futures::{Future, Poll, Sink, Stream}; -use test::Bencher; -use tokio::net::UdpSocket; - -/// UDP echo server -struct EchoServer { - socket: UdpSocket, - buf: Vec<u8>, - to_send: Option<(usize, SocketAddr)>, -} - -impl EchoServer { - fn new(s: UdpSocket) -> Self { - EchoServer { - socket: s, - to_send: None, - buf: vec![0u8; 1600], - } - } -} - -impl Future for EchoServer { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(), io::Error> { - loop { - if let Some(&(size, peer)) = self.to_send.as_ref() { - try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer)); - self.to_send = None; - } - self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf))); - } - } -} - -#[bench] -fn udp_echo_latency(b: &mut Bencher) { - let any_addr = "127.0.0.1:0".to_string(); - let any_addr = any_addr.parse::<SocketAddr>().unwrap(); - - let (stop_c, stop_p) = oneshot::channel::<()>(); - let (tx, rx) = oneshot::channel(); - - let child = thread::spawn(move || { - let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap(); - tx.send(socket.local_addr().unwrap()).unwrap(); - - let server = EchoServer::new(socket); - let server = server.select(stop_p.map_err(|_| panic!())); - let server = server.map_err(|_| ()); - server.wait().unwrap(); - }); - - let client = std::net::UdpSocket::bind(&any_addr).unwrap(); - - let server_addr = rx.wait().unwrap(); - let mut buf = [0u8; 1000]; - - // warmup phase; for some reason initial couple of - // runs are much slower - // - // TODO: Describe the exact reasons; caching? branch predictor? lazy closures? - for _ in 0..8 { - client.send_to(&buf, &server_addr).unwrap(); - let _ = client.recv_from(&mut buf).unwrap(); - } - - b.iter(|| { - client.send_to(&buf, &server_addr).unwrap(); - let _ = client.recv_from(&mut buf).unwrap(); - }); - - stop_c.send(()).unwrap(); - child.join().unwrap(); -} - -#[bench] -fn futures_channel_latency(b: &mut Bencher) { - let (mut in_tx, in_rx) = mpsc::channel(32); - let (out_tx, out_rx) = mpsc::channel::<_>(32); - - let child = thread::spawn(|| out_tx.send_all(in_rx.then(|r| r.unwrap())).wait()); - let mut rx_iter = out_rx.wait(); - - // warmup phase; for some reason initial couple of runs are much slower - // - // TODO: Describe the exact reasons; caching? branch predictor? lazy closures? - for _ in 0..8 { - in_tx.start_send(Ok(1usize)).unwrap(); - let _ = rx_iter.next(); - } - - b.iter(|| { - in_tx.start_send(Ok(1usize)).unwrap(); - let _ = rx_iter.next(); - }); - - drop(in_tx); - child.join().unwrap().unwrap(); -} diff --git a/benches/mio-ops.rs b/benches/mio-ops.rs deleted file mode 100644 index be4db7f7..00000000 --- a/benches/mio-ops.rs +++ /dev/null @@ -1,57 +0,0 @@ -// Measure cost of different operations -// to get a sense of performance tradeoffs -#![feature(test)] -#![deny(warnings)] - -extern crate mio; -extern crate test; - -use test::Bencher; - -use mio::tcp::TcpListener; -use mio::{PollOpt, Ready, Token}; - -#[bench] -fn mio_register_deregister(b: &mut Bencher) { - let addr = "127.0.0.1:0".parse().unwrap(); - // Setup the server socket - let sock = TcpListener::bind(&addr).unwrap(); - let poll = mio::Poll::new().unwrap(); - - const CLIENT: Token = Token(1); - - b.iter(|| { - poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()) - .unwrap(); - poll.deregister(&sock).unwrap(); - }); -} - -#[bench] -fn mio_reregister(b: &mut Bencher) { - let addr = "127.0.0.1:0".parse().unwrap(); - // Setup the server socket - let sock = TcpListener::bind(&addr).unwrap(); - let poll = mio::Poll::new().unwrap(); - - const CLIENT: Token = Token(1); - poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()) - .unwrap(); - - b.iter(|| { - poll.reregister(&sock, CLIENT, Ready::readable(), PollOpt::edge()) - .unwrap(); - }); - poll.deregister(&sock).unwrap(); -} - -#[bench] -fn mio_poll(b: &mut Bencher) { - let poll = mio::Poll::new().unwrap(); - let timeout = std::time::Duration::new(0, 0); - let mut events = mio::Events::with_capacity(1024); - - b.iter(|| { - poll.poll(&mut events, Some(timeout)).unwrap(); - }); -} diff --git a/benches/tcp.rs b/benches/tcp.rs deleted file mode 100644 index 1872790d..00000000 --- a/benches/tcp.rs +++ /dev/null @@ -1,261 +0,0 @@ -#![feature(test)] -#![deny(warnings)] - -extern crate futures; -extern crate tokio; - -#[macro_use] -extern crate tokio_io; - -pub extern crate test; - -mod prelude { - pub use futures::*; - pub use tokio::net::{TcpListener, TcpStream}; - pub use tokio::reactor::Reactor; - pub use tokio_io::io::read_to_end; - - pub use std::io::{self, Read, Write}; - pub use std::thread; - pub use std::time::Duration; - pub use test::{self, Bencher}; -} - -mod connect_churn { - use prelude::*; - - const NUM: usize = 300; - const CONCURRENT: usize = 8; - - #[bench] - fn one_thread(b: &mut Bencher) { - let addr = "127.0.0.1:0".parse().unwrap(); - - b.iter(move || { - let listener = TcpListener::bind(&addr).unwrap(); - let addr = listener.local_addr().unwrap(); - - // Spawn a single future that accepts & drops connections - let serve_incomings = listener - .incoming() - .map_err(|e| panic!("server err: {:?}", e)) - .for_each(|_| Ok(())); - - let connects = stream::iter_result((0..NUM).map(|_| { - Ok(TcpStream::connect(&addr).and_then(|sock| { - sock.set_linger(Some(Duration::from_secs(0))).unwrap(); - read_to_end(sock, vec![]) - })) - })); - - let connects_concurrent = connects - .buffer_unordered(CONCURRENT) - .map_err(|e| panic!("client err: {:?}", e)) - .for_each(|_| Ok(())); - - serve_incomings - .select(connects_concurrent) - .map(|_| ()) - .map_err(|_| ()) - .wait() - .unwrap(); - }); - } - - fn n_workers(n: usize, b: &mut Bencher) { - let (shutdown_tx, shutdown_rx) = sync::oneshot::channel(); - let (addr_tx, addr_rx) = sync::oneshot::channel(); - - // Spawn reactor thread - let server_thread = thread::spawn(move || { - // Bind the TCP listener - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - - // Get the address being listened on. - let addr = listener.local_addr().unwrap(); - - // Send the remote & address back to the main thread - addr_tx.send(addr).unwrap(); - - // Spawn a single future that accepts & drops connections - let serve_incomings = listener - .incoming() - .map_err(|e| panic!("server err: {:?}", e)) - .for_each(|_| Ok(())); - - // Run server - serve_incomings - .select(shutdown_rx) - .map(|_| ()) - .map_err(|_| ()) - .wait() - .unwrap(); - }); - - // Get the bind addr of the server - let addr = addr_rx.wait().unwrap(); - - b.iter(move || { - use std::sync::{Arc, Barrier}; - - // Create a barrier to coordinate threads - let barrier = Arc::new(Barrier::new(n + 1)); - - // Spawn worker threads - let threads: Vec<_> = (0..n) - .map(|_| { - let barrier = barrier.clone(); - let addr = addr.clone(); - - thread::spawn(move || { - let connects = stream::iter_result((0..(NUM / n)).map(|_| { - Ok(TcpStream::connect(&addr) - .map_err(|e| panic!("connect err: {:?}", e)) - .and_then(|sock| { - sock.set_linger(Some(Duration::from_secs(0))).unwrap(); - read_to_end(sock, vec![]) - })) - })); - - barrier.wait(); - - connects - .buffer_unordered(CONCURRENT) - .map_err(|e| panic!("client err: {:?}", e)) - .for_each(|_| Ok(())) - .wait() - .unwrap(); - }) - }) - .collect(); - - barrier.wait(); - - for th in threads { - th.join().unwrap(); - } - }); - - // Shutdown the server - shutdown_tx.send(()).unwrap(); - server_thread.join().unwrap(); - } - - #[bench] - fn two_threads(b: &mut Bencher) { - n_workers(1, b); - } - - #[bench] - fn multi_threads(b: &mut Bencher) { - n_workers(4, b); - } -} - -mod transfer { - use prelude::*; - use std::{cmp, mem}; - - const MB: usize = 3 * 1024 * 1024; - - struct Drain { - sock: TcpStream, - chunk: usize, - } - - impl Future for Drain { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(), io::Error> { - let mut buf: [u8; 1024] = unsafe { mem::uninitialized() }; - - loop { - match try_nb!(self.sock.read(&mut buf[..self.chunk])) { - 0 => return Ok(Async::Ready(())), - _ => {} - } - } - } - } - - struct Transfer { - sock: TcpStream, - rem: usize, - chunk: usize, - } - - impl Future for Transfer { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(), io::Error> { - while self.rem > 0 { - let len = cmp::min(self.rem, self.chunk); - let buf = &DATA[..len]; - - let n = try_nb!(self.sock.write(&buf)); - self.rem -= n; - } - - Ok(Async::Ready(())) - } - } - - static DATA: [u8; 1024] = [0; 1024]; - - fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) { - let addr = "127.0.0.1:0".parse().unwrap(); - - b.iter(move || { - let listener = TcpListener::bind(&addr).unwrap(); - let addr = listener.local_addr().unwrap(); - - // Spawn a single future that accepts 1 connection, Drain it and drops - let server = listener - .incoming() - .into_future() // take the first connection - .map_err(|(e, _other_incomings)| e) - .map(|(connection, _other_incomings)| connection.unwrap()) - .and_then(|sock| { - sock.set_linger(Some(Duration::from_secs(0))).unwrap(); - let drain = Drain { - sock: sock, - chunk: read_size, - }; - drain - .map(|_| ()) - .map_err(|e| panic!("server error: {:?}", e)) - }) - .map_err(|e| panic!("server err: {:?}", e)); - - let client = TcpStream::connect(&addr) - .and_then(move |sock| Transfer { - sock: sock, - rem: MB, - chunk: write_size, - }) - .map_err(|e| panic!("client err: {:?}", e)); - - server.join(client).wait().unwrap(); - }); - } - - mod small_chunks { - use prelude::*; - - #[bench] - fn one_thread(b: &mut Bencher) { - super::one_thread(b, 32, 32); - } - } - - mod big_chunks { - use prelude::*; - - #[bench] - fn one_thread(b: &mut Bencher) { - super::one_thread(b, 1_024, 1_024); - } - } -} |