summaryrefslogtreecommitdiffstats
path: root/tokio/tests
diff options
context:
space:
mode:
authorbdonlan <bdonlan@gmail.com>2020-10-22 14:12:41 -0700
committerGitHub <noreply@github.com>2020-10-22 14:12:41 -0700
commitc1539132110d3f8d20d22efb4b3f6a16fafd0e63 (patch)
tree88dc81a22a3732da92f2327ee5bcc93b05f1cf5a /tokio/tests
parent358e4f9f8029b6b289f8ef5a54bd7c6eae5bf969 (diff)
io: Add AsyncFd, fix io::driver shutdown (#2903)
* io: Add AsyncFd This adds AsyncFd, a unix-only structure to allow for read/writability states to be monitored for arbitrary file descriptors. Issue: #2728 * driver: fix shutdown notification unreliability Previously, there was a race window in which an IO driver shutting down could fail to notify ScheduledIo instances of this state; in particular, notification of outstanding ScheduledIo registrations was driven by `Driver::drop`, but registrations bypass `Driver` and go directly to a `Weak<Inner>`. The `Driver` holds the `Arc<Inner>` keeping `Inner` alive, but it's possible that a new handle could be registered (or a new readiness future created for an existing handle) after the `Driver::drop` handler runs and prior to `Inner` being dropped. This change fixes this in two parts: First, notification of outstanding ScheduledIo handles is pushed down into the drop method of `Inner` instead, and, second, we add state to ScheduledIo to ensure that we remember that the IO driver we're bound to has shut down after the initial shutdown notification, so that subsequent readiness future registrations can immediately return (instead of potentially blocking indefinitely). Fixes: #2924
Diffstat (limited to 'tokio/tests')
-rw-r--r--tokio/tests/io_async_fd.rs604
1 files changed, 604 insertions, 0 deletions
diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs
new file mode 100644
index 00000000..0303eff6
--- /dev/null
+++ b/tokio/tests/io_async_fd.rs
@@ -0,0 +1,604 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(unix, feature = "full"))]
+
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
+use std::time::Duration;
+use std::{
+ future::Future,
+ io::{self, ErrorKind, Read, Write},
+ task::{Context, Waker},
+};
+
+use nix::errno::Errno;
+use nix::unistd::{close, read, write};
+
+use futures::{poll, FutureExt};
+
+use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
+use tokio_test::{assert_err, assert_pending};
+
+struct TestWaker {
+ inner: Arc<TestWakerInner>,
+ waker: Waker,
+}
+
+#[derive(Default)]
+struct TestWakerInner {
+ awoken: AtomicBool,
+}
+
+impl futures::task::ArcWake for TestWakerInner {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.awoken.store(true, Ordering::SeqCst);
+ }
+}
+
+impl TestWaker {
+ fn new() -> Self {
+ let inner: Arc<TestWakerInner> = Default::default();
+
+ Self {
+ inner: inner.clone(),
+ waker: futures::task::waker(inner),
+ }
+ }
+
+ fn awoken(&self) -> bool {
+ self.inner.awoken.swap(false, Ordering::SeqCst)
+ }
+
+ fn context(&self) -> Context<'_> {
+ Context::from_waker(&self.waker)
+ }
+}
+
+fn is_blocking(e: &nix::Error) -> bool {
+ Some(Errno::EAGAIN) == e.as_errno()
+}
+
+#[derive(Debug)]
+struct FileDescriptor {
+ fd: RawFd,
+}
+
+impl AsRawFd for FileDescriptor {
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd
+ }
+}
+
+impl Read for &FileDescriptor {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ match read(self.fd, buf) {
+ Ok(n) => Ok(n),
+ Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
+ Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
+ }
+ }
+}
+
+impl Read for FileDescriptor {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ (self as &Self).read(buf)
+ }
+}
+
+impl Write for &FileDescriptor {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ match write(self.fd, buf) {
+ Ok(n) => Ok(n),
+ Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
+ Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl Write for FileDescriptor {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ (self as &Self).write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (self as &Self).flush()
+ }
+}
+
+impl Drop for FileDescriptor {
+ fn drop(&mut self) {
+ let _ = close(self.fd);
+ }
+}
+
+fn set_nonblocking(fd: RawFd) {
+ use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
+
+ let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
+
+ if flags < 0 {
+ panic!(
+ "bad return value from fcntl(F_GETFL): {} ({:?})",
+ flags,
+ nix::Error::last()
+ );
+ }
+
+ let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
+
+ nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
+}
+
+fn socketpair() -> (FileDescriptor, FileDescriptor) {
+ use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
+
+ let (fd_a, fd_b) = socket::socketpair(
+ AddressFamily::Unix,
+ SockType::Stream,
+ None,
+ SockFlag::empty(),
+ )
+ .expect("socketpair");
+ let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
+
+ set_nonblocking(fds.0.fd);
+ set_nonblocking(fds.1.fd);
+
+ fds
+}
+
+fn drain(mut fd: &FileDescriptor) {
+ let mut buf = [0u8; 512];
+
+ loop {
+ match fd.read(&mut buf[..]) {
+ Err(e) if e.kind() == ErrorKind::WouldBlock => break,
+ Ok(0) => panic!("unexpected EOF"),
+ Err(e) => panic!("unexpected error: {:?}", e),
+ Ok(_) => continue,
+ }
+ }
+}
+
+#[tokio::test]
+async fn initially_writable() {
+ let (a, b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+ let afd_b = AsyncFd::new(b).unwrap();
+
+ afd_a.writable().await.unwrap().clear_ready();
+ afd_b.writable().await.unwrap().clear_ready();
+
+ futures::select_biased! {
+ _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
+ _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
+ _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
+ }
+}
+
+#[tokio::test]
+async fn reset_readable() {
+ let (a, mut b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ b.write_all(b"0").unwrap();
+
+ let mut guard = readable.await.unwrap();
+
+ guard.with_io(|| afd_a.get_ref().read(&mut [0])).unwrap();
+
+ // `a` is not readable, but the reactor still thinks it is
+ // (because we have not observed a not-ready error yet)
+ afd_a.readable().await.unwrap().retain_ready();
+
+ // Explicitly clear the ready state
+ guard.clear_ready();
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ b.write_all(b"0").unwrap();
+
+ // We can observe the new readable event
+ afd_a.readable().await.unwrap().clear_ready();
+}
+
+#[tokio::test]
+async fn reset_writable() {
+ let (a, b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let mut guard = afd_a.writable().await.unwrap();
+
+ // Write until we get a WouldBlock. This also clears the ready state.
+ loop {
+ if let Err(e) = guard.with_io(|| afd_a.get_ref().write(&[0; 512][..])) {
+ assert_eq!(ErrorKind::WouldBlock, e.kind());
+ break;
+ }
+ }
+
+ // Writable state should be cleared now.
+ let writable = afd_a.writable();
+ tokio::pin!(writable);
+
+ tokio::select! {
+ _ = writable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ // Read from the other side; we should become writable now.
+ drain(&b);
+
+ let _ = writable.await.unwrap();
+}
+
+#[derive(Debug)]
+struct ArcFd<T>(Arc<T>);
+impl<T: AsRawFd> AsRawFd for ArcFd<T> {
+ fn as_raw_fd(&self) -> RawFd {
+ self.0.as_raw_fd()
+ }
+}
+
+#[tokio::test]
+async fn drop_closes() {
+ let (a, mut b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ std::mem::drop(afd_a);
+
+ assert_eq!(0, b.read(&mut [0]).unwrap());
+
+ // into_inner does not close the fd
+
+ let (a, mut b) = socketpair();
+ let afd_a = AsyncFd::new(a).unwrap();
+ let _a: FileDescriptor = afd_a.into_inner();
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ // Drop closure behavior is delegated to the inner object
+ let (a, mut b) = socketpair();
+ let arc_fd = Arc::new(a);
+ let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
+ std::mem::drop(afd_a);
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
+}
+
+#[tokio::test]
+async fn with_poll() {
+ use std::task::Poll;
+
+ let (a, mut b) = socketpair();
+
+ b.write_all(b"0").unwrap();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let mut guard = afd_a.readable().await.unwrap();
+
+ afd_a.get_ref().read_exact(&mut [0]).unwrap();
+
+ // Should not clear the readable state
+ let _ = guard.with_poll(|| Poll::Ready(()));
+
+ // Still readable...
+ let _ = afd_a.readable().await.unwrap();
+
+ // Should clear the readable state
+ let _ = guard.with_poll(|| Poll::Pending::<()>);
+
+ // Assert not readable
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ // Write something down b again and make sure we're reawoken
+ b.write_all(b"0").unwrap();
+ let _ = readable.await.unwrap();
+}
+
+#[tokio::test]
+async fn multiple_waiters() {
+ let (a, mut b) = socketpair();
+ let afd_a = Arc::new(AsyncFd::new(a).unwrap());
+
+ let barrier = Arc::new(tokio::sync::Barrier::new(11));
+
+ let mut tasks = Vec::new();
+ for _ in 0..10 {
+ let afd_a = afd_a.clone();
+ let barrier = barrier.clone();
+
+ let f = async move {
+ let notify_barrier = async {
+ barrier.wait().await;
+ futures::future::pending::<()>().await;
+ };
+
+ futures::select_biased! {
+ guard = afd_a.readable().fuse() => {
+ tokio::task::yield_now().await;
+ guard.unwrap().clear_ready()
+ },
+ _ = notify_barrier.fuse() => unreachable!(),
+ }
+
+ std::mem::drop(afd_a);
+ };
+
+ tasks.push(tokio::spawn(f));
+ }
+
+ let mut all_tasks = futures::future::try_join_all(tasks);
+
+ tokio::select! {
+ r = std::pin::Pin::new(&mut all_tasks) => {
+ r.unwrap(); // propagate panic
+ panic!("Tasks exited unexpectedly")
+ },
+ _ = barrier.wait() => {}
+ };
+
+ b.write_all(b"0").unwrap();
+
+ all_tasks.await.unwrap();
+}
+
+#[tokio::test]
+async fn poll_fns() {
+ let (a, b) = socketpair();
+ let afd_a = Arc::new(AsyncFd::new(a).unwrap());
+ let afd_b = Arc::new(AsyncFd::new(b).unwrap());
+
+ // Fill up the write side of A
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {}
+
+ let waker = TestWaker::new();
+
+ assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
+
+ let afd_a_2 = afd_a.clone();
+ let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
+ let barrier_clone = r_barrier.clone();
+
+ let read_fut = tokio::spawn(async move {
+ // Move waker onto this task first
+ assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
+ .as_ref()
+ .poll_read_ready(cx))));
+ barrier_clone.wait().await;
+
+ let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
+ });
+
+ let afd_a_2 = afd_a.clone();
+ let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
+ let barrier_clone = w_barrier.clone();
+
+ let mut write_fut = tokio::spawn(async move {
+ // Move waker onto this task first
+ assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
+ .as_ref()
+ .poll_write_ready(cx))));
+ barrier_clone.wait().await;
+
+ let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
+ });
+
+ r_barrier.wait().await;
+ w_barrier.wait().await;
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = &mut readable => unreachable!(),
+ _ = tokio::task::yield_now() => {}
+ }
+
+ // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
+ afd_b.get_ref().write_all(b"0").unwrap();
+
+ let _ = tokio::join!(readable, read_fut);
+
+ // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
+ assert!(!waker.awoken());
+
+ // The writable side should not be awoken
+ tokio::select! {
+ _ = &mut write_fut => unreachable!(),
+ _ = tokio::time::sleep(Duration::from_millis(5)) => {}
+ }
+
+ // Make it writable now
+ drain(afd_b.get_ref());
+
+ // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
+ let _ = write_fut.await;
+}
+
+fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
+ let mut pinned = Box::pin(f);
+
+ assert_pending!(pinned
+ .as_mut()
+ .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
+
+ pinned
+}
+
+fn rt() -> tokio::runtime::Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+}
+
+#[test]
+fn driver_shutdown_wakes_currently_pending() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ let readable = assert_pending(afd_a.readable());
+
+ std::mem::drop(rt);
+
+ // Being awoken by a rt drop does not return an error, currently...
+ let _ = futures::executor::block_on(readable).unwrap();
+
+ // However, attempting to initiate a readiness wait when the rt is dropped is an error
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+}
+
+#[test]
+fn driver_shutdown_wakes_future_pending() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ std::mem::drop(rt);
+
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+}
+
+#[test]
+fn driver_shutdown_wakes_pending_race() {
+ // TODO: make this a loom test
+ for _ in 0..100 {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ let _ = std::thread::spawn(move || std::mem::drop(rt));
+
+ // This may or may not return an error (but will be awoken)
+ let _ = futures::executor::block_on(afd_a.readable());
+
+ // However retrying will always return an error
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+ }
+}
+
+async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
+ futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
+}
+
+async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
+ futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
+}
+
+#[test]
+fn driver_shutdown_wakes_currently_pending_polls() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
+
+ let readable = assert_pending(poll_readable(&afd_a));
+ let writable = assert_pending(poll_writable(&afd_a));
+
+ std::mem::drop(rt);
+
+ // Attempting to poll readiness when the rt is dropped is an error
+ assert_err!(futures::executor::block_on(readable));
+ assert_err!(futures::executor::block_on(writable));
+}
+
+#[test]
+fn driver_shutdown_wakes_poll() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ std::mem::drop(rt);
+
+ assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
+ assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
+}
+
+#[test]
+fn driver_shutdown_wakes_poll_race() {
+ // TODO: make this a loom test
+ for _ in 0..100 {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
+
+ let _ = std::thread::spawn(move || std::mem::drop(rt));
+
+ // The poll variants will always return an error in this case
+ assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
+ assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
+ }
+}