diff options
author | Eliza Weisman <eliza@buoyant.io> | 2019-12-04 11:20:57 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-04 11:20:57 -0800 |
commit | 0e729aa341028c121b9c39fe552ed4309bae6b6a (patch) | |
tree | e69cc9079cd351462f278effd424f1a34dd85ba2 /tokio/src/runtime/basic_scheduler.rs | |
parent | cbe369a3ed5c252ca7581e37986dc912d88e58c6 (diff) |
task: fix infinite loop when dropping a `LocalSet` (#1892)
## Motivation
There's currently an issue in `task::LocalSet` where dropping the local
set can result in an infinite loop if a task running in the local set is
notified from outside the local set (e.g. by a timer). This was reported
in issue #1885.
This issue exists because the `Drop` impl for `task::local::Scheduler`
does not drain the queue of externally notified tasks, as the basic
scheduler's `Drop` impl does. Only the local queue is drained, so some
tasks are left in place. Since these tasks are never removed, the loop
that keeps trying to cancel tasks until the owned task list is completely
empty never terminates.
I think this issue was due to the `Drop` impl being written before a
remote queue was added to the local scheduler, and the need to close the
remote queue as well was overlooked.
## Solution
This branch solves the problem by clearing the local scheduler's remote
queue as well as the local one.
I've added a test that reproduces the behavior. The test fails on master
and passes after this change.
In addition, this branch factors out the task queue logic shared by the
basic scheduler runtime and the `LocalSet` struct in `tokio::task`. As
more work has been done on `LocalSet`, its behavior has grown closer and
closer to that of the basic scheduler, and factoring out the shared code
reduces the risk of errors caused by `LocalSet` not doing
something that the basic scheduler does. The queues are now encapsulated
by a `MpscQueues` struct in `tokio::task::queue` (crate-public). As a
follow-up, I'd also like to look into changing this type to use the same
remote queue type as the threadpool (a linked list).
In particular, I noticed the basic scheduler has a flag that indicates
the remote queue has been closed, which is set when dropping the
scheduler. This prevents tasks from being added after the scheduler has
started shutting down, stopping a potential task leak. Rather than
duplicating this code in `LocalSet`, I thought it was probably better to
factor it out into a shared type.
There are a few cases where the two schedulers differ slightly in
behavior, so there is still a need for separate types implemented _using_
the new `MpscQueues` struct. However, the shared struct should cover most
of the identical code.
Note that this diff is rather large, due to the refactoring. However, the
actual fix for the infinite loop is very simple. It can be reviewed on its own
by looking at commit 4f46ac6. The refactor is in a separate commit, with
the SHA 90b5b1f.
Fixes #1885
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio/src/runtime/basic_scheduler.rs')
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 180 |
1 files changed, 60 insertions, 120 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index c674b961..0bce72ac 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,13 +1,12 @@ use crate::park::{Park, Unpark}; -use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task}; +use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, ScheduleSendOnly, Task}; -use std::cell::{Cell, UnsafeCell}; -use std::collections::VecDeque; +use std::cell::Cell; use std::fmt; use std::future::Future; use std::mem::ManuallyDrop; use std::ptr; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{RawWaker, RawWakerVTable, Waker}; use std::time::Duration; @@ -31,31 +30,7 @@ pub(crate) struct Spawner { /// The scheduler component. pub(super) struct SchedulerPriv { - /// List of all active tasks spawned onto this executor. - /// - /// # Safety - /// - /// Must only be accessed from the primary thread - owned_tasks: UnsafeCell<task::OwnedList<Self>>, - - /// Local run queue. - /// - /// Tasks notified from the current thread are pushed into this queue. - /// - /// # Safety - /// - /// References should not be handed out. Only call `push` / `pop` functions. - /// Only call from the owning thread. - local_queue: UnsafeCell<VecDeque<Task<SchedulerPriv>>>, - - /// Remote run queue. - /// - /// Tasks notified from another thread are pushed into this queue. - remote_queue: Mutex<RemoteQueue>, - - /// Tasks pending drop - pending_drop: task::TransferStack<Self>, - + queues: MpscQueues<Self>, /// Unpark the blocked thread unpark: Box<dyn Unpark>, } @@ -73,21 +48,9 @@ struct LocalState<P> { park: P, } -#[derive(Debug)] -struct RemoteQueue { - /// FIFO list of tasks - queue: VecDeque<Task<SchedulerPriv>>, - - /// `true` when a task can be pushed into the queue, false otherwise. - open: bool, -} - /// Max number of tasks to poll per tick. 
const MAX_TASKS_PER_TICK: usize = 61; -/// How often to check the remote queue first -const CHECK_REMOTE_INTERVAL: u8 = 13; - thread_local! { static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null()) } @@ -101,13 +64,7 @@ where BasicScheduler { scheduler: Arc::new(SchedulerPriv { - owned_tasks: UnsafeCell::new(task::OwnedList::new()), - local_queue: UnsafeCell::new(VecDeque::with_capacity(64)), - remote_queue: Mutex::new(RemoteQueue { - queue: VecDeque::with_capacity(64), - open: true, - }), - pending_drop: task::TransferStack::new(), + queues: MpscQueues::new(), unpark: Box::new(unpark), }), local: LocalState { tick: 0, park }, @@ -155,9 +112,7 @@ where // Track the current scheduler let _guard = ACTIVE.with(|cell| { - let guard = Guard { - old: cell.get(), - }; + let guard = Guard { old: cell.get() }; cell.set(scheduler as *const SchedulerPriv); @@ -188,7 +143,11 @@ where scheduler.tick(local); // Maintenance work - scheduler.drain_pending_drop(); + unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on (which we are). + scheduler.queues.drain_pending_drop(); + } } }) } @@ -216,6 +175,8 @@ impl Spawner { } } +// === impl SchedulerPriv === + impl SchedulerPriv { fn tick(&self, local: &mut LocalState<impl Park>) { for _ in 0..MAX_TASKS_PER_TICK { @@ -224,8 +185,14 @@ impl SchedulerPriv { // Increment the tick local.tick = tick.wrapping_add(1); + let next = unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on. The `LocalState` + // parameter to this method implies that we are on that thread. 
+ self.queues.next_task(tick) + }; - let task = match self.next_task(tick) { + let task = match next { Some(task) => task, None => { local.park.park().ok().expect("failed to park"); @@ -235,7 +202,10 @@ impl SchedulerPriv { if let Some(task) = task.run(&mut || Some(self.into())) { unsafe { - self.schedule_local(task); + // safety: this function is safe to call only from the + // thread the basic scheduler is running on. The `LocalState` + // parameter to this method implies that we are on that thread. + self.queues.push_local(task); } } } @@ -247,15 +217,6 @@ impl SchedulerPriv { .expect("failed to park"); } - fn drain_pending_drop(&self) { - for task in self.pending_drop.drain() { - unsafe { - (*self.owned_tasks.get()).remove(&task); - } - drop(task); - } - } - /// # Safety /// /// Must be called from the same thread that holds the `BasicScheduler` @@ -266,63 +227,51 @@ impl SchedulerPriv { F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.schedule_local(task); + self.queues.push_local(task); handle } - - unsafe fn schedule_local(&self, task: Task<Self>) { - (*self.local_queue.get()).push_back(task); - } - - fn next_task(&self, tick: u8) -> Option<Task<Self>> { - if 0 == tick % CHECK_REMOTE_INTERVAL { - self.next_remote_task().or_else(|| self.next_local_task()) - } else { - self.next_local_task().or_else(|| self.next_remote_task()) - } - } - - fn next_local_task(&self) -> Option<Task<Self>> { - unsafe { (*self.local_queue.get()).pop_front() } - } - - fn next_remote_task(&self) -> Option<Task<Self>> { - self.remote_queue.lock().unwrap().queue.pop_front() - } } impl Schedule for SchedulerPriv { fn bind(&self, task: &Task<Self>) { unsafe { - (*self.owned_tasks.get()).insert(task); + // safety: `Queues::add_task` is only safe to call from the thread + // that owns the queues (the thread the scheduler is running on). + // `Scheduler::bind` is called when polling a task that + // doesn't have a scheduler set. 
We will only poll new tasks from + // the thread that the scheduler is running on. Therefore, this is + // safe to call. + self.queues.add_task(task); } } fn release(&self, task: Task<Self>) { - self.pending_drop.push(task); + self.queues.release_remote(task); } fn release_local(&self, task: &Task<Self>) { unsafe { - (*self.owned_tasks.get()).remove(task); + // safety: `Scheduler::release_local` is only called from the + // thread that the scheduler is running on. The `Schedule` trait's + // contract is that releasing a task from another thread should call + // `release` rather than `release_local`. + self.queues.release_local(task); } } fn schedule(&self, task: Task<Self>) { - let is_current = ACTIVE.with(|cell| { - cell.get() == self as *const SchedulerPriv - }); + let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); if is_current { - unsafe { self.schedule_local(task) }; + unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on. If `is_current` is + // then we are on that thread. + self.queues.push_local(task) + }; } else { - let mut lock = self.remote_queue.lock().unwrap(); - - if lock.open { - lock.queue.push_back(task); - } else { - task.shutdown(); - } + let mut lock = self.queues.remote(); + lock.schedule(task); // while locked, call unpark self.unpark.unpark(); @@ -339,39 +288,30 @@ where P: Park, { fn drop(&mut self) { - // Close the remote queue - let mut lock = self.scheduler.remote_queue.lock().unwrap(); - lock.open = false; - - while let Some(task) = lock.queue.pop_front() { - task.shutdown(); - } - - drop(lock); - - // Drain all local tasks - while let Some(task) = self.scheduler.next_local_task() { - task.shutdown(); - } - - // Release owned tasks unsafe { - (*self.scheduler.owned_tasks.get()).shutdown(); - } + // safety: the `Drop` impl owns the scheduler's queues. 
these fields + // will only be accessed when running the scheduler, and it can no + // longer be run, since we are in the process of dropping it. - self.scheduler.drain_pending_drop(); + // Shut down the task queues. + self.scheduler.queues.shutdown(); + } // Wait until all tasks have been released. - while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } { + while unsafe { self.scheduler.queues.has_tasks_remaining() } { self.local.park.park().ok().expect("park failed"); - self.scheduler.drain_pending_drop(); + unsafe { + self.scheduler.queues.drain_pending_drop(); + } } } } impl fmt::Debug for SchedulerPriv { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Scheduler").finish() + fmt.debug_struct("Scheduler") + .field("queues", &self.queues) + .finish() } } |