summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/basic_scheduler.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-12-17 20:52:09 -0800
committerGitHub <noreply@github.com>2019-12-17 20:52:09 -0800
commit41d15ea212525f4be310d65c29c626488af546e1 (patch)
tree63e1a8ee0fb85c4cb4e6e8be2d0a02704b618760 /tokio/src/runtime/basic_scheduler.rs
parent8add90210bd626f2afcb955c7ce4106179ae6cdc (diff)
rt: avoid dropping a task in calls to wake() (#1972)
Calls to tasks should not be nested. Currently, while a task is being executed and the runtime is shutting down, a call to wake() can result in the wake target to be dropped. This, in turn, results in the drop handler being called. If the user holds a ref cell borrow, a mutex guard, or any such value, dropping the task inline can result in a deadlock. The fix is to permit tasks to be scheduled during the shutdown process and dropping the tasks once they are popped from the queue. Fixes #1929, #1886
Diffstat (limited to 'tokio/src/runtime/basic_scheduler.rs')
-rw-r--r--tokio/src/runtime/basic_scheduler.rs54
1 files changed, 32 insertions, 22 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 0bce72ac..53b8bcc9 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -84,7 +84,7 @@ where
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
- self.scheduler.schedule(task);
+ self.scheduler.schedule(task, true);
handle
}
@@ -161,7 +161,7 @@ impl Spawner {
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
- self.scheduler.schedule(task);
+ self.scheduler.schedule(task, true);
handle
}
@@ -230,6 +230,27 @@ impl SchedulerPriv {
self.queues.push_local(task);
handle
}
+
+ fn schedule(&self, task: Task<Self>, spawn: bool) {
+ let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
+
+ if is_current {
+ unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on. If `is_current` is
+ // then we are on that thread.
+ self.queues.push_local(task)
+ };
+ } else {
+ let mut lock = self.queues.remote();
+ lock.schedule(task, spawn);
+
+ // while locked, call unpark
+ self.unpark.unpark();
+
+ drop(lock);
+ }
+ }
}
impl Schedule for SchedulerPriv {
@@ -260,24 +281,7 @@ impl Schedule for SchedulerPriv {
}
fn schedule(&self, task: Task<Self>) {
- let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
-
- if is_current {
- unsafe {
- // safety: this function is safe to call only from the
- // thread the basic scheduler is running on. If `is_current` is
- // then we are on that thread.
- self.queues.push_local(task)
- };
- } else {
- let mut lock = self.queues.remote();
- lock.schedule(task);
-
- // while locked, call unpark
- self.unpark.unpark();
-
- drop(lock);
- }
+ SchedulerPriv::schedule(self, task, false);
}
}
@@ -298,10 +302,16 @@ where
}
// Wait until all tasks have been released.
- while unsafe { self.scheduler.queues.has_tasks_remaining() } {
- self.local.park.park().ok().expect("park failed");
+ loop {
unsafe {
self.scheduler.queues.drain_pending_drop();
+ self.scheduler.queues.drain_queues();
+
+ if !self.scheduler.queues.has_tasks_remaining() {
+ break;
+ }
+
+ self.local.park.park().ok().expect("park failed");
}
}
}