diff options
author | Carl Lerche <me@carllerche.com> | 2019-12-17 20:52:09 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-17 20:52:09 -0800 |
commit | 41d15ea212525f4be310d65c29c626488af546e1 (patch) | |
tree | 63e1a8ee0fb85c4cb4e6e8be2d0a02704b618760 | |
parent | 8add90210bd626f2afcb955c7ce4106179ae6cdc (diff) |
rt: avoid dropping a task in calls to wake() (#1972)
Calls to tasks should not be nested. Currently, while a task is being
executed and the runtime is shutting down, a call to wake() can result
in the wake target to be dropped. This, in turn, results in the drop
handler being called.
If the user holds a ref cell borrow, a mutex guard, or any such value,
dropping the task inline can result in a deadlock.
The fix is to permit tasks to be scheduled during the shutdown process
and dropping the tasks once they are popped from the queue.
Fixes #1929, #1886
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 54 | ||||
-rw-r--r-- | tokio/src/task/local.rs | 11 | ||||
-rw-r--r-- | tokio/src/task/queue.rs | 16 | ||||
-rw-r--r-- | tokio/tests/rt_basic.rs | 36 | ||||
-rw-r--r-- | tokio/tests/task_local_set.rs | 50 |
5 files changed, 141 insertions, 26 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 0bce72ac..53b8bcc9 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -84,7 +84,7 @@ where F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.scheduler.schedule(task); + self.scheduler.schedule(task, true); handle } @@ -161,7 +161,7 @@ impl Spawner { F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.scheduler.schedule(task); + self.scheduler.schedule(task, true); handle } @@ -230,6 +230,27 @@ impl SchedulerPriv { self.queues.push_local(task); handle } + + fn schedule(&self, task: Task<Self>, spawn: bool) { + let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); + + if is_current { + unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on. If `is_current` is + // then we are on that thread. + self.queues.push_local(task) + }; + } else { + let mut lock = self.queues.remote(); + lock.schedule(task, spawn); + + // while locked, call unpark + self.unpark.unpark(); + + drop(lock); + } + } } impl Schedule for SchedulerPriv { @@ -260,24 +281,7 @@ impl Schedule for SchedulerPriv { } fn schedule(&self, task: Task<Self>) { - let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); - - if is_current { - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on. If `is_current` is - // then we are on that thread. - self.queues.push_local(task) - }; - } else { - let mut lock = self.queues.remote(); - lock.schedule(task); - - // while locked, call unpark - self.unpark.unpark(); - - drop(lock); - } + SchedulerPriv::schedule(self, task, false); } } @@ -298,10 +302,16 @@ where } // Wait until all tasks have been released. - while unsafe { self.scheduler.queues.has_tasks_remaining() } { - self.local.park.park().ok().expect("park failed"); + loop { unsafe { self.scheduler.queues.drain_pending_drop(); + self.scheduler.queues.drain_queues(); + + if !self.scheduler.queues.has_tasks_remaining() { + break; + } + + self.local.park.park().ok().expect("park failed"); } } } diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 203a2f39..d43187a8 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -345,7 +345,7 @@ impl Schedule for Scheduler { unsafe { self.queues.push_local(task) }; } else { let mut lock = self.queues.remote(); - lock.schedule(task); + lock.schedule(task, false); self.waker.wake(); @@ -437,8 +437,15 @@ impl Drop for Scheduler { // Wait until all tasks have been released. // XXX: this is a busy loop, but we don't really have any way to park // the thread here? - while self.queues.has_tasks_remaining() { + loop { self.queues.drain_pending_drop(); + self.queues.drain_queues(); + + if !self.queues.has_tasks_remaining() { + break; + } + + std::thread::yield_now(); } } } diff --git a/tokio/src/task/queue.rs b/tokio/src/task/queue.rs index 6a004fc7..a2badc8a 100644 --- a/tokio/src/task/queue.rs +++ b/tokio/src/task/queue.rs @@ -233,6 +233,16 @@ where self.drain_pending_drop(); } + /// Drain both the local and remote run queues, shutting down any tasks. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn drain_queues(&self) { + self.close_local(); + self.close_remote(); + } + /// Shut down the scheduler's owned task list. /// /// # Safety @@ -300,8 +310,10 @@ where /// If the queue is open to accept new tasks, the task is pushed to the back /// of the queue. Otherwise, if the queue is closed (the scheduler is /// shutting down), the new task will be shut down immediately. - pub(crate) fn schedule(&mut self, task: Task<S>) { - if self.open { + /// + /// `spawn` should be set if the caller is spawning a new task. + pub(crate) fn schedule(&mut self, task: Task<S>, spawn: bool) { + if !spawn || self.open { self.queue.push_back(task); } else { task.shutdown(); diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 68e09ef3..4f0beab9 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -27,6 +27,42 @@ fn spawned_task_does_not_progress_without_block_on() { assert_eq!(out, "hello"); } +#[test] +fn acquire_mutex_in_drop() { + use futures::future::pending; + use tokio::task; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let mut rt = rt(); + + rt.spawn(async move { + let _ = rx2.await; + unreachable!(); + }); + + rt.spawn(async move { + let _ = rx1.await; + let _ = tx2.send(()).unwrap(); + unreachable!(); + }); + + // Spawn a task that will never notify + rt.spawn(async move { + pending::<()>().await; + tx1.send(()).unwrap(); + }); + + // Tick the loop + rt.block_on(async { + task::yield_now().await; + }); + + // Drop the rt + drop(rt); +} + fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs new file mode 100644 index 00000000..f5014275 --- /dev/null +++ b/tokio/tests/task_local_set.rs @@ -0,0 +1,50 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::runtime::Runtime; +use tokio::sync::oneshot; +use tokio::task::{self, LocalSet}; + +#[test] +fn acquire_mutex_in_drop() { + use futures::future::pending; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let mut rt = rt(); + let local = LocalSet::new(); + + local.spawn_local(async move { + let _ = rx2.await; + unreachable!(); + }); + + local.spawn_local(async move { + let _ = rx1.await; + let _ = tx2.send(()).unwrap(); + unreachable!(); + }); + + // Spawn a task that will never notify + local.spawn_local(async move { + pending::<()>().await; + tx1.send(()).unwrap(); + }); + + // Tick the loop + local.block_on(&mut rt, async { + task::yield_now().await; + }); + + // Drop the LocalSet + drop(local); +} + +fn rt() -> Runtime { + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +} |