summaryrefslogtreecommitdiffstats
path: root/tokio-threadpool
diff options
context:
space:
mode:
authorStjepan Glavina <stjepang@gmail.com>2018-12-28 20:34:54 +0100
committerToby Lawrence <tobz@users.noreply.github.com>2018-12-28 14:34:54 -0500
commitfdf4aba6219fd5a30097a3a38231aa51691c4ed7 (patch)
tree8f9f3cda970b128b24f5d0b60238cc52b14b9f62 /tokio-threadpool
parent201b6ce53a0f2fe7cbba50802253fda33e6e3bd5 (diff)
threadpool: introduce a global task queue (#798)
Diffstat (limited to 'tokio-threadpool')
-rw-r--r--tokio-threadpool/Cargo.toml1
-rw-r--r--tokio-threadpool/src/builder.rs6
-rw-r--r--tokio-threadpool/src/lib.rs1
-rw-r--r--tokio-threadpool/src/pool/mod.rs98
-rw-r--r--tokio-threadpool/src/sender.rs5
-rw-r--r--tokio-threadpool/src/shutdown.rs8
-rw-r--r--tokio-threadpool/src/task/mod.rs8
-rw-r--r--tokio-threadpool/src/task/queue.rs113
-rw-r--r--tokio-threadpool/src/worker/entry.rs43
-rw-r--r--tokio-threadpool/src/worker/mod.rs95
10 files changed, 90 insertions, 288 deletions
diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml
index 21875ea1..f007b8ac 100644
--- a/tokio-threadpool/Cargo.toml
+++ b/tokio-threadpool/Cargo.toml
@@ -20,6 +20,7 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
futures = "0.1.19"
+crossbeam-channel = "0.3.3"
crossbeam-deque = "0.6.1"
crossbeam-utils = "0.6.0"
num_cpus = "1.2"
diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs
index 05af1820..9c49135e 100644
--- a/tokio-threadpool/src/builder.rs
+++ b/tokio-threadpool/src/builder.rs
@@ -3,6 +3,7 @@ use config::{Config, MAX_WORKERS};
use park::{BoxPark, BoxedPark, DefaultPark};
use shutdown::ShutdownTrigger;
use pool::{Pool, MAX_BACKUP};
+use task::Queue;
use thread_pool::ThreadPool;
use worker::{self, Worker, WorkerId};
@@ -413,11 +414,13 @@ impl Builder {
workers.into()
};
+ let queue = Arc::new(Queue::new());
+
// Create a trigger that will clean up resources on shutdown.
//
// The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain
// strong references.
- let trigger = Arc::new(ShutdownTrigger::new(workers.clone()));
+ let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone()));
// Create the pool
let pool = Arc::new(Pool::new(
@@ -425,6 +428,7 @@ impl Builder {
Arc::downgrade(&trigger),
self.max_blocking,
self.config.clone(),
+ queue,
));
ThreadPool::new2(pool, trigger)
diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs
index 9fe53116..8f6f82d1 100644
--- a/tokio-threadpool/src/lib.rs
+++ b/tokio-threadpool/src/lib.rs
@@ -79,6 +79,7 @@
extern crate tokio_executor;
+extern crate crossbeam_channel;
extern crate crossbeam_deque as deque;
extern crate crossbeam_utils;
#[macro_use]
diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs
index 52dbf805..eb48e349 100644
--- a/tokio-threadpool/src/pool/mod.rs
+++ b/tokio-threadpool/src/pool/mod.rs
@@ -15,7 +15,7 @@ use self::backup_stack::BackupStack;
use config::Config;
use shutdown::ShutdownTrigger;
-use task::{Task, Blocking};
+use task::{Blocking, Queue, Task};
use worker::{self, Worker, WorkerId};
use futures::Poll;
@@ -59,6 +59,12 @@ pub(crate) struct Pool {
// The number of workers will *usually* be small.
pub workers: Arc<[worker::Entry]>,
+ // The global MPMC queue of tasks.
+ //
+ // Spawned tasks are pushed into this queue. Although worker threads have their own dedicated
+ // task queues, they periodically steal tasks from this global queue, too.
+ pub queue: Arc<Queue>,
+
// Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
//
// When spawning a new `Worker`, this weak reference is upgraded and handed out to the new
@@ -90,6 +96,7 @@ impl Pool {
trigger: Weak<ShutdownTrigger>,
max_blocking: usize,
config: Config,
+ queue: Arc<Queue>,
) -> Pool {
let pool_size = workers.len();
let total_size = max_blocking + pool_size;
@@ -117,6 +124,7 @@ impl Pool {
sleep_stack: CachePadded::new(worker::Stack::new()),
num_workers: AtomicUsize::new(0),
workers,
+ queue,
trigger,
backup,
backup_stack,
@@ -264,50 +272,10 @@ impl Pool {
pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
debug_assert_eq!(*self, **pool);
- use worker::Lifecycle::Notified;
-
- // First try to get a handle to a sleeping worker. This ensures that
- // sleeping tasks get woken up
- if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Notified, false) {
- trace!("submit to existing worker; idx={}; state={:?}", idx, worker_state);
- self.submit_to_external(idx, task, worker_state, pool);
- return;
- }
-
- // All workers are active, so pick a random worker and submit the
- // task to it.
- self.submit_to_random(task, pool);
- }
-
- /// Submit a task to a random worker
- ///
- /// Called from outside of the scheduler, this function is how new tasks
- /// enter the system.
- pub fn submit_to_random(&self, task: Arc<Task>, pool: &Arc<Pool>) {
- debug_assert_eq!(*self, **pool);
-
- let len = self.workers.len();
- let idx = self.rand_usize() % len;
-
- trace!(" -> submitting to random; idx={}", idx);
-
- let state = self.workers[idx].load_state();
- self.submit_to_external(idx, task, state, pool);
- }
-
- fn submit_to_external(&self,
- idx: usize,
- task: Arc<Task>,
- state: worker::State,
- pool: &Arc<Pool>)
- {
- debug_assert_eq!(*self, **pool);
-
- let entry = &self.workers[idx];
+ trace!(" -> submit external");
- if !entry.submit_external(task, state) {
- self.spawn_thread(WorkerId::new(idx), pool);
- }
+ self.queue.push(task);
+ self.signal_work(pool);
}
pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> {
@@ -438,43 +406,21 @@ impl Pool {
pub fn signal_work(&self, pool: &Arc<Pool>) {
debug_assert_eq!(*self, **pool);
- use worker::Lifecycle::*;
+ use worker::Lifecycle::Signaled;
- if let Some((idx, mut worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
+ if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
let entry = &self.workers[idx];
- debug_assert!(worker_state.lifecycle() != Signaled, "actual={:?}", worker_state.lifecycle());
-
- // Transition the worker state to signaled
- loop {
- let mut next = worker_state;
-
- next.set_lifecycle(Signaled);
+ debug_assert!(
+ worker_state.lifecycle() != Signaled,
+ "actual={:?}", worker_state.lifecycle(),
+ );
- let actual = entry.state.compare_and_swap(
- worker_state.into(), next.into(), AcqRel).into();
+ trace!("signal_work -- notify; idx={}", idx);
- if actual == worker_state {
- break;
- }
-
- worker_state = actual;
- }
-
- // The state has been transitioned to signal, now we need to wake up
- // the worker if necessary.
- match worker_state.lifecycle() {
- Sleeping => {
- trace!("signal_work -- wakeup; idx={}", idx);
- self.workers[idx].wakeup();
- }
- Shutdown => {
- trace!("signal_work -- spawn; idx={}", idx);
- self.spawn_thread(WorkerId(idx), pool);
- }
- Running | Notified | Signaled => {
- // The workers are already active. No need to wake them up.
- }
+ if !entry.notify(worker_state) {
+ trace!("signal_work -- spawn; idx={}", idx);
+ self.spawn_thread(WorkerId(idx), pool);
}
}
}
diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs
index 0ff99d73..de5f0e07 100644
--- a/tokio-threadpool/src/sender.rs
+++ b/tokio-threadpool/src/sender.rs
@@ -161,7 +161,10 @@ impl<'a> tokio_executor::Executor for &'a Sender {
// Create a new task for the future
let task = Arc::new(Task::new(future));
- self.pool.submit_to_random(task, &self.pool);
+ // Call `submit_external()` in order to place the task into the global
+ // queue. This way all workers have equal chance of running this task,
+ // which means IO handles will be assigned to reactors more evenly.
+ self.pool.submit_external(task, &self.pool);
Ok(())
}
diff --git a/tokio-threadpool/src/shutdown.rs b/tokio-threadpool/src/shutdown.rs
index 8167d014..1cc19b54 100644
--- a/tokio-threadpool/src/shutdown.rs
+++ b/tokio-threadpool/src/shutdown.rs
@@ -1,3 +1,4 @@
+use task::Queue;
use worker;
use futures::{Future, Poll, Async};
@@ -61,25 +62,30 @@ impl Future for Shutdown {
pub(crate) struct ShutdownTrigger {
inner: Arc<Mutex<Inner>>,
workers: Arc<[worker::Entry]>,
+ queue: Arc<Queue>,
}
unsafe impl Send for ShutdownTrigger {}
unsafe impl Sync for ShutdownTrigger {}
impl ShutdownTrigger {
- pub(crate) fn new(workers: Arc<[worker::Entry]>) -> ShutdownTrigger {
+ pub(crate) fn new(workers: Arc<[worker::Entry]>, queue: Arc<Queue>) -> ShutdownTrigger {
ShutdownTrigger {
inner: Arc::new(Mutex::new(Inner {
task: AtomicTask::new(),
completed: false,
})),
workers,
+ queue,
}
}
}
impl Drop for ShutdownTrigger {
fn drop(&mut self) {
+ // Drain the global task queue.
+ while self.queue.pop().is_some() {}
+
// Notify the task interested in shutdown.
let mut inner = self.inner.lock().unwrap();
inner.completed = true;
diff --git a/tokio-threadpool/src/task/mod.rs b/tokio-threadpool/src/task/mod.rs
index be95b2dc..ef873af5 100644
--- a/tokio-threadpool/src/task/mod.rs
+++ b/tokio-threadpool/src/task/mod.rs
@@ -4,7 +4,7 @@ mod queue;
mod state;
pub(crate) use self::blocking::{Blocking, CanBlock};
-pub(crate) use self::queue::{Queue, Poll};
+pub(crate) use self::queue::Queue;
use self::blocking_state::BlockingState;
use self::state::State;
@@ -31,9 +31,6 @@ pub(crate) struct Task {
/// Task blocking related state
blocking: AtomicUsize,
- /// Next pointer in the queue that submits tasks to a worker.
- next: AtomicPtr<Task>,
-
/// Next pointer in the queue of tasks pending blocking capacity.
next_blocking: AtomicPtr<Task>,
@@ -63,7 +60,6 @@ impl Task {
Task {
state: AtomicUsize::new(State::new().into()),
blocking: AtomicUsize::new(BlockingState::new().into()),
- next: AtomicPtr::new(ptr::null_mut()),
next_blocking: AtomicPtr::new(ptr::null_mut()),
future: UnsafeCell::new(Some(task_fut)),
}
@@ -78,7 +74,6 @@ impl Task {
Task {
state: AtomicUsize::new(State::stub().into()),
blocking: AtomicUsize::new(BlockingState::new().into()),
- next: AtomicPtr::new(ptr::null_mut()),
next_blocking: AtomicPtr::new(ptr::null_mut()),
future: UnsafeCell::new(Some(task_fut)),
}
@@ -237,7 +232,6 @@ impl Task {
impl fmt::Debug for Task {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Task")
- .field("next", &self.next)
.field("state", &self.state)
.field("future", &"Spawn<BoxFuture>")
.finish()
diff --git a/tokio-threadpool/src/task/queue.rs b/tokio-threadpool/src/task/queue.rs
index 546b8c50..166438e7 100644
--- a/tokio-threadpool/src/task/queue.rs
+++ b/tokio-threadpool/src/task/queue.rs
@@ -1,33 +1,13 @@
use task::Task;
-use std::cell::UnsafeCell;
-use std::ptr;
use std::sync::Arc;
-use std::sync::atomic::AtomicPtr;
-use std::sync::atomic::Ordering::{Acquire, Release, AcqRel, Relaxed};
-use crossbeam_utils::CachePadded;
+use crossbeam_channel::{unbounded, Receiver, Sender};
#[derive(Debug)]
pub(crate) struct Queue {
- /// Queue head.
- ///
- /// This is a strong reference to `Task` (i.e, `Arc<Task>`)
- head: CachePadded<AtomicPtr<Task>>,
-
- /// Tail pointer. This is `Arc<Task>` unless it points to `stub`.
- tail: UnsafeCell<*mut Task>,
-
- /// Stub pointer, used as part of the intrusive mpsc channel algorithm
- /// described by 1024cores.
- stub: Box<Task>,
-}
-
-#[derive(Debug)]
-pub(crate) enum Poll {
- Empty,
- Inconsistent,
- Data(Arc<Task>),
+ // TODO(stjepang): Use a custom, faster MPMC queue implementation that supports `steal_many()`.
+ chan: (Sender<Arc<Task>>, Receiver<Arc<Task>>),
}
// ===== impl Queue =====
@@ -35,93 +15,20 @@ pub(crate) enum Poll {
impl Queue {
/// Create a new, empty, `Queue`.
pub fn new() -> Queue {
- let stub = Box::new(Task::stub());
- let ptr = &*stub as *const _ as *mut _;
-
Queue {
- head: CachePadded::new(AtomicPtr::new(ptr)),
- tail: UnsafeCell::new(ptr),
- stub: stub,
+ chan: unbounded(),
}
}
/// Push a task onto the queue.
- ///
- /// This function is `Sync`.
+ #[inline]
pub fn push(&self, task: Arc<Task>) {
- unsafe {
- self.push2(Arc::into_raw(task));
- }
- }
-
- unsafe fn push2(&self, task: *const Task) {
- let task = task as *mut Task;
-
- // Set the next pointer. This does not require an atomic operation as
- // this node is not accessible. The write will be flushed with the next
- // operation
- (*task).next.store(ptr::null_mut(), Relaxed);
-
- // Update the head to point to the new node. We need to see the previous
- // node in order to update the next pointer as well as release `task`
- // to any other threads calling `push`.
- let prev = self.head.swap(task, AcqRel);
-
- // Release `task` to the consume end.
- (*prev).next.store(task, Release);
+ self.chan.0.send(task).unwrap();
}
- /// Poll a task from the queue.
- ///
- /// This function is **not** `Sync` and requires coordination by the caller.
- pub unsafe fn poll(&self) -> Poll {
- let mut tail = *self.tail.get();
- let mut next = (*tail).next.load(Acquire);
-
- let stub = &*self.stub as *const _ as *mut _;
-
- if tail == stub {
- if next.is_null() {
- return Poll::Empty;
- }
-
- *self.tail.get() = next;
- tail = next;
- next = (*next).next.load(Acquire);
- }
-
- if !next.is_null() {
- *self.tail.get() = next;
-
- // No ref_count inc is necessary here as this poll is paired
- // with a `push` which "forgets" the handle.
- return Poll::Data(Arc::from_raw(tail));
- }
-
- if self.head.load(Acquire) != tail {
- return Poll::Inconsistent;
- }
-
- self.push2(stub);
-
- next = (*tail).next.load(Acquire);
-
- if !next.is_null() {
- *self.tail.get() = next;
-
- return Poll::Data(Arc::from_raw(tail));
- }
-
- Poll::Inconsistent
- }
-}
-
-impl Drop for Queue {
- fn drop(&mut self) {
- loop {
- if let Poll::Empty = unsafe { self.poll() } {
- break
- }
- }
+ /// Pop a task from the queue.
+ #[inline]
+ pub fn pop(&self) -> Option<Arc<Task>> {
+ self.chan.1.try_recv().ok()
}
}
diff --git a/tokio-threadpool/src/worker/entry.rs b/tokio-threadpool/src/worker/entry.rs
index 459e7cc1..d1013381 100644
--- a/tokio-threadpool/src/worker/entry.rs
+++ b/tokio-threadpool/src/worker/entry.rs
@@ -1,12 +1,12 @@
use park::{BoxPark, BoxUnpark};
-use task::{Task, Queue};
+use task::Task;
use worker::state::{State, PUSHED_MASK};
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed};
+use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use crossbeam_utils::CachePadded;
use deque;
@@ -36,9 +36,6 @@ pub(crate) struct WorkerEntry {
// Thread unparker
pub unpark: BoxUnpark,
-
- // MPSC queue of jobs submitted to the worker from an external source.
- pub inbound: Queue,
}
impl WorkerEntry {
@@ -50,21 +47,11 @@ impl WorkerEntry {
next_sleeper: UnsafeCell::new(0),
worker: w,
stealer: s,
- inbound: Queue::new(),
park: UnsafeCell::new(park),
unpark,
}
}
- /// Atomically load the worker's state
- ///
- /// # Ordering
- ///
- /// An `Acquire` ordering is established on the entry's state variable.
- pub fn load_state(&self) -> State {
- self.state.load(Acquire).into()
- }
-
/// Atomically unset the pushed flag.
///
/// # Return
@@ -85,20 +72,15 @@ impl WorkerEntry {
self.push_internal(task);
}
- /// Submits a task to the worker. This assumes that the caller is external
- /// to the worker. Internal submissions go through another path.
- ///
- /// Returns `false` if the worker needs to be spawned.
+ /// Notifies the worker and returns `false` if it needs to be spawned.
///
/// # Ordering
///
/// The `state` must have been obtained with an `Acquire` ordering.
- pub fn submit_external(&self, task: Arc<Task>, mut state: State) -> bool {
+ #[inline]
+ pub fn notify(&self, mut state: State) -> bool {
use worker::Lifecycle::*;
- // Push the task onto the external queue
- self.push_external(task);
-
loop {
let mut next = state;
next.notify();
@@ -188,6 +170,7 @@ impl WorkerEntry {
///
/// This **must** only be called by the thread that owns the worker entry.
/// This function is not `Sync`.
+ #[inline]
pub fn pop_task(&self) -> deque::Pop<Arc<Task>> {
self.worker.pop()
}
@@ -208,23 +191,18 @@ impl WorkerEntry {
///
/// This is called when the pool is shutting down.
pub fn drain_tasks(&self) {
- use deque::Pop;
+ use deque::Pop::*;
loop {
match self.worker.pop() {
- Pop::Data(_) => {}
- Pop::Empty => break,
- Pop::Retry => {}
+ Data(_) => {}
+ Empty => break,
+ Retry => {}
}
}
}
#[inline]
- fn push_external(&self, task: Arc<Task>) {
- self.inbound.push(task);
- }
-
- #[inline]
pub fn push_internal(&self, task: Arc<Task>) {
self.worker.push(task);
}
@@ -254,7 +232,6 @@ impl fmt::Debug for WorkerEntry {
.field("stealer", &self.stealer)
.field("park", &"UnsafeCell<BoxPark>")
.field("unpark", &"BoxUnpark")
- .field("inbound", &self.inbound)
.finish()
}
}
diff --git a/tokio-threadpool/src/worker/mod.rs b/tokio-threadpool/src/worker/mod.rs
index 98005ca7..a82fe9f5 100644
--- a/tokio-threadpool/src/worker/mod.rs
+++ b/tokio-threadpool/src/worker/mod.rs
@@ -242,10 +242,6 @@ impl Worker {
while self.check_run_state(first) {
first = false;
- // Poll inbound until empty, transferring all tasks to the internal
- // queue.
- let consistent = self.drain_inbound();
-
// Run the next available task
if self.try_run_task(&notify) {
if self.is_blocking.get() {
@@ -253,6 +249,8 @@ impl Worker {
return;
}
+ // Poll the reactor and the global queue every now and then to
+ // ensure no task gets left behind.
if tick % LIGHT_SLEEP_INTERVAL == 0 {
self.sleep_light();
}
@@ -264,11 +262,6 @@ impl Worker {
continue;
}
- if !consistent {
- spin_cnt = 0;
- continue;
- }
-
spin_cnt += 1;
// Yield the thread several times before it actually goes to sleep.
@@ -423,7 +416,7 @@ impl Worker {
if idx < len {
match self.pool.workers[idx].steal_tasks(self.entry()) {
Steal::Data(task) => {
- trace!("stole task");
+ trace!("stole task from another worker");
self.run_task(task, notify);
@@ -562,48 +555,6 @@ impl Worker {
task.run(notify)
}
- /// Drains all tasks on the extern queue and pushes them onto the internal
- /// queue.
- ///
- /// Returns `true` if the operation was able to complete in a consistent
- /// state.
- #[inline]
- fn drain_inbound(&self) -> bool {
- use task::Poll::*;
-
- let mut found_work = false;
-
- loop {
- let task = unsafe { self.entry().inbound.poll() };
-
- match task {
- Empty => {
- if found_work {
- // TODO: Why is this called on every iteration? Would it
- // not be better to only signal when work was found
- // after waking up?
- trace!("found work while draining; signal_work");
- self.pool.signal_work(&self.pool);
- }
-
- return true;
- }
- Inconsistent => {
- if found_work {
- trace!("found work while draining; signal_work");
- self.pool.signal_work(&self.pool);
- }
-
- return false;
- }
- Data(task) => {
- found_work = true;
- self.entry().push_internal(task);
- }
- }
- }
- }
-
/// Put the worker to sleep
///
/// Returns `true` if woken up due to new work arriving.
@@ -679,18 +630,16 @@ impl Worker {
trace!(" -> starting to sleep; idx={}", self.id.0);
+ // Do a quick check to see if there are any notifications in the
+ // reactor or new tasks in the global queue. Since this call will
+ // clear the wakeup token, we need to check the state again and
+ // only after that go to sleep.
+ self.sleep_light();
+
// The state has been transitioned to sleeping, we can now wait by
// calling the parker. This is done in a loop as condvars can wakeup
// spuriously.
loop {
- unsafe {
- (*self.entry().park.get())
- .park()
- .unwrap();
- }
-
- trace!(" -> wakeup; idx={}", self.id.0);
-
// Reload the state
state = self.entry().state.load(Acquire).into();
@@ -722,18 +671,38 @@ impl Worker {
unreachable!();
}
}
+
+ unsafe {
+ (*self.entry().park.get())
+ .park()
+ .unwrap();
+ }
+
+ trace!(" -> wakeup; idx={}", self.id.0);
}
}
/// This doesn't actually put the thread to sleep. It calls
/// `park.park_timeout` with a duration of 0. This allows the park
/// implementation to perform any work that might be done on an interval.
+ ///
+ /// Returns `true` if this worker has tasks in its queue.
fn sleep_light(&self) {
+ const STEAL_COUNT: usize = 32;
+
unsafe {
(*self.entry().park.get())
.park_timeout(Duration::from_millis(0))
.unwrap();
}
+
+ for _ in 0..STEAL_COUNT {
+ if let Some(task) = self.pool.queue.pop() {
+ self.pool.submit(task, &self.pool);
+ } else {
+ break;
+ }
+ }
}
fn entry(&self) -> &Entry {
@@ -747,14 +716,8 @@ impl Drop for Worker {
trace!("shutting down thread; idx={}", self.id.0);
if self.should_finalize.get() {
- // Get all inbound work and push it onto the work queue. The work
- // queue is drained in the next step.
- self.drain_inbound();
-
// Drain the work queue
self.entry().drain_tasks();
-
- // TODO: Drain the work queue...
}
}
}