diff options
34 files changed, 89 insertions, 1041 deletions
diff --git a/examples/chat.rs b/examples/chat.rs index 2553cc5e..91589072 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -49,7 +49,9 @@ async fn main() -> Result<(), Box<dyn Error>> { // client connection. let state = Arc::new(Mutex::new(Shared::new())); - let addr = env::args().nth(1).unwrap_or("127.0.0.1:6142".to_string()); + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:6142".to_string()); // Bind a TCP listener to the socket address. // diff --git a/examples/connect.rs b/examples/connect.rs index cdd18e19..d51af88c 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -36,10 +36,9 @@ async fn main() -> Result<(), Box<dyn Error>> { }; // Parse what address we're going to connect to - let addr = match args.first() { - Some(addr) => addr, - None => Err("this program requires at least one argument")?, - }; + let addr = args + .first() + .ok_or("this program requires at least one argument")?; let addr = addr.parse::<SocketAddr>()?; let stdin = FramedRead::new(io::stdin(), codec::Bytes); @@ -163,7 +162,7 @@ mod codec { type Error = io::Error; fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Vec<u8>>> { - if buf.len() > 0 { + if !buf.is_empty() { let len = buf.len(); Ok(Some(buf.split_to(len).into_iter().collect())) } else { diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index f1e8134d..d8b2af9c 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -51,7 +51,9 @@ impl Server { #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); let socket = UdpSocket::bind(&addr).await?; println!("Listening on: {}", socket.local_addr()?); diff --git a/examples/echo.rs b/examples/echo.rs index 455aebde..35b12279 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -33,7 +33,9 @@ async fn main() -> Result<(), Box<dyn Error>> { // Allow passing an address to listen on as the first argument of this // program, but otherwise we'll just set up our TCP listener on // 127.0.0.1:8080 for connections. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); // Next up we create a TCP listener which will listen for incoming // connections. This TCP listener is bound to the address we determined diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs index f056db4a..4604139b 100644 --- a/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -65,7 +65,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // Allow passing an address to listen on as the first argument of this // program, but otherwise we'll just set up our TCP listener on // 127.0.0.1:8080 for connections. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); // Next up we create a TCP listener which will listen for incoming // connections. This TCP listener is bound to the address we determined diff --git a/examples/proxy.rs b/examples/proxy.rs index 48f8f057..f7a9111f 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -32,8 +32,12 @@ use std::error::Error; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { - let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); - let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); + let listen_addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8081".to_string()); + let server_addr = env::args() + .nth(2) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); println!("Listening on: {}", listen_addr); println!("Proxying to: {}", server_addr); diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 3fc88f6b..cf867a0a 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -84,7 +84,9 @@ enum Response { async fn main() -> Result<(), Box<dyn Error>> { // Parse the address we're going to run this server on // and set up our TCP listener to accept connections. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); let mut listener = TcpListener::bind(&addr).await?; println!("Listening on: {}", addr); @@ -175,15 +177,12 @@ fn handle_request(line: &str, db: &Arc<Database>) -> Response { impl Request { fn parse(input: &str) -> Result<Request, String> { - let mut parts = input.splitn(3, " "); + let mut parts = input.splitn(3, ' '); match parts.next() { Some("GET") => { - let key = match parts.next() { - Some(key) => key, - None => return Err(format!("GET must be followed by a key")), - }; + let key = parts.next().ok_or("GET must be followed by a key")?; if parts.next().is_some() { - return Err(format!("GET's key must not be followed by anything")); + return Err("GET's key must not be followed by anything".into()); } Ok(Request::Get { key: key.to_string(), @@ -192,11 +191,11 @@ impl Request { Some("SET") => { let key = match parts.next() { Some(key) => key, - None => return Err(format!("SET must be followed by a key")), + None => return Err("SET must be followed by a key".into()), }; let value = match parts.next() { Some(value) => value, - None => return Err(format!("SET needs a value")), + None => return Err("SET needs a value".into()), }; Ok(Request::Set { key: key.to_string(), @@ -204,7 +203,7 @@ impl Request { }) } Some(cmd) => Err(format!("unknown command: {}", cmd)), - None => Err(format!("empty input")), + None => Err("empty input".into()), } } } diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index f8731b9f..5ddf0d48 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -27,7 +27,9 @@ use tokio_util::codec::{Decoder, Encoder, Framed}; async fn main() -> Result<(), Box<dyn Error>> { // Parse the arguments, bind the TCP socket we'll be listening to, spin up // our worker threads, and start shipping sockets to those worker threads. - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); let mut server = TcpListener::bind(&addr).await?; let mut incoming = server.incoming(); println!("Listening on: {}", addr); diff --git a/examples/udp-client.rs b/examples/udp-client.rs index 5437daf6..a191033d 100644 --- a/examples/udp-client.rs +++ b/examples/udp-client.rs @@ -44,7 +44,7 @@ fn get_stdin_data() -> Result<Vec<u8>, Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn Error>> { let remote_addr: SocketAddr = env::args() .nth(1) - .unwrap_or("127.0.0.1:8080".into()) + .unwrap_or_else(|| "127.0.0.1:8080".into()) .parse()?; // We use port 0 to let the operating system allocate an available port for us. diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 0c9dbf76..6b3f84a0 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -22,7 +22,9 @@ use std::time::Duration; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { - let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string()); + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:0".to_string()); // Bind both our sockets and then figure out what ports we got. let a = UdpSocket::bind(&addr).await?; diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs index c21e31a5..4790de54 100644 --- a/tokio-test/src/task.rs +++ b/tokio-test/src/task.rs @@ -1,5 +1,7 @@ //! Futures task based helpers +#![allow(clippy::mutex_atomic)] + use futures_core::Stream; use std::future::Future; use std::mem; diff --git a/tokio-test/tests/block_on.rs b/tokio-test/tests/block_on.rs index 7aec82cc..d640a13c 100644 --- a/tokio-test/tests/block_on.rs +++ b/tokio-test/tests/block_on.rs @@ -20,10 +20,8 @@ fn async_fn() { #[test] fn test_delay() { let deadline = Instant::now() + Duration::from_millis(100); - assert_eq!( - (), - block_on(async { - delay_until(deadline).await; - }) - ); + + block_on(async { + delay_until(deadline).await; + }); } diff --git a/tokio-util/tests/framed_write.rs b/tokio-util/tests/framed_write.rs index 706e6792..b2970c39 100644 --- a/tokio-util/tests/framed_write.rs +++ b/tokio-util/tests/framed_write.rs @@ -82,7 +82,7 @@ fn write_hits_backpressure() { // Append to the end match mock.calls.back_mut().unwrap() { - &mut Ok(ref mut data) => { + Ok(ref mut data) => { // Write in 2kb chunks if data.len() < ITER { data.extend_from_slice(&b[..]); diff --git a/tokio/benches/latency.rs b/tokio/benches/latency.rs deleted file mode 100644 index b44335fd..00000000 --- a/tokio/benches/latency.rs +++ /dev/null @@ -1,114 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -use std::io; -use std::net::SocketAddr; -use std::thread; - -use futures::sync::mpsc; -use futures::sync::oneshot; -use futures::try_ready; -use futures::{Future, Poll, Sink, Stream}; -use test::Bencher; -use tokio::net::UdpSocket; - -/// UDP echo server -struct EchoServer { - socket: UdpSocket, - buf: Vec<u8>, - to_send: Option<(usize, SocketAddr)>, -} - -impl EchoServer { - fn new(s: UdpSocket) -> Self { - EchoServer { - socket: s, - to_send: None, - buf: vec![0u8; 1600], - } - } -} - -impl Future for EchoServer { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(), io::Error> { - loop { - if let Some(&(size, peer)) = self.to_send.as_ref() { - try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer)); - self.to_send = None; - } - self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf))); - } - } -} - -#[bench] -fn udp_echo_latency(b: &mut Bencher) { - let any_addr = "127.0.0.1:0".to_string(); - let any_addr = any_addr.parse::<SocketAddr>().unwrap(); - - let (stop_c, stop_p) = oneshot::channel::<()>(); - let (tx, rx) = oneshot::channel(); - - let child = thread::spawn(move || { - let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap(); - tx.send(socket.local_addr().unwrap()).unwrap(); - - let server = EchoServer::new(socket); - let server = server.select(stop_p.map_err(|_| panic!())); - let server = server.map_err(|_| ()); - server.wait().unwrap(); - }); - - let client = std::net::UdpSocket::bind(&any_addr).unwrap(); - - let server_addr = rx.wait().unwrap(); - let mut buf = [0u8; 1000]; - - // warmup phase; for some reason initial couple of - // runs are much slower - // - // TODO: Describe the exact reasons; caching? branch predictor? lazy closures? - for _ in 0..8 { - client.send_to(&buf, &server_addr).unwrap(); - let _ = client.recv_from(&mut buf).unwrap(); - } - - b.iter(|| { - client.send_to(&buf, &server_addr).unwrap(); - let _ = client.recv_from(&mut buf).unwrap(); - }); - - stop_c.send(()).unwrap(); - child.join().unwrap(); -} - -#[bench] -fn futures_channel_latency(b: &mut Bencher) { - let (mut in_tx, in_rx) = mpsc::channel(32); - let (out_tx, out_rx) = mpsc::channel::<_>(32); - - let child = thread::spawn(|| out_tx.send_all(in_rx.then(|r| r.unwrap())).wait()); - let mut rx_iter = out_rx.wait(); - - // warmup phase; for some reason initial couple of runs are much slower - // - // TODO: Describe the exact reasons; caching? branch predictor? lazy closures? - for _ in 0..8 { - in_tx.start_send(Ok(1usize)).unwrap(); - let _ = rx_iter.next(); - } - - b.iter(|| { - in_tx.start_send(Ok(1usize)).unwrap(); - let _ = rx_iter.next(); - }); - - drop(in_tx); - child.join().unwrap().unwrap(); -} diff --git a/tokio/benches/mio-ops.rs b/tokio/benches/mio-ops.rs deleted file mode 100644 index 8aedbfd8..00000000 --- a/tokio/benches/mio-ops.rs +++ /dev/null @@ -1,57 +0,0 @@ -// Measure cost of different operations -// to get a sense of performance tradeoffs -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -use test::Bencher; - -use mio::tcp::TcpListener; -use mio::{PollOpt, Ready, Token}; - -#[bench] -fn mio_register_deregister(b: &mut Bencher) { - let addr = "127.0.0.1:0".parse().unwrap(); - // Setup the server socket - let sock = TcpListener::bind(&addr).unwrap(); - let poll = mio::Poll::new().unwrap(); - - const CLIENT: Token = Token(1); - - b.iter(|| { - poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()) - .unwrap(); - poll.deregister(&sock).unwrap(); - }); -} - -#[bench] -fn mio_reregister(b: &mut Bencher) { - let addr = "127.0.0.1:0".parse().unwrap(); - // Setup the server socket - let sock = TcpListener::bind(&addr).unwrap(); - let poll = mio::Poll::new().unwrap(); - - const CLIENT: Token = Token(1); - poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()) - .unwrap(); - - b.iter(|| { - poll.reregister(&sock, CLIENT, Ready::readable(), PollOpt::edge()) - .unwrap(); - }); - poll.deregister(&sock).unwrap(); -} - -#[bench] -fn mio_poll(b: &mut Bencher) { - let poll = mio::Poll::new().unwrap(); - let timeout = std::time::Duration::new(0, 0); - let mut events = mio::Events::with_capacity(1024); - - b.iter(|| { - poll.poll(&mut events, Some(timeout)).unwrap(); - }); -} diff --git a/tokio/benches/mpsc.rs b/tokio/benches/mpsc.rs deleted file mode 100644 index 0b97d55d..00000000 --- a/tokio/benches/mpsc.rs +++ /dev/null @@ -1,270 +0,0 @@ -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -use tokio::sync::mpsc::*; - -use futures::{future, Async, Future, Sink, Stream}; -use std::thread; -use test::Bencher; - -type Medium = [usize; 64]; -type Large = [Medium; 64]; - -#[bench] -fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<Medium>(1_000)); - }) -} - -#[bench] -fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::<Medium>()); - }) -} -#[bench] -fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<Large>(1_000)); - }) -} - -#[bench] -fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::<Large>()); - }) -} - -#[bench] -fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap()); - }) -} - -#[bench] -fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<Large>(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) -} - -#[bench] -fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::<i32>(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) -} - -#[bench] -fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(1); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) -} - -#[bench] -fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(1); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) -} - -#[bench] -fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) -} - -#[bench] -fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) -} - -#[bench] -fn bounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) -} - -#[bench] -fn bounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<Large>(1_000); - - for i in 0..1000 { - let _ = tx.try_send([[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) -} - -#[bench] -fn bounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) -} - -#[bench] -fn contended_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - // TODO make unbounded - let (tx, rx) = channel::<i32>(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } -} - -#[bench] -fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } -} diff --git a/tokio/benches/oneshot.rs b/tokio/benches/oneshot.rs deleted file mode 100644 index a7f43c2f..00000000 --- a/tokio/benches/oneshot.rs +++ /dev/null @@ -1,120 +0,0 @@ -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -use tokio::sync::oneshot; - -use futures::{future, Async, Future}; -use test::Bencher; - -#[bench] -fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::<i32>()); - }) -} - -#[bench] -fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); -} - -#[bench] -fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); -} - -#[bench] -fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); -} diff --git a/tokio/benches/tcp.rs b/tokio/benches/tcp.rs deleted file mode 100644 index f9a4a03b..00000000 --- a/tokio/benches/tcp.rs +++ /dev/null @@ -1,257 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -pub extern crate test; - -mod prelude { - pub use futures::*; - pub use tokio::net::{TcpListener, TcpStream}; - pub use tokio::reactor::Reactor; - pub use tokio_io::io::read_to_end; - - pub use std::io::{self, Read, Write}; - pub use std::thread; - pub use std::time::Duration; - pub use test::{self, Bencher}; -} - -mod connect_churn { - use crate::prelude::*; - - const NUM: usize = 300; - const CONCURRENT: usize = 8; - - #[bench] - fn one_thread(b: &mut Bencher) { - let addr = "127.0.0.1:0".parse().unwrap(); - - b.iter(move || { - let listener = TcpListener::bind(&addr).unwrap(); - let addr = listener.local_addr().unwrap(); - - // Spawn a single future that accepts & drops connections - let serve_incomings = listener - .incoming() - .map_err(|e| panic!("server err: {:?}", e)) - .for_each(|_| Ok(())); - - let connects = stream::iter_result((0..NUM).map(|_| { - Ok(TcpStream::connect(&addr).and_then(|sock| { - sock.set_linger(Some(Duration::from_secs(0))).unwrap(); - read_to_end(sock, vec![]) - |