diff options
author | Blas Rodriguez Irizar <rodrigblas@gmail.com> | 2020-12-08 06:12:22 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-07 21:12:22 -0800 |
commit | e01391351bcb0715f737cefe94e1bc99f19af226 (patch) | |
tree | 5575f27e36e49b887062119225e1d61335a01b9a /stress-test/examples | |
parent | 57dffb9dfe9e4c0f12429246540add3975f4a754 (diff) |
Add stress test (#3222)
Created a simple echo TCP server that on two different runtimes that is
called from a GitHub action using Valgrind to ensure that there are
no memory leaks.
Fixes: #3022
Diffstat (limited to 'stress-test/examples')
-rw-r--r-- | stress-test/examples/simple_echo_tcp.rs | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/stress-test/examples/simple_echo_tcp.rs b/stress-test/examples/simple_echo_tcp.rs new file mode 100644 index 00000000..099523fd --- /dev/null +++ b/stress-test/examples/simple_echo_tcp.rs @@ -0,0 +1,58 @@ +//! Simple TCP echo server to check memory leaks using Valgrind. +use std::{thread::sleep, time::Duration}; + +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpSocket}, + runtime::Builder, + sync::oneshot, +}; + +const TCP_ENDPOINT: &str = "127.0.0.1:8080"; +const NUM_MSGS: usize = 10_000; +const MSG_SIZE: usize = 1024; + +fn main() { + let rt = Builder::new_multi_thread().enable_io().build().unwrap(); + let rt2 = Builder::new_multi_thread().enable_io().build().unwrap(); + + rt.spawn(async { + let listener = TcpListener::bind(TCP_ENDPOINT).await.unwrap(); + let (mut socket, _) = listener.accept().await.unwrap(); + let (mut rd, mut wr) = socket.split(); + while tokio::io::copy(&mut rd, &mut wr).await.is_ok() {} + }); + + // wait a bit so that the listener binds. + sleep(Duration::from_millis(100)); + + // create a channel to let the main thread know that all the messages were sent and received. + let (tx, mut rx) = oneshot::channel(); + + rt2.spawn(async { + let addr = TCP_ENDPOINT.parse().unwrap(); + let socket = TcpSocket::new_v4().unwrap(); + let mut stream = socket.connect(addr).await.unwrap(); + + let mut buff = [0; MSG_SIZE]; + for _ in 0..NUM_MSGS { + let one_mega_random_bytes: Vec<u8> = + (0..MSG_SIZE).map(|_| rand::random::<u8>()).collect(); + stream + .write_all(one_mega_random_bytes.as_slice()) + .await + .unwrap(); + stream.read(&mut buff).await.unwrap(); + } + tx.send(()).unwrap(); + }); + + loop { + // check that we're done. + match rx.try_recv() { + Err(oneshot::error::TryRecvError::Empty) => (), + Err(oneshot::error::TryRecvError::Closed) => panic!("channel got closed..."), + Ok(()) => break, + } + } +} |