summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/chat.rs4
-rw-r--r--examples/connect.rs9
-rw-r--r--examples/echo-udp.rs4
-rw-r--r--examples/echo.rs4
-rw-r--r--examples/print_each_packet.rs4
-rw-r--r--examples/proxy.rs8
-rw-r--r--examples/tinydb.rs19
-rw-r--r--examples/tinyhttp.rs4
-rw-r--r--examples/udp-client.rs2
-rw-r--r--examples/udp-codec.rs4
-rw-r--r--tokio-test/src/task.rs2
-rw-r--r--tokio-test/tests/block_on.rs10
-rw-r--r--tokio-util/tests/framed_write.rs2
-rw-r--r--tokio/benches/latency.rs114
-rw-r--r--tokio/benches/mio-ops.rs57
-rw-r--r--tokio/benches/mpsc.rs270
-rw-r--r--tokio/benches/oneshot.rs120
-rw-r--r--tokio/benches/tcp.rs257
-rw-r--r--tokio/benches/thread_pool.rs153
-rw-r--r--tokio/src/lib.rs1
-rw-r--r--tokio/src/macros/assert.rs5
-rw-r--r--tokio/src/runtime/builder.rs2
-rw-r--r--tokio/src/runtime/shell.rs2
-rw-r--r--tokio/src/runtime/thread_pool/tests/queue.rs14
-rw-r--r--tokio/src/signal/registry.rs2
-rw-r--r--tokio/src/time/tests/test_queue.rs44
-rw-r--r--tokio/tests/io_async_read.rs1
-rw-r--r--tokio/tests/process_issue_42.rs4
-rw-r--r--tokio/tests/rt_common.rs1
-rw-r--r--tokio/tests/support/mock_file.rs2
-rw-r--r--tokio/tests/sync_barrier.rs1
-rw-r--r--tokio/tests/sync_mpsc.rs1
-rw-r--r--tokio/tests/sync_watch.rs1
-rw-r--r--tokio/tests/tcp_peek.rs2
34 files changed, 89 insertions, 1041 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index 2553cc5e..91589072 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -49,7 +49,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
// client connection.
let state = Arc::new(Mutex::new(Shared::new()));
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:6142".to_string());
+ let addr = env::args()
+ .nth(1)
+ .unwrap_or_else(|| "127.0.0.1:6142".to_string());
// Bind a TCP listener to the socket address.
//
diff --git a/examples/connect.rs b/examples/connect.rs
index cdd18e19..d51af88c 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -36,10 +36,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
};
// Parse what address we're going to connect to
- let addr = match args.first() {
- Some(addr) => addr,
- None => Err("this program requires at least one argument")?,
- };
+ let addr = args
+ .first()
+ .ok_or("this program requires at least one argument")?;
let addr = addr.parse::<SocketAddr>()?;
let stdin = FramedRead::new(io::stdin(), codec::Bytes);
@@ -163,7 +162,7 @@ mod codec {
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Vec<u8>>> {
- if buf.len() > 0 {
+ if !buf.is_empty() {
let len = buf.len();
Ok(Some(buf.split_to(len).into_iter().collect()))
} else {
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index f1e8134d..d8b2af9c 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -51,7 +51,9 @@ impl Server {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = env::args()
+ .nth(1)
+ .unwrap_or_else(|| "127.0.0.1:8080".to_string());
let socket = UdpSocket::bind(&addr).await?;
println!("Listening on: {}", socket.local_addr()?);
diff --git a/examples/echo.rs b/examples/echo.rs
index 455aebde..35b12279 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -33,7 +33,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = env::args()
+ .nth(1)
+ .unwrap_or_else(|| "127.0.0.1:8080".to_string());
// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs
index f056db4a..4604139b 100644
--- a/examples/print_each_packet.rs
+++ b/examples/print_each_packet.rs
@@ -65,7 +65,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = env::args()
+ .nth(1)
+ .unwrap_or_else(|| "127.0.0.1:8080".to_string());
// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
diff --git a/examples/proxy.rs b/examples/proxy.rs
index 48f8f057..f7a9111f 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -32,8 +32,12 @@ use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
- let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
- let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
+ let listen_addr = env::args()
+ .nth(1)
+ .unwrap_or_else(|| "127.0.0.1:8081".to_string());
+ let server_addr = env::args()
+ .nth(2)
+ .unwrap_or_else(|| "127.0.0.1:8080".to_string());
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index 3fc88f6b..cf867a0a 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -84,7 +84,9 @@ enum Response {
async fn main() -> Result<(), Box<dyn Error>> {
// Parse the address we're going to run this server on
// and set up our TCP listener to accept connections.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = env::args()
+ .nth(1)
+ .unwrap_or_else(|| "127.0.0.1:8080".to_string());
let mut listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);
@@ -175,15 +177,12 @@ fn handle_request(line: &str, db: &Arc<Database>) -> Response {
impl Request {
fn parse(input: &str) -> Result<Request, String> {
- let mut parts = input.splitn(3, " ");
+ let mut parts = input.splitn(3, ' ');
match parts.next() {
Some("GET") => {
- let key = match parts.next() {
- Some(key) => key,
- None => return Err(format!("GET must be followed by a key")),
- };
+ let key = parts.next().ok_or("GET must be followed by a key")?;
if parts.next().is_some() {
- return Err(format!("GET's key must not be followed by anything"));
+ return Err("GET's key must not be followed by anything".into());
}
Ok(Request::Get {
key: key.to_string(),
@@ -192,11 +191,11 @@ impl Request {
Some("SET") => {
let key = match parts.next() {
Some(key) => key,
- None => return Err(format!("SET must be followed by a key")),
+ None => return Err("SET must be followed by a key".into()),
};
let value = match parts.next() {
Some(value) => value,
- None => return Err(format!("SET needs a value")),
+ None => return Err("SET needs a value".into()),
};
Ok(Request::Set {
key: key.to_string(),
@@ -204,7 +203,7 @@ impl Request {
})
}
Some(cmd) => Err(format!("unknown command: {}", cmd)),
- None => Err(format!("empty input")),
+ None => Err("empty input".into()),
}
}
}
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index f8731b9f..5ddf0d48 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -27,7 +27,9 @@ use tokio_util::codec::{Decoder, Encoder, Framed};
async fn main() -> Result<(), Box<dyn Error>> {
// Parse the arguments, bind the TCP socket we'll be listening to, spin up
// our worker threads, and start shipping sockets to those worker threads.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+ let addr = env::args()
+ .nth(1)
+ .unwrap_or_else(|| "127.0.0.1:8080".to_string());
let mut server = TcpListener::bind(&addr).await?;
let mut incoming = server.incoming();
println!("Listening on: {}", addr);
diff --git a/examples/udp-client.rs b/examples/udp-client.rs
index 5437daf6..a191033d 100644
--- a/examples/udp-client.rs
+++ b/examples/udp-client.rs
@@ -44,7 +44,7 @@ fn get_stdin_data() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn Error>> {
let remote_addr: SocketAddr = env::args()
.nth(1)
- .unwrap_or("127.0.0.1:8080".into())
+ .unwrap_or_else(|| "127.0.0.1:8080".into())
.parse()?;
// We use port 0 to let the operating system allocate an available port for us.
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 0c9dbf76..6b3f84a0 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -22,7 +22,9 @@ use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string());
+ let addr = env::args()
+ .nth(1)
+ .unwrap_or_else(|| "127.0.0.1:0".to_string());
// Bind both our sockets and then figure out what ports we got.
let a = UdpSocket::bind(&addr).await?;
diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs
index c21e31a5..4790de54 100644
--- a/tokio-test/src/task.rs
+++ b/tokio-test/src/task.rs
@@ -1,5 +1,7 @@
//! Futures task based helpers
+#![allow(clippy::mutex_atomic)]
+
use futures_core::Stream;
use std::future::Future;
use std::mem;
diff --git a/tokio-test/tests/block_on.rs b/tokio-test/tests/block_on.rs
index 7aec82cc..d640a13c 100644
--- a/tokio-test/tests/block_on.rs
+++ b/tokio-test/tests/block_on.rs
@@ -20,10 +20,8 @@ fn async_fn() {
#[test]
fn test_delay() {
let deadline = Instant::now() + Duration::from_millis(100);
- assert_eq!(
- (),
- block_on(async {
- delay_until(deadline).await;
- })
- );
+
+ block_on(async {
+ delay_until(deadline).await;
+ });
}
diff --git a/tokio-util/tests/framed_write.rs b/tokio-util/tests/framed_write.rs
index 706e6792..b2970c39 100644
--- a/tokio-util/tests/framed_write.rs
+++ b/tokio-util/tests/framed_write.rs
@@ -82,7 +82,7 @@ fn write_hits_backpressure() {
// Append to the end
match mock.calls.back_mut().unwrap() {
- &mut Ok(ref mut data) => {
+ Ok(ref mut data) => {
// Write in 2kb chunks
if data.len() < ITER {
data.extend_from_slice(&b[..]);
diff --git a/tokio/benches/latency.rs b/tokio/benches/latency.rs
deleted file mode 100644
index b44335fd..00000000
--- a/tokio/benches/latency.rs
+++ /dev/null
@@ -1,114 +0,0 @@
-#![cfg(feature = "broken")]
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-use std::io;
-use std::net::SocketAddr;
-use std::thread;
-
-use futures::sync::mpsc;
-use futures::sync::oneshot;
-use futures::try_ready;
-use futures::{Future, Poll, Sink, Stream};
-use test::Bencher;
-use tokio::net::UdpSocket;
-
-/// UDP echo server
-struct EchoServer {
- socket: UdpSocket,
- buf: Vec<u8>,
- to_send: Option<(usize, SocketAddr)>,
-}
-
-impl EchoServer {
- fn new(s: UdpSocket) -> Self {
- EchoServer {
- socket: s,
- to_send: None,
- buf: vec![0u8; 1600],
- }
- }
-}
-
-impl Future for EchoServer {
- type Item = ();
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<(), io::Error> {
- loop {
- if let Some(&(size, peer)) = self.to_send.as_ref() {
- try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
- self.to_send = None;
- }
- self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
- }
- }
-}
-
-#[bench]
-fn udp_echo_latency(b: &mut Bencher) {
- let any_addr = "127.0.0.1:0".to_string();
- let any_addr = any_addr.parse::<SocketAddr>().unwrap();
-
- let (stop_c, stop_p) = oneshot::channel::<()>();
- let (tx, rx) = oneshot::channel();
-
- let child = thread::spawn(move || {
- let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap();
- tx.send(socket.local_addr().unwrap()).unwrap();
-
- let server = EchoServer::new(socket);
- let server = server.select(stop_p.map_err(|_| panic!()));
- let server = server.map_err(|_| ());
- server.wait().unwrap();
- });
-
- let client = std::net::UdpSocket::bind(&any_addr).unwrap();
-
- let server_addr = rx.wait().unwrap();
- let mut buf = [0u8; 1000];
-
- // warmup phase; for some reason initial couple of
- // runs are much slower
- //
- // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
- for _ in 0..8 {
- client.send_to(&buf, &server_addr).unwrap();
- let _ = client.recv_from(&mut buf).unwrap();
- }
-
- b.iter(|| {
- client.send_to(&buf, &server_addr).unwrap();
- let _ = client.recv_from(&mut buf).unwrap();
- });
-
- stop_c.send(()).unwrap();
- child.join().unwrap();
-}
-
-#[bench]
-fn futures_channel_latency(b: &mut Bencher) {
- let (mut in_tx, in_rx) = mpsc::channel(32);
- let (out_tx, out_rx) = mpsc::channel::<_>(32);
-
- let child = thread::spawn(|| out_tx.send_all(in_rx.then(|r| r.unwrap())).wait());
- let mut rx_iter = out_rx.wait();
-
- // warmup phase; for some reason initial couple of runs are much slower
- //
- // TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
- for _ in 0..8 {
- in_tx.start_send(Ok(1usize)).unwrap();
- let _ = rx_iter.next();
- }
-
- b.iter(|| {
- in_tx.start_send(Ok(1usize)).unwrap();
- let _ = rx_iter.next();
- });
-
- drop(in_tx);
- child.join().unwrap().unwrap();
-}
diff --git a/tokio/benches/mio-ops.rs b/tokio/benches/mio-ops.rs
deleted file mode 100644
index 8aedbfd8..00000000
--- a/tokio/benches/mio-ops.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-// Measure cost of different operations
-// to get a sense of performance tradeoffs
-#![cfg(feature = "broken")]
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-use test::Bencher;
-
-use mio::tcp::TcpListener;
-use mio::{PollOpt, Ready, Token};
-
-#[bench]
-fn mio_register_deregister(b: &mut Bencher) {
- let addr = "127.0.0.1:0".parse().unwrap();
- // Setup the server socket
- let sock = TcpListener::bind(&addr).unwrap();
- let poll = mio::Poll::new().unwrap();
-
- const CLIENT: Token = Token(1);
-
- b.iter(|| {
- poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge())
- .unwrap();
- poll.deregister(&sock).unwrap();
- });
-}
-
-#[bench]
-fn mio_reregister(b: &mut Bencher) {
- let addr = "127.0.0.1:0".parse().unwrap();
- // Setup the server socket
- let sock = TcpListener::bind(&addr).unwrap();
- let poll = mio::Poll::new().unwrap();
-
- const CLIENT: Token = Token(1);
- poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge())
- .unwrap();
-
- b.iter(|| {
- poll.reregister(&sock, CLIENT, Ready::readable(), PollOpt::edge())
- .unwrap();
- });
- poll.deregister(&sock).unwrap();
-}
-
-#[bench]
-fn mio_poll(b: &mut Bencher) {
- let poll = mio::Poll::new().unwrap();
- let timeout = std::time::Duration::new(0, 0);
- let mut events = mio::Events::with_capacity(1024);
-
- b.iter(|| {
- poll.poll(&mut events, Some(timeout)).unwrap();
- });
-}
diff --git a/tokio/benches/mpsc.rs b/tokio/benches/mpsc.rs
deleted file mode 100644
index 0b97d55d..00000000
--- a/tokio/benches/mpsc.rs
+++ /dev/null
@@ -1,270 +0,0 @@
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-use tokio::sync::mpsc::*;
-
-use futures::{future, Async, Future, Sink, Stream};
-use std::thread;
-use test::Bencher;
-
-type Medium = [usize; 64];
-type Large = [Medium; 64];
-
-#[bench]
-fn bounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<Medium>(1_000));
- })
-}
-
-#[bench]
-fn unbounded_new_medium(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded_channel::<Medium>());
- })
-}
-#[bench]
-fn bounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&channel::<Large>(1_000));
- })
-}
-
-#[bench]
-fn unbounded_new_large(b: &mut Bencher) {
- b.iter(|| {
- let _ = test::black_box(&unbounded_channel::<Large>());
- })
-}
-
-#[bench]
-fn send_one_message(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- // Send
- tx.try_send(1).unwrap();
-
- // Receive
- assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap());
- })
-}
-
-#[bench]
-fn send_one_message_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<Large>(1_000);
-
- // Send
- let _ = tx.try_send([[0; 64]; 64]);
-
- // Receive
- let _ = test::black_box(&rx.poll());
- })
-}
-
-#[bench]
-fn bounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = channel::<i32>(1_000);
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
-}
-
-#[bench]
-fn bounded_tx_poll_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(1);
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
-}
-
-#[bench]
-fn bounded_tx_poll_not_ready(b: &mut Bencher) {
- let (mut tx, _rx) = channel::<i32>(1);
- tx.try_send(1).unwrap();
- b.iter(|| {
- future::lazy(|| {
- assert!(tx.poll_ready().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
-}
-
-#[bench]
-fn unbounded_rx_not_ready(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded_channel::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
-}
-
-#[bench]
-fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
- let (_tx, mut rx) = unbounded_channel::<i32>();
- b.iter(|| {
- future::lazy(|| {
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
- assert!(rx.poll().unwrap().is_not_ready());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- })
-}
-
-#[bench]
-fn bounded_uncontended_1(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1_000);
-
- for i in 0..1000 {
- tx.try_send(i).unwrap();
- // No need to create a task, because poll is not going to park.
- assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
- }
- })
-}
-
-#[bench]
-fn bounded_uncontended_1_large(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel::<Large>(1_000);
-
- for i in 0..1000 {
- let _ = tx.try_send([[i; 64]; 64]);
- // No need to create a task, because poll is not going to park.
- let _ = test::black_box(&rx.poll());
- }
- })
-}
-
-#[bench]
-fn bounded_uncontended_2(b: &mut Bencher) {
- b.iter(|| {
- let (mut tx, mut rx) = channel(1000);
-
- for i in 0..1000 {
- tx.try_send(i).unwrap();
- }
-
- for i in 0..1000 {
- // No need to create a task, because poll is not going to park.
- assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
- }
- })
-}
-
-#[bench]
-fn contended_unbounded_tx(b: &mut Bencher) {
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..4 {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for mut tx in rx.iter() {
- for i in 0..1_000 {
- tx.try_send(i).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- // TODO make unbounded
- let (tx, rx) = channel::<i32>(1_000_000);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(4 * 1_000);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
-}
-
-#[bench]
-fn contended_bounded_tx(b: &mut Bencher) {
- const THREADS: usize = 4;
- const ITERS: usize = 100;
-
- let mut threads = vec![];
- let mut txs = vec![];
-
- for _ in 0..THREADS {
- let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
- txs.push(tx);
-
- threads.push(thread::spawn(move || {
- for tx in rx.iter() {
- let mut tx = tx.wait();
- for i in 0..ITERS {
- tx.send(i as i32).unwrap();
- }
- }
- }));
- }
-
- b.iter(|| {
- let (tx, rx) = channel::<i32>(1);
-
- for th in &txs {
- th.send(tx.clone()).unwrap();
- }
-
- drop(tx);
-
- let rx = rx.wait().take(THREADS * ITERS);
-
- for v in rx {
- let _ = test::black_box(v);
- }
- });
-
- drop(txs);
-
- for th in threads {
- th.join().unwrap();
- }
-}
diff --git a/tokio/benches/oneshot.rs b/tokio/benches/oneshot.rs
deleted file mode 100644
index a7f43c2f..00000000
--- a/tokio/benches/oneshot.rs
+++ /dev/null
@@ -1,120 +0,0 @@
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-extern crate test;
-
-use tokio::sync::oneshot;
-
-use futures::{future, Async, Future};
-use test::Bencher;
-
-#[bench]
-fn new(b: &mut Bencher) {
- b.iter(|| {
- let _ = ::test::black_box(&oneshot::channel::<i32>());
- })
-}
-
-#[bench]
-fn same_thread_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- let _ = tx.send(1);
-
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
- });
-}
-
-#[bench]
-fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
- b.iter(|| {
- let (tx, mut rx) = oneshot::channel();
-
- future::lazy(|| {
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
- let _ = rx.poll();
-
- let _ = tx.send(1);
- assert_eq!(Async::Ready(1), rx.poll().unwrap());
-
- Ok::<_, ()>(())
- })
- .wait()
- .unwrap();
- });
-}
-
-#[bench]
-fn multi_thread_send_recv(b: &mut Bencher) {
- const MAX: usize = 10_000_000;
-
- use std::thread;
-
- fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
- use futures::Async::Ready;
- loop {
- match f.poll() {
- Ok(Ready(v)) => return Ok(v),
- Ok(_) => {}
- Err(e) => return Err(e),
- }
- }
- }
-
- let mut ping_txs = vec![];
- let mut ping_rxs = vec![];
- let mut pong_txs = vec![];
- let mut pong_rxs = vec![];
-
- for _ in 0..MAX {
- let (tx, rx) = oneshot::channel::<()>();
-
- ping_txs.push(Some(tx));
- ping_rxs.push(Some(rx));
-
- let (tx, rx) = oneshot::channel::<()>();
-
- pong_txs.push(Some(tx));
- pong_rxs.push(Some(rx));
- }
-
- thread::spawn(move || {
- future::lazy(|| {
- for i in 0..MAX {
- let ping_rx = ping_rxs[i].take().unwrap();
- let pong_tx = pong_txs[i].take().unwrap();
-
- if spin(ping_rx).is_err() {
- return Ok(());
- }
-
- pong_tx.send(()).unwrap();
- }
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
- });
-
- future::lazy(|| {
- let mut i = 0;
-
- b.iter(|| {
- let ping_tx = ping_txs[i].take().unwrap();
- let pong_rx = pong_rxs[i].take().unwrap();
-
- ping_tx.send(()).unwrap();
- spin(pong_rx).unwrap();
-
- i += 1;
- });
-
- Ok::<(), ()>(())
- })
- .wait()
- .unwrap();
-}
diff --git a/tokio/benches/tcp.rs b/tokio/benches/tcp.rs
deleted file mode 100644
index f9a4a03b..00000000
--- a/tokio/benches/tcp.rs
+++ /dev/null
@@ -1,257 +0,0 @@
-#![cfg(feature = "broken")]
-#![feature(test)]
-#![warn(rust_2018_idioms)]
-
-pub extern crate test;
-
-mod prelude {
- pub use futures::*;
- pub use tokio::net::{TcpListener, TcpStream};
- pub use tokio::reactor::Reactor;
- pub use tokio_io::io::read_to_end;
-
- pub use std::io::{self, Read, Write};
- pub use std::thread;
- pub use std::time::Duration;
- pub use test::{self, Bencher};
-}
-
-mod connect_churn {
- use crate::prelude::*;
-
- const NUM: usize = 300;
- const CONCURRENT: usize = 8;
-
- #[bench]
- fn one_thread(b: &mut Bencher) {
- let addr = "127.0.0.1:0".parse().unwrap();
-
- b.iter(move || {
- let listener = TcpListener::bind(&addr).unwrap();
- let addr = listener.local_addr().unwrap();
-
- // Spawn a single future that accepts & drops connections
- let serve_incomings = listener
- .incoming()
- .map_err(|e| panic!("server err: {:?}", e))
- .for_each(|_| Ok(()));
-
- let connects = stream::iter_result((0..NUM).map(|_| {
- Ok(TcpStream::connect(&addr).and_then(|sock| {
- sock.set_linger(Some(Duration::from_secs(0))).unwrap();
- read_to_end(sock, vec![])
-