From e01391351bcb0715f737cefe94e1bc99f19af226 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Tue, 8 Dec 2020 06:12:22 +0100 Subject: Add stress test (#3222) Created a simple echo TCP server that on two different runtimes that is called from a GitHub action using Valgrind to ensure that there are no memory leaks. Fixes: #3022 --- stress-test/Cargo.toml | 14 ++++++++ stress-test/examples/simple_echo_tcp.rs | 58 +++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 stress-test/Cargo.toml create mode 100644 stress-test/examples/simple_echo_tcp.rs (limited to 'stress-test') diff --git a/stress-test/Cargo.toml b/stress-test/Cargo.toml new file mode 100644 index 00000000..5f7910a8 --- /dev/null +++ b/stress-test/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "stress-test" +version = "0.1.0" +authors = ["Tokio Contributors "] +edition = "2018" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = {path = "../tokio/", features = ["full"]} + +[dev-dependencies] +rand = "0.7.3" diff --git a/stress-test/examples/simple_echo_tcp.rs b/stress-test/examples/simple_echo_tcp.rs new file mode 100644 index 00000000..099523fd --- /dev/null +++ b/stress-test/examples/simple_echo_tcp.rs @@ -0,0 +1,58 @@ +//! Simple TCP echo server to check memory leaks using Valgrind. +use std::{thread::sleep, time::Duration}; + +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpSocket}, + runtime::Builder, + sync::oneshot, +}; + +const TCP_ENDPOINT: &str = "127.0.0.1:8080"; +const NUM_MSGS: usize = 10_000; +const MSG_SIZE: usize = 1024; + +fn main() { + let rt = Builder::new_multi_thread().enable_io().build().unwrap(); + let rt2 = Builder::new_multi_thread().enable_io().build().unwrap(); + + rt.spawn(async { + let listener = TcpListener::bind(TCP_ENDPOINT).await.unwrap(); + let (mut socket, _) = listener.accept().await.unwrap(); + let (mut rd, mut wr) = socket.split(); + while tokio::io::copy(&mut rd, &mut wr).await.is_ok() {} + }); + + // wait a bit so that the listener binds. + sleep(Duration::from_millis(100)); + + // create a channel to let the main thread know that all the messages were sent and received. + let (tx, mut rx) = oneshot::channel(); + + rt2.spawn(async { + let addr = TCP_ENDPOINT.parse().unwrap(); + let socket = TcpSocket::new_v4().unwrap(); + let mut stream = socket.connect(addr).await.unwrap(); + + let mut buff = [0; MSG_SIZE]; + for _ in 0..NUM_MSGS { + let one_mega_random_bytes: Vec = + (0..MSG_SIZE).map(|_| rand::random::()).collect(); + stream + .write_all(one_mega_random_bytes.as_slice()) + .await + .unwrap(); + stream.read(&mut buff).await.unwrap(); + } + tx.send(()).unwrap(); + }); + + loop { + // check that we're done. + match rx.try_recv() { + Err(oneshot::error::TryRecvError::Empty) => (), + Err(oneshot::error::TryRecvError::Closed) => panic!("channel got closed..."), + Ok(()) => break, + } + } +} -- cgit v1.2.3