diff options
author | Carl Lerche <me@carllerche.com> | 2020-01-08 21:23:10 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-08 21:23:10 -0800 |
commit | 6406328176cdecf15cad69b327597a4d4d0b8e20 (patch) | |
tree | b802f2ac711188d4338169d7ff175bf917147126 /tokio/tests/rt_common.rs | |
parent | f28c9f0d17a4dca2003bbee57a09f62c3795c2d2 (diff) |
rt: fix threaded scheduler shutdown deadlock (#2074)
Previously, if an IO event was received during the runtime shutdown
process, it was possible to enter a deadlock. This was due to the
scheduler shutdown logic not expecting tasks to get scheduled once the
worker was in the shutdown process.
This patch fixes the deadlock by checking the queues for new tasks after
each call to park. If a new task is received, it is forcefully shutdown.
Fixes #2061
Diffstat (limited to 'tokio/tests/rt_common.rs')
-rw-r--r-- | tokio/tests/rt_common.rs | 94 |
1 files changed, 93 insertions, 1 deletions
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 15f5de6c..31edd10a 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -41,7 +41,7 @@ fn send_sync_bound() { } rt_test! { - use tokio::net::{TcpListener, TcpStream}; + use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::prelude::*; use tokio::runtime::Runtime; use tokio::sync::oneshot; @@ -618,6 +618,98 @@ rt_test! { } #[test] + fn wake_while_rt_is_dropping() { + use tokio::task; + + struct OnDrop<F: FnMut()>(F); + + impl<F: FnMut()> Drop for OnDrop<F> { + fn drop(&mut self) { + (self.0)() + } + } + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + let mut rt = rt(); + + let h1 = rt.handle().clone(); + + rt.handle().spawn(async move { + // Ensure a waker gets stored in oneshot 1. + let _ = rx1.await; + tx3.send(()).unwrap(); + }); + + rt.handle().spawn(async move { + // When this task is dropped, we'll be "closing remotes". + // We spawn a new task that owns the `tx1`, to move its Drop + // out of here. + // + // Importantly, the oneshot 1 has a waker already stored, so + // the eventual drop here will try to re-schedule again. + let mut opt_tx1 = Some(tx1); + let _d = OnDrop(move || { + let tx1 = opt_tx1.take().unwrap(); + h1.spawn(async move { + tx1.send(()).unwrap(); + }); + }); + let _ = rx2.await; + }); + + rt.handle().spawn(async move { + let _ = rx3.await; + // We'll never get here, but once task 3 drops, this will + // force task 2 to re-schedule since it's waiting on oneshot 2. + tx2.send(()).unwrap(); + }); + + // Tick the loop + rt.block_on(async { + task::yield_now().await; + }); + + // Drop the rt + drop(rt); + } + + #[test] + fn io_notify_while_shutting_down() { + use std::net::Ipv6Addr; + + for _ in 1..100 { + let mut runtime = rt(); + + runtime.block_on(async { + let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); + let addr = socket.local_addr().unwrap(); + let (mut recv_half, mut send_half) = socket.split(); + + tokio::spawn(async move { + let mut buf = [0]; + loop { + recv_half.recv_from(&mut buf).await.unwrap(); + std::thread::sleep(Duration::from_millis(2)); + } + }); + + tokio::spawn(async move { + let buf = [0]; + loop { + send_half.send_to(&buf, &addr).await.unwrap(); + tokio::time::delay_for(Duration::from_millis(1)).await; + } + }); + + tokio::time::delay_for(Duration::from_millis(5)).await; + }); + } + } + + #[test] fn runtime_in_thread_local() { use std::cell::RefCell; use std::thread; |