path: root/tokio/src/runtime/basic_scheduler.rs
author     Eliza Weisman <eliza@buoyant.io>    2019-12-04 11:20:57 -0800
committer  GitHub <noreply@github.com>         2019-12-04 11:20:57 -0800
commit     0e729aa341028c121b9c39fe552ed4309bae6b6a (patch)
tree       e69cc9079cd351462f278effd424f1a34dd85ba2 /tokio/src/runtime/basic_scheduler.rs
parent     cbe369a3ed5c252ca7581e37986dc912d88e58c6 (diff)
task: fix infinite loop when dropping a `LocalSet` (#1892)
## Motivation

There's currently an issue in `task::LocalSet` where dropping the local set can result in an infinite loop if a task running in the local set is notified from outside the local set (e.g. by a timer). This was reported in issue #1885.

This issue exists because the `Drop` impl for `task::local::Scheduler` does not drain the queue of tasks notified externally, the way the basic scheduler does. Instead, only the local queue is drained, leaving some tasks in place. Since those tasks are never removed, the loop that keeps trying to cancel tasks until the owned task list is empty never terminates. I think this issue arose because the `Drop` impl was written before a remote queue was added to the local scheduler, and the need to close the remote queue as well was overlooked.

## Solution

This branch solves the problem by clearing the local scheduler's remote queue as well as the local one. I've added a test that reproduces the behavior; it fails on master and passes after this change.

In addition, this branch factors out the task queue logic shared by the basic scheduler runtime and the `LocalSet` struct in `tokio::task`. As more work has been done on `LocalSet`, its behavior has grown closer and closer to the basic scheduler's, and factoring out the shared code reduces the risk of errors caused by `LocalSet` not doing something that the basic scheduler does. The queues are now encapsulated by a crate-public `MpscQueues` struct in `tokio::task::queue`. As a follow-up, I'd also like to look into changing this type to use the same remote queue type as the threadpool (a linked list).

In particular, I noticed the basic scheduler has a flag that indicates the remote queue has been closed, which is set when dropping the scheduler. This prevents tasks from being added after the scheduler has started shutting down, stopping a potential task leak. Rather than duplicating this code in `LocalSet`, I thought it was better to factor it out into a shared type. There are a few cases with small differences in behavior, so separate types implemented _using_ the new `MpscQueues` struct are still needed, but it covers most of the previously identical code.

Note that this diff is rather large due to the refactoring; the actual fix for the infinite loop is very simple and can be reviewed on its own in commit 4f46ac6. The refactor is in a separate commit, with the SHA 90b5b1f.

Fixes #1885

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
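To make the shape of the fix concrete, the sketch below shows the drain-both-queues shutdown pattern described above as a small, self-contained Rust program. The `Queues`, `RemoteQueue`, and `FakeTask` types are simplified stand-ins invented for illustration only; they are not tokio's actual `MpscQueues`, which also tracks owned tasks and pending drops.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Stand-in for a scheduled task; the real type carries a future and its wakers.
struct FakeTask;

// Simplified queue pair, loosely shaped like the scheduler's queues: a run queue
// owned by the scheduler thread plus a mutex-guarded remote queue that other
// threads push into. Not tokio's `MpscQueues`.
struct Queues {
    local: VecDeque<FakeTask>,
    remote: Mutex<RemoteQueue>,
}

struct RemoteQueue {
    queue: VecDeque<FakeTask>,
    open: bool,
}

impl Queues {
    // Shut down both queues. Closing the remote queue first means that tasks
    // notified from other threads after this point are rejected rather than
    // enqueued, and draining it removes the externally notified tasks that the
    // buggy `Drop` impl left behind.
    fn shutdown(&mut self) {
        let mut remote = self.remote.lock().unwrap();
        remote.open = false;
        while let Some(task) = remote.queue.pop_front() {
            // The real scheduler cancels the task here (`task.shutdown()`).
            drop(task);
        }
        drop(remote);

        // Drain the local queue, as the old `Drop` impl already did.
        while let Some(task) = self.local.pop_front() {
            drop(task);
        }
    }
}

fn main() {
    let mut queues = Queues {
        local: VecDeque::from(vec![FakeTask]),
        remote: Mutex::new(RemoteQueue {
            queue: VecDeque::from(vec![FakeTask]),
            open: true,
        }),
    };

    // Draining only `local` would leave the remotely notified task queued
    // forever, so a "park until the owned-task list is empty" loop would never
    // terminate. Draining both queues lets that loop finish.
    queues.shutdown();

    let remote = queues.remote.lock().unwrap();
    assert!(queues.local.is_empty());
    assert!(remote.queue.is_empty());
    assert!(!remote.open);
}
```

The detail this mirrors from the real fix is that shutdown closes and drains the remote queue in addition to the local one, so tasks notified from outside the scheduler thread can no longer keep the owned-task list from emptying.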
Diffstat (limited to 'tokio/src/runtime/basic_scheduler.rs')
-rw-r--r--  tokio/src/runtime/basic_scheduler.rs  180
1 file changed, 60 insertions(+), 120 deletions(-)
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index c674b961..0bce72ac 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -1,13 +1,12 @@
use crate::park::{Park, Unpark};
-use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task};
+use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, ScheduleSendOnly, Task};
-use std::cell::{Cell, UnsafeCell};
-use std::collections::VecDeque;
+use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::ptr;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
@@ -31,31 +30,7 @@ pub(crate) struct Spawner {
/// The scheduler component.
pub(super) struct SchedulerPriv {
- /// List of all active tasks spawned onto this executor.
- ///
- /// # Safety
- ///
- /// Must only be accessed from the primary thread
- owned_tasks: UnsafeCell<task::OwnedList<Self>>,
-
- /// Local run queue.
- ///
- /// Tasks notified from the current thread are pushed into this queue.
- ///
- /// # Safety
- ///
- /// References should not be handed out. Only call `push` / `pop` functions.
- /// Only call from the owning thread.
- local_queue: UnsafeCell<VecDeque<Task<SchedulerPriv>>>,
-
- /// Remote run queue.
- ///
- /// Tasks notified from another thread are pushed into this queue.
- remote_queue: Mutex<RemoteQueue>,
-
- /// Tasks pending drop
- pending_drop: task::TransferStack<Self>,
-
+ queues: MpscQueues<Self>,
/// Unpark the blocked thread
unpark: Box<dyn Unpark>,
}
@@ -73,21 +48,9 @@ struct LocalState<P> {
park: P,
}
-#[derive(Debug)]
-struct RemoteQueue {
- /// FIFO list of tasks
- queue: VecDeque<Task<SchedulerPriv>>,
-
- /// `true` when a task can be pushed into the queue, false otherwise.
- open: bool,
-}
-
/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;
-/// How often to check the remote queue first
-const CHECK_REMOTE_INTERVAL: u8 = 13;
-
thread_local! {
static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null())
}
@@ -101,13 +64,7 @@ where
BasicScheduler {
scheduler: Arc::new(SchedulerPriv {
- owned_tasks: UnsafeCell::new(task::OwnedList::new()),
- local_queue: UnsafeCell::new(VecDeque::with_capacity(64)),
- remote_queue: Mutex::new(RemoteQueue {
- queue: VecDeque::with_capacity(64),
- open: true,
- }),
- pending_drop: task::TransferStack::new(),
+ queues: MpscQueues::new(),
unpark: Box::new(unpark),
}),
local: LocalState { tick: 0, park },
@@ -155,9 +112,7 @@ where
// Track the current scheduler
let _guard = ACTIVE.with(|cell| {
- let guard = Guard {
- old: cell.get(),
- };
+ let guard = Guard { old: cell.get() };
cell.set(scheduler as *const SchedulerPriv);
@@ -188,7 +143,11 @@ where
scheduler.tick(local);
// Maintenance work
- scheduler.drain_pending_drop();
+ unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on (which we are).
+ scheduler.queues.drain_pending_drop();
+ }
}
})
}
@@ -216,6 +175,8 @@ impl Spawner {
}
}
+// === impl SchedulerPriv ===
+
impl SchedulerPriv {
fn tick(&self, local: &mut LocalState<impl Park>) {
for _ in 0..MAX_TASKS_PER_TICK {
@@ -224,8 +185,14 @@ impl SchedulerPriv {
// Increment the tick
local.tick = tick.wrapping_add(1);
+ let next = unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on. The `LocalState`
+ // parameter to this method implies that we are on that thread.
+ self.queues.next_task(tick)
+ };
- let task = match self.next_task(tick) {
+ let task = match next {
Some(task) => task,
None => {
local.park.park().ok().expect("failed to park");
@@ -235,7 +202,10 @@ impl SchedulerPriv {
if let Some(task) = task.run(&mut || Some(self.into())) {
unsafe {
- self.schedule_local(task);
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on. The `LocalState`
+ // parameter to this method implies that we are on that thread.
+ self.queues.push_local(task);
}
}
}
@@ -247,15 +217,6 @@ impl SchedulerPriv {
.expect("failed to park");
}
- fn drain_pending_drop(&self) {
- for task in self.pending_drop.drain() {
- unsafe {
- (*self.owned_tasks.get()).remove(&task);
- }
- drop(task);
- }
- }
-
/// # Safety
///
/// Must be called from the same thread that holds the `BasicScheduler`
@@ -266,63 +227,51 @@ impl SchedulerPriv {
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
- self.schedule_local(task);
+ self.queues.push_local(task);
handle
}
-
- unsafe fn schedule_local(&self, task: Task<Self>) {
- (*self.local_queue.get()).push_back(task);
- }
-
- fn next_task(&self, tick: u8) -> Option<Task<Self>> {
- if 0 == tick % CHECK_REMOTE_INTERVAL {
- self.next_remote_task().or_else(|| self.next_local_task())
- } else {
- self.next_local_task().or_else(|| self.next_remote_task())
- }
- }
-
- fn next_local_task(&self) -> Option<Task<Self>> {
- unsafe { (*self.local_queue.get()).pop_front() }
- }
-
- fn next_remote_task(&self) -> Option<Task<Self>> {
- self.remote_queue.lock().unwrap().queue.pop_front()
- }
}
impl Schedule for SchedulerPriv {
fn bind(&self, task: &Task<Self>) {
unsafe {
- (*self.owned_tasks.get()).insert(task);
+ // safety: `Queues::add_task` is only safe to call from the thread
+ // that owns the queues (the thread the scheduler is running on).
+ // `Scheduler::bind` is called when polling a task that
+ // doesn't have a scheduler set. We will only poll new tasks from
+ // the thread that the scheduler is running on. Therefore, this is
+ // safe to call.
+ self.queues.add_task(task);
}
}
fn release(&self, task: Task<Self>) {
- self.pending_drop.push(task);
+ self.queues.release_remote(task);
}
fn release_local(&self, task: &Task<Self>) {
unsafe {
- (*self.owned_tasks.get()).remove(task);
+ // safety: `Scheduler::release_local` is only called from the
+ // thread that the scheduler is running on. The `Schedule` trait's
+ // contract is that releasing a task from another thread should call
+ // `release` rather than `release_local`.
+ self.queues.release_local(task);
}
}
fn schedule(&self, task: Task<Self>) {
- let is_current = ACTIVE.with(|cell| {
- cell.get() == self as *const SchedulerPriv
- });
+ let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
if is_current {
- unsafe { self.schedule_local(task) };
+ unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on. If `is_current` is true,
+ // then we are on that thread.
+ self.queues.push_local(task)
+ };
} else {
- let mut lock = self.remote_queue.lock().unwrap();
-
- if lock.open {
- lock.queue.push_back(task);
- } else {
- task.shutdown();
- }
+ let mut lock = self.queues.remote();
+ lock.schedule(task);
// while locked, call unpark
self.unpark.unpark();
@@ -339,39 +288,30 @@ where
P: Park,
{
fn drop(&mut self) {
- // Close the remote queue
- let mut lock = self.scheduler.remote_queue.lock().unwrap();
- lock.open = false;
-
- while let Some(task) = lock.queue.pop_front() {
- task.shutdown();
- }
-
- drop(lock);
-
- // Drain all local tasks
- while let Some(task) = self.scheduler.next_local_task() {
- task.shutdown();
- }
-
- // Release owned tasks
unsafe {
- (*self.scheduler.owned_tasks.get()).shutdown();
- }
+ // safety: the `Drop` impl owns the scheduler's queues. these fields
+ // will only be accessed when running the scheduler, and it can no
+ // longer be run, since we are in the process of dropping it.
- self.scheduler.drain_pending_drop();
+ // Shut down the task queues.
+ self.scheduler.queues.shutdown();
+ }
// Wait until all tasks have been released.
- while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } {
+ while unsafe { self.scheduler.queues.has_tasks_remaining() } {
self.local.park.park().ok().expect("park failed");
- self.scheduler.drain_pending_drop();
+ unsafe {
+ self.scheduler.queues.drain_pending_drop();
+ }
}
}
}
impl fmt::Debug for SchedulerPriv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Scheduler").finish()
+ fmt.debug_struct("Scheduler")
+ .field("queues", &self.queues)
+ .finish()
}
}