diff options
Diffstat (limited to 'benches/tcp.rs')
-rw-r--r-- | benches/tcp.rs | 117 |
1 files changed, 65 insertions, 52 deletions
diff --git a/benches/tcp.rs b/benches/tcp.rs index fde72ce0..1872790d 100644 --- a/benches/tcp.rs +++ b/benches/tcp.rs @@ -11,18 +11,18 @@ pub extern crate test; mod prelude { pub use futures::*; - pub use tokio::reactor::Reactor; pub use tokio::net::{TcpListener, TcpStream}; + pub use tokio::reactor::Reactor; pub use tokio_io::io::read_to_end; - pub use test::{self, Bencher}; + pub use std::io::{self, Read, Write}; pub use std::thread; pub use std::time::Duration; - pub use std::io::{self, Read, Write}; + pub use test::{self, Bencher}; } mod connect_churn { - use ::prelude::*; + use prelude::*; const NUM: usize = 300; const CONCURRENT: usize = 8; @@ -36,25 +36,29 @@ mod connect_churn { let addr = listener.local_addr().unwrap(); // Spawn a single future that accepts & drops connections - let serve_incomings = listener.incoming() + let serve_incomings = listener + .incoming() .map_err(|e| panic!("server err: {:?}", e)) .for_each(|_| Ok(())); let connects = stream::iter_result((0..NUM).map(|_| { - Ok(TcpStream::connect(&addr) - .and_then(|sock| { - sock.set_linger(Some(Duration::from_secs(0))).unwrap(); - read_to_end(sock, vec![]) - })) + Ok(TcpStream::connect(&addr).and_then(|sock| { + sock.set_linger(Some(Duration::from_secs(0))).unwrap(); + read_to_end(sock, vec![]) + })) })); - let connects_concurrent = connects.buffer_unordered(CONCURRENT) + let connects_concurrent = connects + .buffer_unordered(CONCURRENT) .map_err(|e| panic!("client err: {:?}", e)) .for_each(|_| Ok(())); - serve_incomings.select(connects_concurrent) - .map(|_| ()).map_err(|_| ()) - .wait().unwrap(); + serve_incomings + .select(connects_concurrent) + .map(|_| ()) + .map_err(|_| ()) + .wait() + .unwrap(); }); } @@ -65,8 +69,7 @@ mod connect_churn { // Spawn reactor thread let server_thread = thread::spawn(move || { // Bind the TCP listener - let listener = TcpListener::bind( - &"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); // Get the address being listened on. let addr = listener.local_addr().unwrap(); @@ -75,47 +78,56 @@ mod connect_churn { addr_tx.send(addr).unwrap(); // Spawn a single future that accepts & drops connections - let serve_incomings = listener.incoming() + let serve_incomings = listener + .incoming() .map_err(|e| panic!("server err: {:?}", e)) .for_each(|_| Ok(())); // Run server - serve_incomings.select(shutdown_rx) - .map(|_| ()).map_err(|_| ()) - .wait().unwrap(); + serve_incomings + .select(shutdown_rx) + .map(|_| ()) + .map_err(|_| ()) + .wait() + .unwrap(); }); // Get the bind addr of the server let addr = addr_rx.wait().unwrap(); b.iter(move || { - use std::sync::{Barrier, Arc}; + use std::sync::{Arc, Barrier}; // Create a barrier to coordinate threads let barrier = Arc::new(Barrier::new(n + 1)); // Spawn worker threads - let threads: Vec<_> = (0..n).map(|_| { - let barrier = barrier.clone(); - let addr = addr.clone(); - - thread::spawn(move || { - let connects = stream::iter_result((0..(NUM / n)).map(|_| { - Ok(TcpStream::connect(&addr) - .map_err(|e| panic!("connect err: {:?}", e)) - .and_then(|sock| { - sock.set_linger(Some(Duration::from_secs(0))).unwrap(); - read_to_end(sock, vec![]) - })) - })); - - barrier.wait(); - - connects.buffer_unordered(CONCURRENT) - .map_err(|e| panic!("client err: {:?}", e)) - .for_each(|_| Ok(())).wait().unwrap(); + let threads: Vec<_> = (0..n) + .map(|_| { + let barrier = barrier.clone(); + let addr = addr.clone(); + + thread::spawn(move || { + let connects = stream::iter_result((0..(NUM / n)).map(|_| { + Ok(TcpStream::connect(&addr) + .map_err(|e| panic!("connect err: {:?}", e)) + .and_then(|sock| { + sock.set_linger(Some(Duration::from_secs(0))).unwrap(); + read_to_end(sock, vec![]) + })) + })); + + barrier.wait(); + + connects + .buffer_unordered(CONCURRENT) + .map_err(|e| panic!("client err: {:?}", e)) + .for_each(|_| Ok(())) + .wait() + .unwrap(); + }) }) - }).collect(); + .collect(); barrier.wait(); @@ -141,7 +153,7 @@ mod connect_churn { } mod transfer { - use ::prelude::*; + use prelude::*; use std::{cmp, mem}; const MB: usize = 3 * 1024 * 1024; @@ -200,7 +212,8 @@ mod transfer { let addr = listener.local_addr().unwrap(); // Spawn a single future that accepts 1 connection, Drain it and drops - let server = listener.incoming() + let server = listener + .incoming() .into_future() // take the first connection .map_err(|(e, _other_incomings)| e) .map(|(connection, _other_incomings)| connection.unwrap()) @@ -210,17 +223,17 @@ mod transfer { sock: sock, chunk: read_size, }; - drain.map(|_| ()).map_err(|e| panic!("server error: {:?}", e)) + drain + .map(|_| ()) + .map_err(|e| panic!("server error: {:?}", e)) }) .map_err(|e| panic!("server err: {:?}", e)); let client = TcpStream::connect(&addr) - .and_then(move |sock| { - Transfer { - sock: sock, - rem: MB, - chunk: write_size, - } + .and_then(move |sock| Transfer { + sock: sock, + rem: MB, + chunk: write_size, }) .map_err(|e| panic!("client err: {:?}", e)); @@ -229,7 +242,7 @@ mod transfer { } mod small_chunks { - use ::prelude::*; + use prelude::*; #[bench] fn one_thread(b: &mut Bencher) { @@ -238,7 +251,7 @@ mod transfer { } mod big_chunks { - use ::prelude::*; + use prelude::*; #[bench] fn one_thread(b: &mut Bencher) { |