author     Carl Lerche <me@carllerche.com>    2019-12-17 20:52:09 -0800
committer  GitHub <noreply@github.com>        2019-12-17 20:52:09 -0800
commit     41d15ea212525f4be310d65c29c626488af546e1 (patch)
tree       63e1a8ee0fb85c4cb4e6e8be2d0a02704b618760
parent     8add90210bd626f2afcb955c7ce4106179ae6cdc (diff)
rt: avoid dropping a task in calls to wake() (#1972)
Calls to tasks should not be nested. Currently, while a task is being executed and the runtime is shutting down, a call to wake() can result in the wake target being dropped. This, in turn, results in the drop handler being called. If the user holds a RefCell borrow, a mutex guard, or any such value, dropping the task inline can result in a deadlock. The fix is to permit tasks to be scheduled during the shutdown process and to drop them only once they are popped from the queue. Fixes #1929, #1886
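For illustration, the failure chain looks roughly like the sketch below. This is a minimal reconstruction using the 0.2-era API exercised by the new tests further down; the main wrapper, the task naming, and the comments are explanatory assumptions, not part of the commit itself.

    // Minimal sketch of the drop chain this commit fixes (assumed tokio
    // 0.2-era API, mirroring the `acquire_mutex_in_drop` tests added below).
    use futures::future::pending;
    use tokio::sync::oneshot;
    use tokio::task;

    fn main() {
        let (tx1, rx1) = oneshot::channel::<()>();
        let (tx2, rx2) = oneshot::channel::<()>();

        let mut rt = tokio::runtime::Builder::new()
            .basic_scheduler()
            .enable_all()
            .build()
            .unwrap();

        // Task A parks on rx2; it is only ever woken when tx2 is dropped.
        rt.spawn(async move {
            let _ = rx2.await;
            unreachable!();
        });

        // Task B parks on rx1 and owns tx2, so dropping task B wakes task A.
        rt.spawn(async move {
            let _ = rx1.await;
            let _ = tx2.send(());
            unreachable!();
        });

        // Task C never completes and owns tx1, so dropping task C wakes task B.
        rt.spawn(async move {
            pending::<()>().await;
            tx1.send(()).unwrap();
        });

        // Poll each task once so every waker is registered.
        rt.block_on(async {
            task::yield_now().await;
        });

        // Shutdown: dropping the runtime drops task C, whose destructor drops
        // tx1 and calls wake() for task B. Before this commit, the closed
        // queue shut task B down inline, nesting B's destructor inside C's
        // (and A's inside B's); if any of those destructors held a RefCell
        // borrow or a mutex guard, that nesting could deadlock. After this
        // commit, the wake only re-queues the task, which is dropped later
        // when it is popped from the queue.
        drop(rt);
    }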
-rw-r--r--  tokio/src/runtime/basic_scheduler.rs  54
-rw-r--r--  tokio/src/task/local.rs               11
-rw-r--r--  tokio/src/task/queue.rs               16
-rw-r--r--  tokio/tests/rt_basic.rs               36
-rw-r--r--  tokio/tests/task_local_set.rs         50
5 files changed, 141 insertions, 26 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 0bce72ac..53b8bcc9 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -84,7 +84,7 @@ where
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
- self.scheduler.schedule(task);
+ self.scheduler.schedule(task, true);
handle
}
@@ -161,7 +161,7 @@ impl Spawner {
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
- self.scheduler.schedule(task);
+ self.scheduler.schedule(task, true);
handle
}
@@ -230,6 +230,27 @@ impl SchedulerPriv {
self.queues.push_local(task);
handle
}
+
+ fn schedule(&self, task: Task<Self>, spawn: bool) {
+ let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
+
+ if is_current {
+ unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on. If `is_current` is
+ // true, then we are on that thread.
+ self.queues.push_local(task)
+ };
+ } else {
+ let mut lock = self.queues.remote();
+ lock.schedule(task, spawn);
+
+ // while locked, call unpark
+ self.unpark.unpark();
+
+ drop(lock);
+ }
+ }
}
impl Schedule for SchedulerPriv {
@@ -260,24 +281,7 @@ impl Schedule for SchedulerPriv {
}
fn schedule(&self, task: Task<Self>) {
- let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
-
- if is_current {
- unsafe {
- // safety: this function is safe to call only from the
- // thread the basic scheduler is running on. If `is_current` is
- // then we are on that thread.
- self.queues.push_local(task)
- };
- } else {
- let mut lock = self.queues.remote();
- lock.schedule(task);
-
- // while locked, call unpark
- self.unpark.unpark();
-
- drop(lock);
- }
+ SchedulerPriv::schedule(self, task, false);
}
}
@@ -298,10 +302,16 @@ where
}
// Wait until all tasks have been released.
- while unsafe { self.scheduler.queues.has_tasks_remaining() } {
- self.local.park.park().ok().expect("park failed");
+ loop {
unsafe {
self.scheduler.queues.drain_pending_drop();
+ self.scheduler.queues.drain_queues();
+
+ if !self.scheduler.queues.has_tasks_remaining() {
+ break;
+ }
+
+ self.local.park.park().ok().expect("park failed");
}
}
}
diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs
index 203a2f39..d43187a8 100644
--- a/tokio/src/task/local.rs
+++ b/tokio/src/task/local.rs
@@ -345,7 +345,7 @@ impl Schedule for Scheduler {
unsafe { self.queues.push_local(task) };
} else {
let mut lock = self.queues.remote();
- lock.schedule(task);
+ lock.schedule(task, false);
self.waker.wake();
@@ -437,8 +437,15 @@ impl Drop for Scheduler {
// Wait until all tasks have been released.
// XXX: this is a busy loop, but we don't really have any way to park
// the thread here?
- while self.queues.has_tasks_remaining() {
+ loop {
self.queues.drain_pending_drop();
+ self.queues.drain_queues();
+
+ if !self.queues.has_tasks_remaining() {
+ break;
+ }
+
+ std::thread::yield_now();
}
}
}
diff --git a/tokio/src/task/queue.rs b/tokio/src/task/queue.rs
index 6a004fc7..a2badc8a 100644
--- a/tokio/src/task/queue.rs
+++ b/tokio/src/task/queue.rs
@@ -233,6 +233,16 @@ where
self.drain_pending_drop();
}
+ /// Drain both the local and remote run queues, shutting down any tasks.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ pub(crate) unsafe fn drain_queues(&self) {
+ self.close_local();
+ self.close_remote();
+ }
+
/// Shut down the scheduler's owned task list.
///
/// # Safety
@@ -300,8 +310,10 @@ where
/// If the queue is open to accept new tasks, the task is pushed to the back
/// of the queue. Otherwise, if the queue is closed (the scheduler is
/// shutting down), the new task will be shut down immediately.
- pub(crate) fn schedule(&mut self, task: Task<S>) {
- if self.open {
+ ///
+ /// `spawn` should be set if the caller is spawning a new task.
+ pub(crate) fn schedule(&mut self, task: Task<S>, spawn: bool) {
+ if !spawn || self.open {
self.queue.push_back(task);
} else {
task.shutdown();
diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs
index 68e09ef3..4f0beab9 100644
--- a/tokio/tests/rt_basic.rs
+++ b/tokio/tests/rt_basic.rs
@@ -27,6 +27,42 @@ fn spawned_task_does_not_progress_without_block_on() {
assert_eq!(out, "hello");
}
+#[test]
+fn acquire_mutex_in_drop() {
+ use futures::future::pending;
+ use tokio::task;
+
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+
+ let mut rt = rt();
+
+ rt.spawn(async move {
+ let _ = rx2.await;
+ unreachable!();
+ });
+
+ rt.spawn(async move {
+ let _ = rx1.await;
+ let _ = tx2.send(()).unwrap();
+ unreachable!();
+ });
+
+ // Spawn a task that will never notify
+ rt.spawn(async move {
+ pending::<()>().await;
+ tx1.send(()).unwrap();
+ });
+
+ // Tick the loop
+ rt.block_on(async {
+ task::yield_now().await;
+ });
+
+ // Drop the rt
+ drop(rt);
+}
+
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs
new file mode 100644
index 00000000..f5014275
--- /dev/null
+++ b/tokio/tests/task_local_set.rs
@@ -0,0 +1,50 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::runtime::Runtime;
+use tokio::sync::oneshot;
+use tokio::task::{self, LocalSet};
+
+#[test]
+fn acquire_mutex_in_drop() {
+ use futures::future::pending;
+
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+
+ let mut rt = rt();
+ let local = LocalSet::new();
+
+ local.spawn_local(async move {
+ let _ = rx2.await;
+ unreachable!();
+ });
+
+ local.spawn_local(async move {
+ let _ = rx1.await;
+ let _ = tx2.send(()).unwrap();
+ unreachable!();
+ });
+
+ // Spawn a task that will never notify
+ local.spawn_local(async move {
+ pending::<()>().await;
+ tx1.send(()).unwrap();
+ });
+
+ // Tick the loop
+ local.block_on(&mut rt, async {
+ task::yield_now().await;
+ });
+
+ // Drop the LocalSet
+ drop(local);
+}
+
+fn rt() -> Runtime {
+ tokio::runtime::Builder::new()
+ .basic_scheduler()
+ .enable_all()
+ .build()
+ .unwrap()
+}