summaryrefslogtreecommitdiffstats
path: root/tokio/tests/io_async_fd.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/tests/io_async_fd.rs')
-rw-r--r--tokio/tests/io_async_fd.rs604
1 files changed, 604 insertions, 0 deletions
diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs
new file mode 100644
index 00000000..0303eff6
--- /dev/null
+++ b/tokio/tests/io_async_fd.rs
@@ -0,0 +1,604 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(unix, feature = "full"))]
+
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
+use std::time::Duration;
+use std::{
+ future::Future,
+ io::{self, ErrorKind, Read, Write},
+ task::{Context, Waker},
+};
+
+use nix::errno::Errno;
+use nix::unistd::{close, read, write};
+
+use futures::{poll, FutureExt};
+
+use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
+use tokio_test::{assert_err, assert_pending};
+
+struct TestWaker {
+ inner: Arc<TestWakerInner>,
+ waker: Waker,
+}
+
+#[derive(Default)]
+struct TestWakerInner {
+ awoken: AtomicBool,
+}
+
+impl futures::task::ArcWake for TestWakerInner {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.awoken.store(true, Ordering::SeqCst);
+ }
+}
+
+impl TestWaker {
+ fn new() -> Self {
+ let inner: Arc<TestWakerInner> = Default::default();
+
+ Self {
+ inner: inner.clone(),
+ waker: futures::task::waker(inner),
+ }
+ }
+
+ fn awoken(&self) -> bool {
+ self.inner.awoken.swap(false, Ordering::SeqCst)
+ }
+
+ fn context(&self) -> Context<'_> {
+ Context::from_waker(&self.waker)
+ }
+}
+
+fn is_blocking(e: &nix::Error) -> bool {
+ Some(Errno::EAGAIN) == e.as_errno()
+}
+
+#[derive(Debug)]
+struct FileDescriptor {
+ fd: RawFd,
+}
+
+impl AsRawFd for FileDescriptor {
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd
+ }
+}
+
+impl Read for &FileDescriptor {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ match read(self.fd, buf) {
+ Ok(n) => Ok(n),
+ Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
+ Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
+ }
+ }
+}
+
+impl Read for FileDescriptor {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ (self as &Self).read(buf)
+ }
+}
+
+impl Write for &FileDescriptor {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ match write(self.fd, buf) {
+ Ok(n) => Ok(n),
+ Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
+ Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl Write for FileDescriptor {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ (self as &Self).write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (self as &Self).flush()
+ }
+}
+
+impl Drop for FileDescriptor {
+ fn drop(&mut self) {
+ let _ = close(self.fd);
+ }
+}
+
+fn set_nonblocking(fd: RawFd) {
+ use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
+
+ let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
+
+ if flags < 0 {
+ panic!(
+ "bad return value from fcntl(F_GETFL): {} ({:?})",
+ flags,
+ nix::Error::last()
+ );
+ }
+
+ let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
+
+ nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
+}
+
+fn socketpair() -> (FileDescriptor, FileDescriptor) {
+ use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
+
+ let (fd_a, fd_b) = socket::socketpair(
+ AddressFamily::Unix,
+ SockType::Stream,
+ None,
+ SockFlag::empty(),
+ )
+ .expect("socketpair");
+ let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
+
+ set_nonblocking(fds.0.fd);
+ set_nonblocking(fds.1.fd);
+
+ fds
+}
+
+fn drain(mut fd: &FileDescriptor) {
+ let mut buf = [0u8; 512];
+
+ loop {
+ match fd.read(&mut buf[..]) {
+ Err(e) if e.kind() == ErrorKind::WouldBlock => break,
+ Ok(0) => panic!("unexpected EOF"),
+ Err(e) => panic!("unexpected error: {:?}", e),
+ Ok(_) => continue,
+ }
+ }
+}
+
+#[tokio::test]
+async fn initially_writable() {
+ let (a, b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+ let afd_b = AsyncFd::new(b).unwrap();
+
+ afd_a.writable().await.unwrap().clear_ready();
+ afd_b.writable().await.unwrap().clear_ready();
+
+ futures::select_biased! {
+ _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
+ _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
+ _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
+ }
+}
+
+#[tokio::test]
+async fn reset_readable() {
+ let (a, mut b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ b.write_all(b"0").unwrap();
+
+ let mut guard = readable.await.unwrap();
+
+ guard.with_io(|| afd_a.get_ref().read(&mut [0])).unwrap();
+
+ // `a` is not readable, but the reactor still thinks it is
+ // (because we have not observed a not-ready error yet)
+ afd_a.readable().await.unwrap().retain_ready();
+
+ // Explicitly clear the ready state
+ guard.clear_ready();
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ b.write_all(b"0").unwrap();
+
+ // We can observe the new readable event
+ afd_a.readable().await.unwrap().clear_ready();
+}
+
+#[tokio::test]
+async fn reset_writable() {
+ let (a, b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let mut guard = afd_a.writable().await.unwrap();
+
+ // Write until we get a WouldBlock. This also clears the ready state.
+ loop {
+ if let Err(e) = guard.with_io(|| afd_a.get_ref().write(&[0; 512][..])) {
+ assert_eq!(ErrorKind::WouldBlock, e.kind());
+ break;
+ }
+ }
+
+ // Writable state should be cleared now.
+ let writable = afd_a.writable();
+ tokio::pin!(writable);
+
+ tokio::select! {
+ _ = writable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ // Read from the other side; we should become writable now.
+ drain(&b);
+
+ let _ = writable.await.unwrap();
+}
+
+#[derive(Debug)]
+struct ArcFd<T>(Arc<T>);
+impl<T: AsRawFd> AsRawFd for ArcFd<T> {
+ fn as_raw_fd(&self) -> RawFd {
+ self.0.as_raw_fd()
+ }
+}
+
+#[tokio::test]
+async fn drop_closes() {
+ let (a, mut b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ std::mem::drop(afd_a);
+
+ assert_eq!(0, b.read(&mut [0]).unwrap());
+
+ // into_inner does not close the fd
+
+ let (a, mut b) = socketpair();
+ let afd_a = AsyncFd::new(a).unwrap();
+ let _a: FileDescriptor = afd_a.into_inner();
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ // Drop closure behavior is delegated to the inner object
+ let (a, mut b) = socketpair();
+ let arc_fd = Arc::new(a);
+ let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
+ std::mem::drop(afd_a);
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
+}
+
+#[tokio::test]
+async fn with_poll() {
+ use std::task::Poll;
+
+ let (a, mut b) = socketpair();
+
+ b.write_all(b"0").unwrap();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let mut guard = afd_a.readable().await.unwrap();
+
+ afd_a.get_ref().read_exact(&mut [0]).unwrap();
+
+ // Should not clear the readable state
+ let _ = guard.with_poll(|| Poll::Ready(()));
+
+ // Still readable...
+ let _ = afd_a.readable().await.unwrap();
+
+ // Should clear the readable state
+ let _ = guard.with_poll(|| Poll::Pending::<()>);
+
+ // Assert not readable
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ // Write something down b again and make sure we're reawoken
+ b.write_all(b"0").unwrap();
+ let _ = readable.await.unwrap();
+}
+
+#[tokio::test]
+async fn multiple_waiters() {
+ let (a, mut b) = socketpair();
+ let afd_a = Arc::new(AsyncFd::new(a).unwrap());
+
+ let barrier = Arc::new(tokio::sync::Barrier::new(11));
+
+ let mut tasks = Vec::new();
+ for _ in 0..10 {
+ let afd_a = afd_a.clone();
+ let barrier = barrier.clone();
+
+ let f = async move {
+ let notify_barrier = async {
+ barrier.wait().await;
+ futures::future::pending::<()>().await;
+ };
+
+ futures::select_biased! {
+ guard = afd_a.readable().fuse() => {
+ tokio::task::yield_now().await;
+ guard.unwrap().clear_ready()
+ },
+ _ = notify_barrier.fuse() => unreachable!(),
+ }
+
+ std::mem::drop(afd_a);
+ };
+
+ tasks.push(tokio::spawn(f));
+ }
+
+ let mut all_tasks = futures::future::try_join_all(tasks);
+
+ tokio::select! {
+ r = std::pin::Pin::new(&mut all_tasks) => {
+ r.unwrap(); // propagate panic
+ panic!("Tasks exited unexpectedly")
+ },
+ _ = barrier.wait() => {}
+ };
+
+ b.write_all(b"0").unwrap();
+
+ all_tasks.await.unwrap();
+}
+
+#[tokio::test]
+async fn poll_fns() {
+ let (a, b) = socketpair();
+ let afd_a = Arc::new(AsyncFd::new(a).unwrap());
+ let afd_b = Arc::new(AsyncFd::new(b).unwrap());
+
+ // Fill up the write side of A
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {}
+
+ let waker = TestWaker::new();
+
+ assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
+
+ let afd_a_2 = afd_a.clone();
+ let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
+ let barrier_clone = r_barrier.clone();
+
+ let read_fut = tokio::spawn(async move {
+ // Move waker onto this task first
+ assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
+ .as_ref()
+ .poll_read_ready(cx))));
+ barrier_clone.wait().await;
+
+ let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
+ });
+
+ let afd_a_2 = afd_a.clone();
+ let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
+ let barrier_clone = w_barrier.clone();
+
+ let mut write_fut = tokio::spawn(async move {
+ // Move waker onto this task first
+ assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
+ .as_ref()
+ .poll_write_ready(cx))));
+ barrier_clone.wait().await;
+
+ let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
+ });
+
+ r_barrier.wait().await;
+ w_barrier.wait().await;
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = &mut readable => unreachable!(),
+ _ = tokio::task::yield_now() => {}
+ }
+
+ // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
+ afd_b.get_ref().write_all(b"0").unwrap();
+
+ let _ = tokio::join!(readable, read_fut);
+
+ // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
+ assert!(!waker.awoken());
+
+ // The writable side should not be awoken
+ tokio::select! {
+ _ = &mut write_fut => unreachable!(),
+ _ = tokio::time::sleep(Duration::from_millis(5)) => {}
+ }
+
+ // Make it writable now
+ drain(afd_b.get_ref());
+
+ // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
+ let _ = write_fut.await;
+}
+
+fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
+ let mut pinned = Box::pin(f);
+
+ assert_pending!(pinned
+ .as_mut()
+ .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
+
+ pinned
+}
+
+fn rt() -> tokio::runtime::Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+}
+
+#[test]
+fn driver_shutdown_wakes_currently_pending() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ let readable = assert_pending(afd_a.readable());
+
+ std::mem::drop(rt);
+
+ // Being awoken by a rt drop does not return an error, currently...
+ let _ = futures::executor::block_on(readable).unwrap();
+
+ // However, attempting to initiate a readiness wait when the rt is dropped is an error
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+}
+
+#[test]
+fn driver_shutdown_wakes_future_pending() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ std::mem::drop(rt);
+
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+}
+
+#[test]
+fn driver_shutdown_wakes_pending_race() {
+ // TODO: make this a loom test
+ for _ in 0..100 {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ let _ = std::thread::spawn(move || std::mem::drop(rt));
+
+ // This may or may not return an error (but will be awoken)
+ let _ = futures::executor::block_on(afd_a.readable());
+
+ // However retrying will always return an error
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+ }
+}
+
+async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
+ futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
+}
+
+async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
+ futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
+}
+
+#[test]
+fn driver_shutdown_wakes_currently_pending_polls() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
+
+ let readable = assert_pending(poll_readable(&afd_a));
+ let writable = assert_pending(poll_writable(&afd_a));
+
+ std::mem::drop(rt);
+
+ // Attempting to poll readiness when the rt is dropped is an error
+ assert_err!(futures::executor::block_on(readable));
+ assert_err!(futures::executor::block_on(writable));
+}
+
+#[test]
+fn driver_shutdown_wakes_poll() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ std::mem::drop(rt);
+
+ assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
+ assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
+}
+
+#[test]
+fn driver_shutdown_wakes_poll_race() {
+ // TODO: make this a loom test
+ for _ in 0..100 {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
+
+ let _ = std::thread::spawn(move || std::mem::drop(rt));
+
+ // The poll variants will always return an error in this case
+ assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
+ assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
+ }
+}