author     Dawid Ciężarkiewicz <dpc@dpc.pw>  2016-11-08 19:01:37 -0800
committer  Dawid Ciężarkiewicz <dpc@dpc.pw>  2016-11-08 19:01:37 -0800
commit     d1a2a9d3242ab9e93bde1fa43219f1159f2eee4d (patch)
tree       6d273341685b3562ffde13c2f28b4d7678c6ae19 /benches
parent     8e7ed8ae21e448c664d42b48091472d86bbbc2c7 (diff)
Add `channel_latency` and fix previous issues.
A warmup round before actually performing the test seems to eliminate the variance.
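
For context, a minimal, self-contained sketch of the warmup-then-measure pattern this commit adds; it assumes a nightly toolchain (for the `test` crate), and `round_trip` is a hypothetical stand-in for one send/receive exchange, not code from this commit:

    #![feature(test)]
    extern crate test;

    use test::Bencher;

    // Hypothetical stand-in for one request/response round trip.
    fn round_trip() {
        test::black_box(());
    }

    #[bench]
    fn warmed_up_round_trip(b: &mut Bencher) {
        // Warmup: run the operation a number of times before measuring,
        // so caches, the allocator, and similar machinery settle first.
        for _ in 0..1000 {
            round_trip();
        }
        // Only the closure passed to `b.iter` is timed.
        b.iter(|| round_trip());
    }
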
Diffstat (limited to 'benches')
-rw-r--r--  benches/latency.rs  123
1 file changed, 118 insertions, 5 deletions
diff --git a/benches/latency.rs b/benches/latency.rs
index a4b91bb5..4de7d2ee 100644
--- a/benches/latency.rs
+++ b/benches/latency.rs
@@ -18,17 +18,19 @@ use test::Bencher;
use std::thread;
use std::time::Duration;
+use futures::stream::Stream;
+
/// UDP echo server
-struct Server {
+struct EchoServer {
socket : UdpSocket,
buf : Vec<u8>,
to_send : Option<(usize, SocketAddr)>,
stop : Arc<AtomicBool>,
}
-impl Server {
+impl EchoServer {
fn new(s : UdpSocket, stop : Arc<AtomicBool>) -> Self {
- Server {
+ EchoServer {
socket: s,
to_send: None,
buf: vec![0u8; 1600],
@@ -37,7 +39,7 @@ impl Server {
}
}
-impl Future for Server {
+impl Future for EchoServer {
type Item = ();
type Error = io::Error;
@@ -63,6 +65,55 @@ impl Future for Server {
}
}
}
+
+/// Channel-based echo server
+struct ChanServer {
+ rx : tokio_core::channel::Receiver<usize>,
+ tx : tokio_core::channel::Sender<usize>,
+ buf : Option<usize>,
+ stop : Arc<AtomicBool>,
+}
+
+impl ChanServer {
+ fn new(tx : tokio_core::channel::Sender<usize>, rx : tokio_core::channel::Receiver<usize>, stop : Arc<AtomicBool>) -> Self {
+ ChanServer {
+ rx: rx,
+ tx: tx,
+ buf: None,
+ stop: stop,
+ }
+ }
+}
+
+impl Future for ChanServer {
+ type Item = ();
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(), io::Error> {
+ loop {
+ if self.stop.load(Ordering::SeqCst) {
+ return Ok(futures::Async::Ready(()))
+ }
+
+ if let Some(u) = self.buf.take() {
+ match self.tx.send(u) {
+ Err(e) => {
+ self.buf = Some(u);
+ return try_nb!(Err(e));
+ },
+ Ok(_) => { }
+ }
+ }
+
+ match self.rx.poll() {
+ Ok(futures::Async::Ready(None)) => return Ok(futures::Async::Ready(())),
+ Ok(futures::Async::Ready(Some(t))) => { self.buf = Some(t) },
+ Ok(futures::Async::NotReady) => return Ok(futures::Async::NotReady),
+ Err(e) => return try_nb!(Err(e)),
+ }
+ }
+ }
+}
#[bench]
fn udp_echo_latency(b: &mut Bencher) {
let server_addr= "127.0.0.1:7398".to_string();
@@ -79,7 +130,7 @@ fn udp_echo_latency(b: &mut Bencher) {
let socket = tokio_core::net::UdpSocket::bind(&server_addr, &handle).unwrap();
- let server = Server::new(socket, stop);
+ let server = EchoServer::new(socket, stop);
l.run(server).unwrap();
});
@@ -91,6 +142,16 @@ fn udp_echo_latency(b: &mut Bencher) {
let client = std::net::UdpSocket::bind(client_addr).unwrap();
let mut buf = [0u8; 1000];
+
+ // warmup phase; for some reason the first couple of
+ // rounds are much slower
+ //
+ // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
+ for _ in 0..1000 {
+ client.send_to(&buf, &server_addr).unwrap();
+ let _ = client.recv_from(&mut buf).unwrap();
+ }
+
b.iter(|| {
client.send_to(&buf, &server_addr).unwrap();
let _ = client.recv_from(&mut buf).unwrap();
@@ -103,3 +164,55 @@ fn udp_echo_latency(b: &mut Bencher) {
child.join().unwrap();
}
+
+#[bench]
+fn channel_latency(b: &mut Bencher) {
+ let stop = Arc::new(AtomicBool::new(false));
+ let stop2 = stop.clone();
+
+ // TODO: Any way to start Loop on a separate thread and yet get
+ // a tokio_core::channel to it?
+ let (tx, rx) = std::sync::mpsc::channel();
+
+ let child = thread::spawn(move || {
+ let mut l = Core::new().unwrap();
+ let handle = l.handle();
+
+ let (in_tx, in_rx) = tokio_core::channel::channel(&handle).unwrap();
+ let (out_tx, out_rx) = tokio_core::channel::channel(&handle).unwrap();
+
+ let server = ChanServer::new(out_tx, in_rx, stop);
+
+ tx.send((in_tx, out_rx)).unwrap();
+ l.run(server).unwrap();
+ });
+
+ let (in_tx, out_rx) = rx.recv().unwrap();
+
+ // TODO: More reliable way to make sure the server loop has
+ // started first
+ thread::sleep(Duration::from_millis(100));
+
+ let mut rx_iter = out_rx.wait();
+
+ // warmup phase; for some reason the first couple of
+ // rounds are much slower
+ //
+ // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
+ for _ in 0..1000 {
+ in_tx.send(1usize).unwrap();
+ let _ = rx_iter.next();
+ }
+
+ b.iter(|| {
+ in_tx.send(1usize).unwrap();
+ let _ = rx_iter.next();
+ });
+
+ // Stop the server; TODO: Use better method
+ stop2.store(true, Ordering::SeqCst);
+ thread::sleep(Duration::from_millis(1));
+ in_tx.send(1usize).unwrap();
+
+ child.join().unwrap();
+}
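
One possible direction for the "Stop the server; TODO: Use better method" note above, sketched as an assumption rather than as part of this commit: `ChanServer::poll` already returns `Ok(futures::Async::Ready(()))` when `rx.poll()` yields `Ready(None)`, and the receiver's stream should end once every sender for it has been dropped, so the teardown could drop `in_tx` instead of using the `AtomicBool` flag, the sleep, and the final dummy send:

    // Hypothetical teardown for `channel_latency`, assuming the receiver's
    // stream ends with Ready(None) once all of its senders are dropped.
    drop(in_tx);            // in_rx.poll() in ChanServer::poll now yields Ready(None)
    child.join().unwrap();  // the server future resolves and Core::run returns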