diff options
author | Roman <humbug@deeptown.org> | 2018-02-13 22:02:06 +0400 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-02-13 10:02:06 -0800 |
commit | 8605d5d24325a85938991686960387c9d8a75317 (patch) | |
tree | 0aa31919df6d731eb3a210eb1c135cf84c71c535 /benches | |
parent | 4704f612776159ca99618099ffcdb06dc7b9bbfa (diff) |
Make benches compilable again (#133)
Diffstat (limited to 'benches')
-rw-r--r-- | benches/latency.rs | 12 | ||||
-rw-r--r-- | benches/tcp.rs | 213 |
2 files changed, 58 insertions, 167 deletions
diff --git a/benches/latency.rs b/benches/latency.rs index 246d033b..f35cb793 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -2,7 +2,6 @@ extern crate test; extern crate futures; -#[macro_use] extern crate tokio; #[macro_use] extern crate tokio_io; @@ -16,7 +15,6 @@ use futures::sync::mpsc; use futures::{Future, Poll, Sink, Stream}; use test::Bencher; use tokio::net::UdpSocket; -use tokio::reactor::Reactor; /// UDP echo server struct EchoServer { @@ -59,16 +57,14 @@ fn udp_echo_latency(b: &mut Bencher) { let (tx, rx) = oneshot::channel(); let child = thread::spawn(move || { - let mut l = Reactor::new().unwrap(); - let handle = l.handle(); - let socket = tokio::net::UdpSocket::bind(&any_addr, &handle).unwrap(); - tx.complete(socket.local_addr().unwrap()); + let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap(); + tx.send(socket.local_addr().unwrap()).unwrap(); let server = EchoServer::new(socket); let server = server.select(stop_p.map_err(|_| panic!())); let server = server.map_err(|_| ()); - l.run(server).unwrap() + server.wait().unwrap(); }); @@ -91,7 +87,7 @@ fn udp_echo_latency(b: &mut Bencher) { let _ = client.recv_from(&mut buf).unwrap(); }); - stop_c.complete(()); + stop_c.send(()).unwrap(); child.join().unwrap(); } diff --git a/benches/tcp.rs b/benches/tcp.rs index 82dc6835..37467d8a 100644 --- a/benches/tcp.rs +++ b/benches/tcp.rs @@ -12,6 +12,7 @@ mod prelude { pub use futures::*; pub use tokio::reactor::Reactor; pub use tokio::net::{TcpListener, TcpStream}; + pub use tokio::executor::current_thread; pub use tokio_io::io::read_to_end; pub use test::{self, Bencher}; @@ -29,68 +30,63 @@ mod connect_churn { #[bench] fn one_thread(b: &mut Bencher) { let addr = "127.0.0.1:0".parse().unwrap(); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); - let listener = TcpListener::bind(&addr, &handle).unwrap(); - let addr = listener.local_addr().unwrap(); - - // Spawn a single task that accepts & drops connections - handle.spawn( - listener.incoming() - .map_err(|e| panic!("server err: {:?}", e)) - .for_each(|_| Ok(()))); b.iter(move || { - let connects = stream::iter((0..NUM).map(|_| { - Ok(TcpStream::connect(&addr, &handle) + let listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); + + // Spawn a single future that accepts & drops connections + let serve_incomings = listener.incoming() + .map_err(|e| panic!("server err: {:?}", e)) + .for_each(|_| Ok(())); + + let connects = stream::iter_result((0..NUM).map(|_| { + Ok(TcpStream::connect(&addr) .and_then(|sock| { sock.set_linger(Some(Duration::from_secs(0))).unwrap(); read_to_end(sock, vec![]) })) })); - core.run( - connects.buffer_unordered(CONCURRENT) - .map_err(|e| panic!("client err: {:?}", e)) - .for_each(|_| Ok(()))).unwrap(); + let connects_concurrent = connects.buffer_unordered(CONCURRENT) + .map_err(|e| panic!("client err: {:?}", e)) + .for_each(|_| Ok(())); + + serve_incomings.select(connects_concurrent) + .map(|_| ()).map_err(|_| ()) + .wait().unwrap(); }); } fn n_workers(n: usize, b: &mut Bencher) { let (shutdown_tx, shutdown_rx) = sync::oneshot::channel(); - let (remote_tx, remote_rx) = ::std::sync::mpsc::channel(); + let (addr_tx, addr_rx) = sync::oneshot::channel(); // Spawn reactor thread - thread::spawn(move || { - // Create the core - let mut core = Reactor::new().unwrap(); - - // Reactor handles - let handle = core.handle(); - let remote = handle.remote().clone(); - + let server_thread = thread::spawn(move || { // Bind the TCP listener let listener = TcpListener::bind( - &"127.0.0.1:0".parse().unwrap(), &handle).unwrap(); + &"127.0.0.1:0".parse().unwrap()).unwrap(); // Get the address being listened on. let addr = listener.local_addr().unwrap(); // Send the remote & address back to the main thread - remote_tx.send((remote, addr)).unwrap(); + addr_tx.send(addr).unwrap(); - // Spawn a single task that accepts & drops connections - handle.spawn( - listener.incoming() - .map_err(|e| panic!("server err: {:?}", e)) - .for_each(|_| Ok(()))); + // Spawn a single future that accepts & drops connections + let serve_incomings = listener.incoming() + .map_err(|e| panic!("server err: {:?}", e)) + .for_each(|_| Ok(())); - // Run the reactor - core.run(shutdown_rx).unwrap(); + // Run server + serve_incomings.select(shutdown_rx) + .map(|_| ()).map_err(|_| ()) + .wait().unwrap(); }); - // Get the remote info - let (remote, addr) = remote_rx.recv().unwrap(); + // Get the bind addr of the server + let addr = addr_rx.wait().unwrap(); b.iter(move || { use std::sync::{Barrier, Arc}; @@ -101,24 +97,12 @@ mod connect_churn { // Spawn worker threads let threads: Vec<_> = (0..n).map(|_| { let barrier = barrier.clone(); - let remote = remote.clone(); let addr = addr.clone(); thread::spawn(move || { - let connects = stream::iter((0..(NUM / n)).map(|_| { - // TODO: Once `Handle` is `Send / Sync`, update this - - let (socket_tx, socket_rx) = sync::oneshot::channel(); - - remote.spawn(move |handle| { - TcpStream::connect(&addr, &handle) - .map_err(|e| panic!("connect err: {:?}", e)) - .then(|res| socket_tx.send(res)) - .map_err(|_| ()) - }); - - Ok(socket_rx - .then(|res| res.unwrap()) + let connects = stream::iter_result((0..(NUM / n)).map(|_| { + Ok(TcpStream::connect(&addr) + .map_err(|e| panic!("connect err: {:?}", e)) .and_then(|sock| { sock.set_linger(Some(Duration::from_secs(0))).unwrap(); read_to_end(sock, vec![]) @@ -140,8 +124,9 @@ mod connect_churn { } }); - // Shutdown the reactor + // Shutdown the server shutdown_tx.send(()).unwrap(); + server_thread.join().unwrap(); } #[bench] @@ -209,118 +194,38 @@ mod transfer { fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) { let addr = "127.0.0.1:0".parse().unwrap(); - let mut core = Reactor::new().unwrap(); - let handle = core.handle(); - let listener = TcpListener::bind(&addr, &handle).unwrap(); - let addr = listener.local_addr().unwrap(); - let h2 = handle.clone(); + b.iter(move || { + let listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); - // Spawn a single task that accepts & drops connections - handle.spawn( - listener.incoming() - .map_err(|e| panic!("server err: {:?}", e)) - .for_each(move |(sock, _)| { + // Spawn a single future that accepts 1 connection, Drain it and drops + let server = listener.incoming() + .into_future() // take the first connection + .map_err(|(e, _other_incomings)| e) + .map(|(connection, _other_incomings)| connection.unwrap()) + .and_then(|sock| { sock.set_linger(Some(Duration::from_secs(0))).unwrap(); let drain = Drain { sock: sock, chunk: read_size, }; + drain.map(|_| ()).map_err(|e| panic!("server error: {:?}", e)) + }) + .map_err(|e| panic!("server err: {:?}", e)); - h2.spawn(drain.map_err(|e| panic!("server error: {:?}", e))); - - Ok(()) - })); - - b.iter(move || { - let client = TcpStream::connect(&addr, &handle) - .and_then(|sock| { + let client = TcpStream::connect(&addr) + .and_then(move |sock| { Transfer { sock: sock, rem: MB, chunk: write_size, } - }); - - core.run( - client.map_err(|e| panic!("client err: {:?}", e)) - ).unwrap(); - }); - } - - fn cross_thread(b: &mut Bencher, read_size: usize, write_size: usize) { - let (shutdown_tx, shutdown_rx) = sync::oneshot::channel(); - let (remote_tx, remote_rx) = ::std::sync::mpsc::channel(); - - // Spawn reactor thread - thread::spawn(move || { - // Create the core - let mut core = Reactor::new().unwrap(); - - // Reactor handles - let handle = core.handle(); - let remote = handle.remote().clone(); - - remote_tx.send(remote).unwrap(); - core.run(shutdown_rx).unwrap(); - }); - - let remote = remote_rx.recv().unwrap(); - - b.iter(move || { - let (server_tx, server_rx) = sync::oneshot::channel(); - let (client_tx, client_rx) = sync::oneshot::channel(); - - remote.spawn(|handle| { - let sock = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap(); - server_tx.send(sock).unwrap(); - Ok(()) - }); - - let remote2 = remote.clone(); - - server_rx.and_then(move |server| { - let addr = server.local_addr().unwrap(); - - remote2.spawn(move |handle| { - let fut = TcpStream::connect(&addr, &handle); - client_tx.send(fut).ok().unwrap(); - Ok(()) - }); - - let client = client_rx - .then(|res| res.unwrap()) - .and_then(move |sock| { - Transfer { - sock: sock, - rem: MB, - chunk: write_size, - } - }); - - let server = server.incoming().into_future() - .map_err(|(e, _)| e) - .and_then(move |(sock, _)| { - let sock = sock.unwrap().0; - sock.set_linger(Some(Duration::from_secs(0))).unwrap(); + }) + .map_err(|e| panic!("client err: {:?}", e)); - Drain { - sock: sock, - chunk: read_size, - } - }); - - client - .join(server) - .then(|res| { - let _ = res.unwrap(); - Ok(()) - }) - }).wait().unwrap(); + server.join(client).wait().unwrap(); }); - - // Shutdown the reactor - shutdown_tx.send(()).unwrap(); } mod small_chunks { @@ -330,11 +235,6 @@ mod transfer { fn one_thread(b: &mut Bencher) { super::one_thread(b, 32, 32); } - - #[bench] - fn cross_thread(b: &mut Bencher) { - super::cross_thread(b, 32, 32); - } } mod big_chunks { @@ -344,10 +244,5 @@ mod transfer { fn one_thread(b: &mut Bencher) { super::one_thread(b, 1_024, 1_024); } - - #[bench] - fn cross_thread(b: &mut Bencher) { - super::cross_thread(b, 1_024, 1_024); - } } } |