summaryrefslogtreecommitdiffstats
path: root/benches
diff options
context:
space:
mode:
authorRoman <humbug@deeptown.org>2018-02-13 22:02:06 +0400
committerCarl Lerche <me@carllerche.com>2018-02-13 10:02:06 -0800
commit8605d5d24325a85938991686960387c9d8a75317 (patch)
tree0aa31919df6d731eb3a210eb1c135cf84c71c535 /benches
parent4704f612776159ca99618099ffcdb06dc7b9bbfa (diff)
Make benches compilable again (#133)
Diffstat (limited to 'benches')
-rw-r--r--benches/latency.rs12
-rw-r--r--benches/tcp.rs213
2 files changed, 58 insertions, 167 deletions
diff --git a/benches/latency.rs b/benches/latency.rs
index 246d033b..f35cb793 100644
--- a/benches/latency.rs
+++ b/benches/latency.rs
@@ -2,7 +2,6 @@
extern crate test;
extern crate futures;
-#[macro_use]
extern crate tokio;
#[macro_use]
extern crate tokio_io;
@@ -16,7 +15,6 @@ use futures::sync::mpsc;
use futures::{Future, Poll, Sink, Stream};
use test::Bencher;
use tokio::net::UdpSocket;
-use tokio::reactor::Reactor;
/// UDP echo server
struct EchoServer {
@@ -59,16 +57,14 @@ fn udp_echo_latency(b: &mut Bencher) {
let (tx, rx) = oneshot::channel();
let child = thread::spawn(move || {
- let mut l = Reactor::new().unwrap();
- let handle = l.handle();
- let socket = tokio::net::UdpSocket::bind(&any_addr, &handle).unwrap();
- tx.complete(socket.local_addr().unwrap());
+ let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap();
+ tx.send(socket.local_addr().unwrap()).unwrap();
let server = EchoServer::new(socket);
let server = server.select(stop_p.map_err(|_| panic!()));
let server = server.map_err(|_| ());
- l.run(server).unwrap()
+ server.wait().unwrap();
});
@@ -91,7 +87,7 @@ fn udp_echo_latency(b: &mut Bencher) {
let _ = client.recv_from(&mut buf).unwrap();
});
- stop_c.complete(());
+ stop_c.send(()).unwrap();
child.join().unwrap();
}
diff --git a/benches/tcp.rs b/benches/tcp.rs
index 82dc6835..37467d8a 100644
--- a/benches/tcp.rs
+++ b/benches/tcp.rs
@@ -12,6 +12,7 @@ mod prelude {
pub use futures::*;
pub use tokio::reactor::Reactor;
pub use tokio::net::{TcpListener, TcpStream};
+ pub use tokio::executor::current_thread;
pub use tokio_io::io::read_to_end;
pub use test::{self, Bencher};
@@ -29,68 +30,63 @@ mod connect_churn {
#[bench]
fn one_thread(b: &mut Bencher) {
let addr = "127.0.0.1:0".parse().unwrap();
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
- let listener = TcpListener::bind(&addr, &handle).unwrap();
- let addr = listener.local_addr().unwrap();
-
- // Spawn a single task that accepts & drops connections
- handle.spawn(
- listener.incoming()
- .map_err(|e| panic!("server err: {:?}", e))
- .for_each(|_| Ok(())));
b.iter(move || {
- let connects = stream::iter((0..NUM).map(|_| {
- Ok(TcpStream::connect(&addr, &handle)
+ let listener = TcpListener::bind(&addr).unwrap();
+ let addr = listener.local_addr().unwrap();
+
+ // Spawn a single future that accepts & drops connections
+ let serve_incomings = listener.incoming()
+ .map_err(|e| panic!("server err: {:?}", e))
+ .for_each(|_| Ok(()));
+
+ let connects = stream::iter_result((0..NUM).map(|_| {
+ Ok(TcpStream::connect(&addr)
.and_then(|sock| {
sock.set_linger(Some(Duration::from_secs(0))).unwrap();
read_to_end(sock, vec![])
}))
}));
- core.run(
- connects.buffer_unordered(CONCURRENT)
- .map_err(|e| panic!("client err: {:?}", e))
- .for_each(|_| Ok(()))).unwrap();
+ let connects_concurrent = connects.buffer_unordered(CONCURRENT)
+ .map_err(|e| panic!("client err: {:?}", e))
+ .for_each(|_| Ok(()));
+
+ serve_incomings.select(connects_concurrent)
+ .map(|_| ()).map_err(|_| ())
+ .wait().unwrap();
});
}
fn n_workers(n: usize, b: &mut Bencher) {
let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
- let (remote_tx, remote_rx) = ::std::sync::mpsc::channel();
+ let (addr_tx, addr_rx) = sync::oneshot::channel();
// Spawn reactor thread
- thread::spawn(move || {
- // Create the core
- let mut core = Reactor::new().unwrap();
-
- // Reactor handles
- let handle = core.handle();
- let remote = handle.remote().clone();
-
+ let server_thread = thread::spawn(move || {
// Bind the TCP listener
let listener = TcpListener::bind(
- &"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
+ &"127.0.0.1:0".parse().unwrap()).unwrap();
// Get the address being listened on.
let addr = listener.local_addr().unwrap();
// Send the remote & address back to the main thread
- remote_tx.send((remote, addr)).unwrap();
+ addr_tx.send(addr).unwrap();
- // Spawn a single task that accepts & drops connections
- handle.spawn(
- listener.incoming()
- .map_err(|e| panic!("server err: {:?}", e))
- .for_each(|_| Ok(())));
+ // Spawn a single future that accepts & drops connections
+ let serve_incomings = listener.incoming()
+ .map_err(|e| panic!("server err: {:?}", e))
+ .for_each(|_| Ok(()));
- // Run the reactor
- core.run(shutdown_rx).unwrap();
+ // Run server
+ serve_incomings.select(shutdown_rx)
+ .map(|_| ()).map_err(|_| ())
+ .wait().unwrap();
});
- // Get the remote info
- let (remote, addr) = remote_rx.recv().unwrap();
+ // Get the bind addr of the server
+ let addr = addr_rx.wait().unwrap();
b.iter(move || {
use std::sync::{Barrier, Arc};
@@ -101,24 +97,12 @@ mod connect_churn {
// Spawn worker threads
let threads: Vec<_> = (0..n).map(|_| {
let barrier = barrier.clone();
- let remote = remote.clone();
let addr = addr.clone();
thread::spawn(move || {
- let connects = stream::iter((0..(NUM / n)).map(|_| {
- // TODO: Once `Handle` is `Send / Sync`, update this
-
- let (socket_tx, socket_rx) = sync::oneshot::channel();
-
- remote.spawn(move |handle| {
- TcpStream::connect(&addr, &handle)
- .map_err(|e| panic!("connect err: {:?}", e))
- .then(|res| socket_tx.send(res))
- .map_err(|_| ())
- });
-
- Ok(socket_rx
- .then(|res| res.unwrap())
+ let connects = stream::iter_result((0..(NUM / n)).map(|_| {
+ Ok(TcpStream::connect(&addr)
+ .map_err(|e| panic!("connect err: {:?}", e))
.and_then(|sock| {
sock.set_linger(Some(Duration::from_secs(0))).unwrap();
read_to_end(sock, vec![])
@@ -140,8 +124,9 @@ mod connect_churn {
}
});
- // Shutdown the reactor
+ // Shutdown the server
shutdown_tx.send(()).unwrap();
+ server_thread.join().unwrap();
}
#[bench]
@@ -209,118 +194,38 @@ mod transfer {
fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
let addr = "127.0.0.1:0".parse().unwrap();
- let mut core = Reactor::new().unwrap();
- let handle = core.handle();
- let listener = TcpListener::bind(&addr, &handle).unwrap();
- let addr = listener.local_addr().unwrap();
- let h2 = handle.clone();
+ b.iter(move || {
+ let listener = TcpListener::bind(&addr).unwrap();
+ let addr = listener.local_addr().unwrap();
- // Spawn a single task that accepts & drops connections
- handle.spawn(
- listener.incoming()
- .map_err(|e| panic!("server err: {:?}", e))
- .for_each(move |(sock, _)| {
+ // Spawn a single future that accepts 1 connection, Drain it and drops
+ let server = listener.incoming()
+ .into_future() // take the first connection
+ .map_err(|(e, _other_incomings)| e)
+ .map(|(connection, _other_incomings)| connection.unwrap())
+ .and_then(|sock| {
sock.set_linger(Some(Duration::from_secs(0))).unwrap();
let drain = Drain {
sock: sock,
chunk: read_size,
};
+ drain.map(|_| ()).map_err(|e| panic!("server error: {:?}", e))
+ })
+ .map_err(|e| panic!("server err: {:?}", e));
- h2.spawn(drain.map_err(|e| panic!("server error: {:?}", e)));
-
- Ok(())
- }));
-
- b.iter(move || {
- let client = TcpStream::connect(&addr, &handle)
- .and_then(|sock| {
+ let client = TcpStream::connect(&addr)
+ .and_then(move |sock| {
Transfer {
sock: sock,
rem: MB,
chunk: write_size,
}
- });
-
- core.run(
- client.map_err(|e| panic!("client err: {:?}", e))
- ).unwrap();
- });
- }
-
- fn cross_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
- let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
- let (remote_tx, remote_rx) = ::std::sync::mpsc::channel();
-
- // Spawn reactor thread
- thread::spawn(move || {
- // Create the core
- let mut core = Reactor::new().unwrap();
-
- // Reactor handles
- let handle = core.handle();
- let remote = handle.remote().clone();
-
- remote_tx.send(remote).unwrap();
- core.run(shutdown_rx).unwrap();
- });
-
- let remote = remote_rx.recv().unwrap();
-
- b.iter(move || {
- let (server_tx, server_rx) = sync::oneshot::channel();
- let (client_tx, client_rx) = sync::oneshot::channel();
-
- remote.spawn(|handle| {
- let sock = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
- server_tx.send(sock).unwrap();
- Ok(())
- });
-
- let remote2 = remote.clone();
-
- server_rx.and_then(move |server| {
- let addr = server.local_addr().unwrap();
-
- remote2.spawn(move |handle| {
- let fut = TcpStream::connect(&addr, &handle);
- client_tx.send(fut).ok().unwrap();
- Ok(())
- });
-
- let client = client_rx
- .then(|res| res.unwrap())
- .and_then(move |sock| {
- Transfer {
- sock: sock,
- rem: MB,
- chunk: write_size,
- }
- });
-
- let server = server.incoming().into_future()
- .map_err(|(e, _)| e)
- .and_then(move |(sock, _)| {
- let sock = sock.unwrap().0;
- sock.set_linger(Some(Duration::from_secs(0))).unwrap();
+ })
+ .map_err(|e| panic!("client err: {:?}", e));
- Drain {
- sock: sock,
- chunk: read_size,
- }
- });
-
- client
- .join(server)
- .then(|res| {
- let _ = res.unwrap();
- Ok(())
- })
- }).wait().unwrap();
+ server.join(client).wait().unwrap();
});
-
- // Shutdown the reactor
- shutdown_tx.send(()).unwrap();
}
mod small_chunks {
@@ -330,11 +235,6 @@ mod transfer {
fn one_thread(b: &mut Bencher) {
super::one_thread(b, 32, 32);
}
-
- #[bench]
- fn cross_thread(b: &mut Bencher) {
- super::cross_thread(b, 32, 32);
- }
}
mod big_chunks {
@@ -344,10 +244,5 @@ mod transfer {
fn one_thread(b: &mut Bencher) {
super::one_thread(b, 1_024, 1_024);
}
-
- #[bench]
- fn cross_thread(b: &mut Bencher) {
- super::cross_thread(b, 1_024, 1_024);
- }
}
}