author    Eliza Weisman <eliza@buoyant.io>    2019-12-04 11:20:57 -0800
committer GitHub <noreply@github.com>         2019-12-04 11:20:57 -0800
commit    0e729aa341028c121b9c39fe552ed4309bae6b6a (patch)
tree      e69cc9079cd351462f278effd424f1a34dd85ba2 /tokio
parent    cbe369a3ed5c252ca7581e37986dc912d88e58c6 (diff)
task: fix infinite loop when dropping a `LocalSet` (#1892)
## Motivation

There is currently an issue in `task::LocalSet` where dropping the local set can result in an infinite loop if a task running in the local set is notified from outside the local set (e.g. by a timer). This was reported in issue #1885.

The issue exists because the `Drop` impl for `task::local::Scheduler` does not drain the queue of tasks notified externally, the way the basic scheduler does. Instead, only the local queue is drained, leaving some tasks in place. Since those tasks are never removed, the loop that keeps trying to cancel tasks until the owned task list is empty never terminates. This was likely because the `Drop` impl was written before a remote queue was added to the local scheduler, and the need to close the remote queue as well was overlooked.

## Solution

This branch solves the problem by clearing the local scheduler's remote queue as well as the local one. I've added a test that reproduces the behavior; it fails on master and passes after this change.

In addition, this branch factors out the task queue logic shared by the basic scheduler runtime and the `LocalSet` struct in `tokio::task`. As more work was done on `LocalSet`, its behavior has grown closer and closer to the basic scheduler's, and factoring out the shared code reduces the risk of errors caused by `LocalSet` not doing something that the basic scheduler does. The queues are now encapsulated by an `MpscQueues` struct in `tokio::task::queue` (crate-public). As a follow-up, I'd also like to look into changing this type to use the same remote queue type as the threadpool (a linked list).

In particular, I noticed that the basic scheduler has a flag indicating that the remote queue has been closed, which is set when dropping the scheduler. This prevents tasks from being added after the scheduler has started shutting down, stopping a potential task leak. Rather than duplicating this code in `LocalSet`, I thought it was better to factor it out into a shared type. There are a few cases with small differences in behavior, so separate types implemented _using_ the new `MpscQueues` struct are still needed, but it covers most of the previously identical code.

Note that this diff is rather large due to the refactoring; the actual fix for the infinite loop is very simple. It can be reviewed on its own by looking at commit 4f46ac6. The refactor is in a separate commit, with the SHA 90b5b1f.

Fixes #1885

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
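To make the failure mode concrete, here is a minimal sketch of the scenario described above, adapted from the regression test added in this change (`drop_cancels_remote_tasks` in `tokio/src/task/local.rs`). It is rewritten against the tokio 0.2-era public API used by this tree (`basic_scheduler()`, `time::delay_for`), so treat it as an illustration of the bug rather than the exact test code.

```rust
use std::time::Duration;
use tokio::{runtime, sync::mpsc, task::LocalSet, time};

fn main() {
    // The sender half outlives the `LocalSet`, so the spawned task will be
    // woken by something other than the local set itself.
    let (tx, mut rx) = mpsc::channel::<()>(1024);

    let mut rt = runtime::Builder::new()
        .enable_time()
        .basic_scheduler()
        .build()
        .expect("building runtime should succeed");

    let local = LocalSet::new();
    local.spawn_local(async move { while let Some(_) = rx.recv().await {} });

    // Drive the local set briefly so the task gets polled at least once.
    local.block_on(&mut rt, async {
        time::delay_for(Duration::from_millis(1)).await;
    });

    // No local-set context is active here, so dropping `tx` wakes the task
    // through the scheduler's *remote* queue rather than the local one.
    drop(tx);

    // Before this fix, this drop looped forever: the remote queue was never
    // drained, so the owned-task list never became empty.
    drop(local);
    drop(rt);
}
```

With the fix, `MpscQueues::shutdown` closes and drains the remote queue as well as the local one, so the cancellation loop in the scheduler's `Drop` impl terminates.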
Diffstat (limited to 'tokio')
-rw-r--r--  tokio/src/runtime/basic_scheduler.rs  180
-rw-r--r--  tokio/src/task/local.rs               220
-rw-r--r--  tokio/src/task/mod.rs                   2
-rw-r--r--  tokio/src/task/queue.rs               319
4 files changed, 485 insertions, 236 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index c674b961..0bce72ac 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -1,13 +1,12 @@
use crate::park::{Park, Unpark};
-use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task};
+use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, ScheduleSendOnly, Task};
-use std::cell::{Cell, UnsafeCell};
-use std::collections::VecDeque;
+use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::ptr;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
@@ -31,31 +30,7 @@ pub(crate) struct Spawner {
/// The scheduler component.
pub(super) struct SchedulerPriv {
- /// List of all active tasks spawned onto this executor.
- ///
- /// # Safety
- ///
- /// Must only be accessed from the primary thread
- owned_tasks: UnsafeCell<task::OwnedList<Self>>,
-
- /// Local run queue.
- ///
- /// Tasks notified from the current thread are pushed into this queue.
- ///
- /// # Safety
- ///
- /// References should not be handed out. Only call `push` / `pop` functions.
- /// Only call from the owning thread.
- local_queue: UnsafeCell<VecDeque<Task<SchedulerPriv>>>,
-
- /// Remote run queue.
- ///
- /// Tasks notified from another thread are pushed into this queue.
- remote_queue: Mutex<RemoteQueue>,
-
- /// Tasks pending drop
- pending_drop: task::TransferStack<Self>,
-
+ queues: MpscQueues<Self>,
/// Unpark the blocked thread
unpark: Box<dyn Unpark>,
}
@@ -73,21 +48,9 @@ struct LocalState<P> {
park: P,
}
-#[derive(Debug)]
-struct RemoteQueue {
- /// FIFO list of tasks
- queue: VecDeque<Task<SchedulerPriv>>,
-
- /// `true` when a task can be pushed into the queue, false otherwise.
- open: bool,
-}
-
/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;
-/// How often to check the remote queue first
-const CHECK_REMOTE_INTERVAL: u8 = 13;
-
thread_local! {
static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null())
}
@@ -101,13 +64,7 @@ where
BasicScheduler {
scheduler: Arc::new(SchedulerPriv {
- owned_tasks: UnsafeCell::new(task::OwnedList::new()),
- local_queue: UnsafeCell::new(VecDeque::with_capacity(64)),
- remote_queue: Mutex::new(RemoteQueue {
- queue: VecDeque::with_capacity(64),
- open: true,
- }),
- pending_drop: task::TransferStack::new(),
+ queues: MpscQueues::new(),
unpark: Box::new(unpark),
}),
local: LocalState { tick: 0, park },
@@ -155,9 +112,7 @@ where
// Track the current scheduler
let _guard = ACTIVE.with(|cell| {
- let guard = Guard {
- old: cell.get(),
- };
+ let guard = Guard { old: cell.get() };
cell.set(scheduler as *const SchedulerPriv);
@@ -188,7 +143,11 @@ where
scheduler.tick(local);
// Maintenance work
- scheduler.drain_pending_drop();
+ unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on (which we are).
+ scheduler.queues.drain_pending_drop();
+ }
}
})
}
@@ -216,6 +175,8 @@ impl Spawner {
}
}
+// === impl SchedulerPriv ===
+
impl SchedulerPriv {
fn tick(&self, local: &mut LocalState<impl Park>) {
for _ in 0..MAX_TASKS_PER_TICK {
@@ -224,8 +185,14 @@ impl SchedulerPriv {
// Increment the tick
local.tick = tick.wrapping_add(1);
+ let next = unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on. The `LocalState`
+ // parameter to this method implies that we are on that thread.
+ self.queues.next_task(tick)
+ };
- let task = match self.next_task(tick) {
+ let task = match next {
Some(task) => task,
None => {
local.park.park().ok().expect("failed to park");
@@ -235,7 +202,10 @@ impl SchedulerPriv {
if let Some(task) = task.run(&mut || Some(self.into())) {
unsafe {
- self.schedule_local(task);
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on. The `LocalState`
+ // parameter to this method implies that we are on that thread.
+ self.queues.push_local(task);
}
}
}
@@ -247,15 +217,6 @@ impl SchedulerPriv {
.expect("failed to park");
}
- fn drain_pending_drop(&self) {
- for task in self.pending_drop.drain() {
- unsafe {
- (*self.owned_tasks.get()).remove(&task);
- }
- drop(task);
- }
- }
-
/// # Safety
///
/// Must be called from the same thread that holds the `BasicScheduler`
@@ -266,63 +227,51 @@ impl SchedulerPriv {
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
- self.schedule_local(task);
+ self.queues.push_local(task);
handle
}
-
- unsafe fn schedule_local(&self, task: Task<Self>) {
- (*self.local_queue.get()).push_back(task);
- }
-
- fn next_task(&self, tick: u8) -> Option<Task<Self>> {
- if 0 == tick % CHECK_REMOTE_INTERVAL {
- self.next_remote_task().or_else(|| self.next_local_task())
- } else {
- self.next_local_task().or_else(|| self.next_remote_task())
- }
- }
-
- fn next_local_task(&self) -> Option<Task<Self>> {
- unsafe { (*self.local_queue.get()).pop_front() }
- }
-
- fn next_remote_task(&self) -> Option<Task<Self>> {
- self.remote_queue.lock().unwrap().queue.pop_front()
- }
}
impl Schedule for SchedulerPriv {
fn bind(&self, task: &Task<Self>) {
unsafe {
- (*self.owned_tasks.get()).insert(task);
+ // safety: `Queues::add_task` is only safe to call from the thread
+ // that owns the queues (the thread the scheduler is running on).
+ // `Scheduler::bind` is called when polling a task that
+ // doesn't have a scheduler set. We will only poll new tasks from
+ // the thread that the scheduler is running on. Therefore, this is
+ // safe to call.
+ self.queues.add_task(task);
}
}
fn release(&self, task: Task<Self>) {
- self.pending_drop.push(task);
+ self.queues.release_remote(task);
}
fn release_local(&self, task: &Task<Self>) {
unsafe {
- (*self.owned_tasks.get()).remove(task);
+ // safety: `Scheduler::release_local` is only called from the
+ // thread that the scheduler is running on. The `Schedule` trait's
+ // contract is that releasing a task from another thread should call
+ // `release` rather than `release_local`.
+ self.queues.release_local(task);
}
}
fn schedule(&self, task: Task<Self>) {
- let is_current = ACTIVE.with(|cell| {
- cell.get() == self as *const SchedulerPriv
- });
+ let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
if is_current {
- unsafe { self.schedule_local(task) };
+ unsafe {
+ // safety: this function is safe to call only from the
+ // thread the basic scheduler is running on. If `is_current` is
+ // true, then we are on that thread.
+ self.queues.push_local(task)
+ };
} else {
- let mut lock = self.remote_queue.lock().unwrap();
-
- if lock.open {
- lock.queue.push_back(task);
- } else {
- task.shutdown();
- }
+ let mut lock = self.queues.remote();
+ lock.schedule(task);
// while locked, call unpark
self.unpark.unpark();
@@ -339,39 +288,30 @@ where
P: Park,
{
fn drop(&mut self) {
- // Close the remote queue
- let mut lock = self.scheduler.remote_queue.lock().unwrap();
- lock.open = false;
-
- while let Some(task) = lock.queue.pop_front() {
- task.shutdown();
- }
-
- drop(lock);
-
- // Drain all local tasks
- while let Some(task) = self.scheduler.next_local_task() {
- task.shutdown();
- }
-
- // Release owned tasks
unsafe {
- (*self.scheduler.owned_tasks.get()).shutdown();
- }
+ // safety: the `Drop` impl owns the scheduler's queues. these fields
+ // will only be accessed when running the scheduler, and it can no
+ // longer be run, since we are in the process of dropping it.
- self.scheduler.drain_pending_drop();
+ // Shut down the task queues.
+ self.scheduler.queues.shutdown();
+ }
// Wait until all tasks have been released.
- while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } {
+ while unsafe { self.scheduler.queues.has_tasks_remaining() } {
self.local.park.park().ok().expect("park failed");
- self.scheduler.drain_pending_drop();
+ unsafe {
+ self.scheduler.queues.drain_pending_drop();
+ }
}
}
}
impl fmt::Debug for SchedulerPriv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Scheduler").finish()
+ fmt.debug_struct("Scheduler")
+ .field("queues", &self.queues)
+ .finish()
}
}
diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs
index f81522c9..aed5105d 100644
--- a/tokio/src/task/local.rs
+++ b/tokio/src/task/local.rs
@@ -1,15 +1,12 @@
//! Runs `!Send` futures on the current thread.
use crate::sync::AtomicWaker;
-use crate::task::{self, JoinHandle, Schedule, Task, TransferStack};
+use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, Task};
-use std::cell::{Cell, UnsafeCell};
-use std::collections::VecDeque;
-use std::fmt;
+use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::ptr::{self, NonNull};
use std::rc::Rc;
-use std::sync::Mutex;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
@@ -82,33 +79,12 @@ cfg_rt_util! {
scheduler: Rc<Scheduler>,
}
}
-struct Scheduler {
- /// List of all active tasks spawned onto this executor.
- ///
- /// # Safety
- ///
- /// Must only be accessed from the primary thread
- tasks: UnsafeCell<task::OwnedList<Scheduler>>,
-
- /// Local run local_queue.
- ///
- /// Tasks notified from the current thread are pushed into this queue.
- ///
- /// # Safety
- ///
- /// References should not be handed out. Only call `push` / `pop` functions.
- /// Only call from the owning thread.
- local_queue: UnsafeCell<VecDeque<Task<Scheduler>>>,
+#[derive(Debug)]
+struct Scheduler {
tick: Cell<u8>,
- /// Remote run queue.
- ///
- /// Tasks notified from another thread are pushed into this queue.
- remote_queue: Mutex<VecDeque<Task<Scheduler>>>,
-
- /// Tasks pending drop
- pending_drop: TransferStack<Self>,
+ queues: MpscQueues<Self>,
/// Used to notify the `LocalFuture` when a task in the local task set is
/// notified.
@@ -166,12 +142,17 @@ cfg_rt_util! {
CURRENT_TASK_SET.with(|current| {
let current = current
.get()
- .expect("`spawn_local` called from outside of a local::LocalSet!");
+ .expect("`spawn_local` called from outside of a task::LocalSet!");
+ let (task, handle) = task::joinable_local(future);
unsafe {
- let (task, handle) = task::joinable_local(future);
- current.as_ref().schedule_local(task);
- handle
+ // safety: this function is unsafe to call outside of the local
+ // thread. Since the call above to get the current task set
+ // would not succeed if we were outside of a local set, this is
+ // safe.
+ current.as_ref().queues.push_local(task);
}
+
+ handle
})
}
}
@@ -179,9 +160,6 @@ cfg_rt_util! {
/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;
-/// How often to check the remote queue first
-const CHECK_REMOTE_INTERVAL: u8 = 13;
-
impl LocalSet {
/// Returns a new local task set.
pub fn new() -> Self {
@@ -232,9 +210,9 @@ impl LocalSet {
{
let (task, handle) = task::joinable_local(future);
unsafe {
- // This is safe: since `LocalSet` is not Send or Sync, this is
+ // safety: since `LocalSet` is not Send or Sync, this is
// always being called from the local thread.
- self.scheduler.schedule_local(task);
+ self.scheduler.queues.push_local(task);
}
handle
}
@@ -341,31 +319,32 @@ impl Schedule for Scheduler {
fn bind(&self, task: &Task<Self>) {
assert!(self.is_current());
unsafe {
- (*self.tasks.get()).insert(task);
+ self.queues.add_task(task);
}
}
fn release(&self, task: Task<Self>) {
// This will be called when dropping the local runtime.
- self.pending_drop.push(task);
+ self.queues.release_remote(task);
}
fn release_local(&self, task: &Task<Self>) {
debug_assert!(self.is_current());
unsafe {
- (*self.tasks.get()).remove(task);
+ self.queues.release_local(task);
}
}
fn schedule(&self, task: Task<Self>) {
if self.is_current() {
- unsafe {
- self.schedule_local(task);
- }
+ unsafe { self.queues.push_local(task) };
} else {
- self.remote_queue.lock().unwrap().push_back(task);
+ let mut lock = self.queues.remote();
+ lock.schedule(task);
self.waker.wake();
+
+ drop(lock);
}
}
}
@@ -373,11 +352,8 @@ impl Schedule for Scheduler {
impl Scheduler {
fn new() -> Self {
Self {
- tasks: UnsafeCell::new(task::OwnedList::new()),
- local_queue: UnsafeCell::new(VecDeque::with_capacity(64)),
tick: Cell::new(0),
- pending_drop: TransferStack::new(),
- remote_queue: Mutex::new(VecDeque::with_capacity(64)),
+ queues: MpscQueues::new(),
waker: AtomicWaker::new(),
}
}
@@ -401,10 +377,6 @@ impl Scheduler {
})
}
- unsafe fn schedule_local(&self, task: Task<Self>) {
- (*self.local_queue.get()).push_back(task);
- }
-
fn is_current(&self) -> bool {
CURRENT_TASK_SET
.try_with(|current| {
@@ -416,88 +388,46 @@ impl Scheduler {
.unwrap_or(false)
}
- fn next_task(&self, tick: u8) -> Option<Task<Self>> {
- if 0 == tick % CHECK_REMOTE_INTERVAL {
- self.next_remote_task().or_else(|| self.next_local_task())
- } else {
- self.next_local_task().or_else(|| self.next_remote_task())
- }
- }
-
- fn next_local_task(&self) -> Option<Task<Self>> {
- unsafe { (*self.local_queue.get()).pop_front() }
- }
-
- fn next_remote_task(&self) -> Option<Task<Self>> {
- // there is no semantic information in the `PoisonError`, and it
- // doesn't implement `Debug`, but clippy thinks that it's bad to
- // match all errors here...
- #[allow(clippy::match_wild_err_arm)]
- let mut lock = match self.remote_queue.lock() {
- // If the lock is poisoned, but the thread is already panicking,
- // avoid a double panic. This is necessary since `next_task` (which
- // calls `next_remote_task`) can be called in the `Drop` impl.
- Err(_) if std::thread::panicking() => return None,
- Err(_) => panic!("mutex poisoned"),
- Ok(lock) => lock,
- };
- lock.pop_front()
- }
-
fn tick(&self) {
assert!(self.is_current());
for _ in 0..MAX_TASKS_PER_TICK {
let tick = self.tick.get().wrapping_add(1);
self.tick.set(tick);
- let task = match self.next_task(tick) {
+
+ let task = match unsafe {
+ // safety: we must be on the local thread to call this. The assertion
+ // at the top of this method ensures that `tick` is only called locally.
+ self.queues.next_task(tick)
+ } {
Some(task) => task,
None => return,
};
if let Some(task) = task.run(&mut || Some(self.into())) {
unsafe {
- // we are on the local thread, so this is okay.
- self.schedule_local(task);
+ // safety: we must be on the local thread to call this. The assertion at
+ // the top of this method ensures that `tick` is only called locally.
+ self.queues.push_local(task);
}
}
}
}
-
- fn drain_pending_drop(&self) {
- for task in self.pending_drop.drain() {
- unsafe {
- (*self.tasks.get()).remove(&task);
- }
- drop(task);
- }
- }
-}
-
-impl fmt::Debug for Scheduler {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Scheduler { .. }").finish()
- }
}
impl Drop for Scheduler {
fn drop(&mut self) {
- // Drain all local tasks
- while let Some(task) = self.next_local_task() {
- task.shutdown();
- }
-
- // Release owned tasks
unsafe {
- (*self.tasks.get()).shutdown();
- }
-
- self.drain_pending_drop();
-
- // Wait until all tasks have been released.
- // XXX: this is a busy loop, but we don't really have any way to park
- // the thread here?
- while unsafe { !(*self.tasks.get()).is_empty() } {
- self.drain_pending_drop();
+ // safety: these functions are unsafe to call outside of the local
+ // thread. Since the `Scheduler` type is not `Send` or `Sync`, we
+ // know it will be dropped only from the local thread.
+ self.queues.shutdown();
+
+ // Wait until all tasks have been released.
+ // XXX: this is a busy loop, but we don't really have any way to park
+ // the thread here?
+ while self.queues.has_tasks_remaining() {
+ self.queues.drain_pending_drop();
+ }
}
}
}
@@ -506,6 +436,7 @@ impl Drop for Scheduler {
mod tests {
use super::*;
use crate::{runtime, task};
+ use std::time::Duration;
#[test]
fn local_current_thread() {
@@ -729,7 +660,6 @@ mod tests {
fn drop_cancels_tasks() {
// This test reproduces issue #1842
use crate::sync::oneshot;
- use std::time::Duration;
let mut rt = runtime::Builder::new()
.enable_time()
@@ -753,4 +683,62 @@ mod tests {
drop(local);
drop(rt);
}
+
+ #[test]
+ fn drop_cancels_remote_tasks() {
+ // This test reproduces issue #1885.
+ use std::sync::mpsc::RecvTimeoutError;
+
+ let (done_tx, done_rx) = std::sync::mpsc::channel();
+ let thread = std::thread::spawn(move || {
+ let (tx, mut rx) = crate::sync::mpsc::channel::<()>(1024);
+
+ let mut rt = runtime::Builder::new()
+ .enable_time()
+ .basic_scheduler()
+ .build()
+ .expect("building runtime should succeed");
+
+ let local = LocalSet::new();
+ local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
+ local.block_on(&mut rt, async {
+ crate::time::delay_for(Duration::from_millis(1)).await;
+ });
+
+ drop(tx);
+
+ // This enters an infinite loop if the remote notified tasks are not
+ // properly cancelled.
+ drop(local);
+
+ // Send a message on the channel so that the test thread can
+ // determine if we have entered an infinite loop:
+ done_tx.send(()).unwrap();
+ });
+
+ // Since the failure mode of this test is an infinite loop, rather than
+ // something we can easily make assertions about, we'll run it in a
+ // thread. When the test thread finishes, it will send a message on a
+ // channel to this thread. We'll wait for that message with a fairly
+ // generous timeout, and if we don't receive it, we assume the test
+ // thread has hung.
+ //
+ // Note that it should definitely complete in under a minute, but just
+ // in case CI is slow, we'll give it a long timeout.
+ match done_rx.recv_timeout(Duration::from_secs(60)) {
+ Err(RecvTimeoutError::Timeout) => panic!(
+ "test did not complete within 60 seconds, \
+ we have (probably) entered an infinite loop!"
+ ),
+ // Did the test thread panic? We'll find out for sure when we `join`
+ // with it.
+ Err(RecvTimeoutError::Disconnected) => {
+ println!("done_rx dropped, did the test thread panic?");
+ }
+ // Test completed successfully!
+ Ok(()) => {}
+ }
+
+ thread.join().expect("test thread should not panic!")
+ }
}
diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs
index e336ec9c..f762a561 100644
--- a/tokio/src/task/mod.rs
+++ b/tokio/src/task/mod.rs
@@ -234,6 +234,8 @@ cfg_rt_core! {
mod list;
pub(crate) use self::list::OwnedList;
+ pub(crate) mod queue;
+
mod raw;
use self::raw::RawTask;
diff --git a/tokio/src/task/queue.rs b/tokio/src/task/queue.rs
new file mode 100644
index 00000000..6a004fc7
--- /dev/null
+++ b/tokio/src/task/queue.rs
@@ -0,0 +1,319 @@
+use super::{OwnedList, Schedule, Task, TransferStack};
+use std::{
+ cell::UnsafeCell,
+ collections::VecDeque,
+ fmt,
+ sync::{Mutex, MutexGuard},
+};
+
+/// A set of multi-producer, single consumer task queues, suitable for use by a
+/// single-threaded scheduler.
+///
+/// This consists of a list of _all_ tasks bound to the scheduler, a run queue
+/// of tasks notified from the thread the scheduler is running on (the "local
+/// queue"), a run queue of tasks notified from another thread (the "remote
+/// queue"), and a stack of tasks released from other threads which will
+/// eventually need to be dropped by the scheduler on its own thread ("pending
+/// drop").
+///
+/// Submitting tasks to or popping tasks from the local queue is unsafe, as it
+/// must only be performed on the same thread as the scheduler.
+pub(crate) struct MpscQueues<S: 'static> {
+ /// List of all active tasks spawned onto this executor.
+ ///
+ /// # Safety
+ ///
+ /// Must only be accessed from the primary thread
+ owned_tasks: UnsafeCell<OwnedList<S>>,
+
+ /// Local run queue.
+ ///
+ /// Tasks notified from the current thread are pushed into this queue.
+ ///
+ /// # Safety
+ ///
+ /// References should not be handed out. Only call `push` / `pop` functions.
+ /// Only call from the owning thread.
+ local_queue: UnsafeCell<VecDeque<Task<S>>>,
+
+ /// Remote run queue.
+ ///
+ /// Tasks notified from another thread are pushed into this queue.
+ remote_queue: Mutex<RemoteQueue<S>>,
+
+ /// Tasks pending drop
+ pending_drop: TransferStack<S>,
+}
+
+pub(crate) struct RemoteQueue<S: 'static> {
+ /// FIFO list of tasks
+ queue: VecDeque<Task<S>>,
+
+ /// `true` when a task can be pushed into the queue, false otherwise.
+ open: bool,
+}
+
+// === impl Queues ===
+
+impl<S> MpscQueues<S>
+where
+ S: Schedule + 'static,
+{
+ pub(crate) const INITIAL_CAPACITY: usize = 64;
+
+ /// How often to check the remote queue first
+ pub(crate) const CHECK_REMOTE_INTERVAL: u8 = 13;
+
+ pub(crate) fn new() -> Self {
+ Self {
+ owned_tasks: UnsafeCell::new(OwnedList::new()),
+ local_queue: UnsafeCell::new(VecDeque::with_capacity(Self::INITIAL_CAPACITY)),
+ pending_drop: TransferStack::new(),
+ remote_queue: Mutex::new(RemoteQueue {
+ queue: VecDeque::with_capacity(Self::INITIAL_CAPACITY),
+ open: true,
+ }),
+ }
+ }
+
+ /// Add a new task to the scheduler.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ pub(crate) unsafe fn add_task(&self, task: &Task<S>) {
+ (*self.owned_tasks.get()).insert(task);
+ }
+
+ /// Push a task to the local queue.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ pub(crate) unsafe fn push_local(&self, task: Task<S>) {
+ (*self.local_queue.get()).push_back(task);
+ }
+
+ /// Remove a task from the local queue.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ pub(crate) unsafe fn release_local(&self, task: &Task<S>) {
+ (*self.owned_tasks.get()).remove(task);
+ }
+
+ /// Lock the remote queue, returning a `MutexGuard`.
+ ///
+ /// This can be used to push to the remote queue and perform other
+ /// operations while holding the lock.
+ ///
+ /// # Panics
+ ///
+ /// If the remote queue mutex is poisoned.
+ pub(crate) fn remote(&self) -> MutexGuard<'_, RemoteQueue<S>> {
+ self.remote_queue
+ .lock()
+ .expect("failed to lock remote queue")
+ }
+
+ /// Release a task from outside of the thread that owns the scheduler.
+ ///
+ /// This simply pushes the task to the pending drop queue.
+ pub(crate) fn release_remote(&self, task: Task<S>) {
+ self.pending_drop.push(task);
+ }
+
+ /// Returns the next task from the remote *or* local queue.
+ ///
+ /// Typically, this checks the local queue before the remote queue, and only
+ /// checks the remote queue if the local queue is empty. However, to avoid
+ /// starving the remote queue, it is checked first every
+ /// `CHECK_REMOTE_INTERVAL` ticks.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ pub(crate) unsafe fn next_task(&self, tick: u8) -> Option<Task<S>> {
+ if 0 == tick % Self::CHECK_REMOTE_INTERVAL {
+ self.next_remote_task().or_else(|| self.next_local_task())
+ } else {
+ self.next_local_task().or_else(|| self.next_remote_task())
+ }
+ }
+
+ /// Returns the next task from the local queue.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ pub(crate) unsafe fn next_local_task(&self) -> Option<Task<S>> {
+ (*self.local_queue.get()).pop_front()
+ }
+
+ /// Returns the next task from the remote queue.
+ ///
+ /// # Panics
+ ///
+ /// If the mutex around the remote queue is poisoned _and_ the current
+ /// thread is not already panicking. This is safe to call in a `Drop` impl.
+ pub(crate) fn next_remote_task(&self) -> Option<Task<S>> {
+ // there is no semantic information in the `PoisonError`, and it
+ // doesn't implement `Debug`, but clippy thinks that it's bad to
+ // match all errors here...
+ #[allow(clippy::match_wild_err_arm)]
+ let mut lock = match self.remote_queue.lock() {
+ // If the lock is poisoned, but the thread is already panicking,
+ // avoid a double panic. This is necessary since `next_task` (which
+ // calls `next_remote_task`) can be called in the `Drop` impl.
+ Err(_) if std::thread::panicking() => return None,
+ Err(_) => panic!("mutex poisoned"),
+ Ok(lock) => lock,
+ };
+ lock.queue.pop_front()
+ }
+
+ /// Returns true if any owned tasks are still bound to this scheduler.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ pub(crate) unsafe fn has_tasks_remaining(&self) -> bool {
+ !(*self.owned_tasks.get()).is_empty()
+ }
+
+ /// Drain any tasks that have previously been released from other threads.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ pub(crate) unsafe fn drain_pending_drop(&self) {
+ for task in self.pending_drop.drain() {
+ (*self.owned_tasks.get()).remove(&task);
+ drop(task);
+ }
+ }
+
+ /// Shut down the queues.
+ ///
+ /// This performs the following operations:
+ ///
+ /// 1. Close the remote queue (so that it will no longer accept new tasks).
+ /// 2. Drain the remote queue and shut down all tasks.
+ /// 3. Drain the local queue and shut down all tasks.
+ /// 4. Shut down the owned task list.
+ /// 5. Drain the list of tasks dropped externally and remove them from the
+ /// owned task list.
+ ///
+ /// This method should be called before dropping a `Queues`. It is provided
+ /// as a method rather than a `Drop` impl because types that own a `Queues`
+ /// wish to perform other work in their `Drop` implementations _after_
+ /// shutting down the task queues.
+ ///
+ /// # Safety
+ ///
+ /// This method accesses the local task queue, and therefore *must* be
+ /// called only from the thread that owns the scheduler.
+ ///
+ /// # Panics
+ ///
+ /// If the mutex around the remote queue is poisoned _and_ the current
+ /// thread is not already panicking. This is safe to call in a `Drop` impl.
+ pub(crate) unsafe fn shutdown(&self) {
+ // Close and drain the remote queue.
+ self.close_remote();
+
+ // Drain the local queue.
+ self.close_local();
+
+ // Release owned tasks
+ self.shutdown_owned_tasks();
+
+ // Drain tasks pending drop.
+ self.drain_pending_drop();
+ }
+
+ /// Shut down the scheduler's owned task list.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ unsafe fn shutdown_owned_tasks(&self) {
+ (*self.owned_tasks.get()).shutdown();
+ }
+
+ /// Drain the remote queue, and shut down its tasks.
+ ///
+ /// This closes the remote queue. Any additional tasks added to it will be
+ /// shut down instead.
+ ///
+ /// # Panics
+ /// If the mutex around the remote queue is poisoned _and_ the current
+ /// thread is not already panicking. This is safe to call in a `Drop` impl.
+ fn close_remote(&self) {
+ #[allow(clippy::match_wild_err_arm)]
+ let mut lock = match self.remote_queue.lock() {
+ // If the lock is poisoned, but the thread is already panicking,
+ // avoid a double panic. This is necessary since this fn can be
+ // called in a drop impl.
+ Err(_) if std::thread::panicking() => return,
+ Err(_) => panic!("mutex poisoned"),
+ Ok(lock) => lock,
+ };
+ lock.open = false;
+
+ while let Some(task) = lock.queue.pop_front() {
+ task.shutdown();
+ }
+ }
+
+ /// Drain the local queue, and shut down its tasks.
+ ///
+ /// # Safety
+ ///
+ /// This *must* be called only from the thread that owns the scheduler.
+ unsafe fn close_local(&self) {
+ while let Some(task) = self.next_local_task() {
+ task.shutdown();
+ }
+ }
+}
+
+impl<S> fmt::Debug for MpscQueues<S> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("MpscQueues")