summaryrefslogtreecommitdiffstats
path: root/tokio/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-12 20:07:43 -0800
committerGitHub <noreply@github.com>2020-11-12 20:07:43 -0800
commit02b1117dca1c1e1fcc700bff4d6a93c33bfbc7d8 (patch)
tree95788ea2d89de4af6021befa70a3f5d80034578a /tokio/tests
parent685da8dadd8821d1053ce7ecaf01ab5ee231bef9 (diff)
net: add TcpStream::ready and non-blocking ops (#3130)
Adds function to await for readiness on the TcpStream and non-blocking read/write functions. `async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified interest. There are also two shorthand functions, `readable()` and `writable()`. Once the stream is in a ready state, the caller may perform non-blocking operations on it using `try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready. The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in `AsyncFd` protect against a potential race between receiving the readiness notification and clearing it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()` and `try_write()` function handle clearing stream readiness as needed. This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These types will also be useful for fixing #3072 . Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this is left for later PRs. Refs: #3130
Diffstat (limited to 'tokio/tests')
-rw-r--r--tokio/tests/tcp_stream.rs112
1 files changed, 112 insertions, 0 deletions
diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs
new file mode 100644
index 00000000..784ade8a
--- /dev/null
+++ b/tokio/tests/tcp_stream.rs
@@ -0,0 +1,112 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::io::Interest;
+use tokio::net::{TcpListener, TcpStream};
+use tokio_test::task;
+use tokio_test::{assert_pending, assert_ready_ok};
+
+use std::io;
+
+#[tokio::test]
+async fn try_read_write() {
+ const DATA: &[u8] = b"this is some data to write to the socket";
+
+ // Create listener
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = TcpStream::connect(listener.local_addr().unwrap())
+ .await
+ .unwrap();
+ let (server, _) = listener.accept().await.unwrap();
+ let mut written = DATA.to_vec();
+
+ // Track the server receiving data
+ let mut readable = task::spawn(server.readable());
+ assert_pending!(readable.poll());
+
+ // Write data.
+ client.writable().await.unwrap();
+ assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
+
+ // The task should be notified
+ while !readable.is_woken() {
+ tokio::task::yield_now().await;
+ }
+
+ // Fill the write buffer
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write(DATA) {
+ Ok(n) => written.extend(&DATA[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end
+ let mut read = vec![0; written.len()];
+ let mut i = 0;
+
+ while i < read.len() {
+ server.readable().await.unwrap();
+
+ match server.try_read(&mut read[i..]) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ // Now, we listen for shutdown
+ drop(client);
+
+ loop {
+ let ready = server.ready(Interest::READABLE).await.unwrap();
+
+ if ready.is_read_closed() {
+ return;
+ } else {
+ tokio::task::yield_now().await;
+ }
+ }
+}
+
+#[test]
+fn buffer_not_included_in_future() {
+ use std::mem;
+
+ const N: usize = 4096;
+
+ let fut = async {
+ let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
+
+ loop {
+ stream.readable().await.unwrap();
+
+ let mut buf = [0; N];
+ let n = stream.try_read(&mut buf[..]).unwrap();
+
+ if n == 0 {
+ break;
+ }
+ }
+ };
+
+ let n = mem::size_of_val(&fut);
+ assert!(n < 1000);
+}