Diffstat (limited to 'tokio/src/runtime/basic_scheduler.rs')
-rw-r--r--  tokio/src/runtime/basic_scheduler.rs  461
1 file changed, 226 insertions, 235 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index f625920d..a494f9e3 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -1,80 +1,110 @@
use crate::park::{Park, Unpark};
-use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, ScheduleSendOnly, Task};
+use crate::runtime;
+use crate::runtime::task::{self, JoinHandle, Schedule, Task};
+use crate::util::linked_list::LinkedList;
+use crate::util::{waker_ref, Wake};
-use std::cell::Cell;
+use std::cell::RefCell;
+use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
-use std::mem::ManuallyDrop;
-use std::ptr;
-use std::sync::Arc;
-use std::task::{RawWaker, RawWakerVTable, Waker};
+use std::sync::{Arc, Mutex};
+use std::task::Poll::Ready;
use std::time::Duration;
/// Executes tasks on the current thread
-#[derive(Debug)]
pub(crate) struct BasicScheduler<P>
where
P: Park,
{
- /// Scheduler component
- scheduler: Arc<SchedulerPriv>,
+ /// Scheduler run queue
+ ///
+ /// When the scheduler is executed, the queue is removed from `self` and
+ /// moved into `Context`.
+ ///
+ /// This indirection is to allow `BasicScheduler` to be `Send`.
+ tasks: Option<Tasks>,
- /// Local state
- local: LocalState<P>,
+ /// Sendable task spawner
+ spawner: Spawner,
+
+ /// Current tick
+ tick: u8,
+
+ /// Thread park handle
+ park: P,
}
-#[derive(Debug, Clone)]
+#[derive(Clone)]
pub(crate) struct Spawner {
- scheduler: Arc<SchedulerPriv>,
+ shared: Arc<Shared>,
+}
+
+struct Tasks {
+ /// Collection of all active tasks spawned onto this executor.
+ owned: LinkedList<Task<Arc<Shared>>>,
+
+ /// Local run queue.
+ ///
+ /// Tasks notified from the current thread are pushed into this queue.
+ queue: VecDeque<task::Notified<Arc<Shared>>>,
}
-/// The scheduler component.
-pub(super) struct SchedulerPriv {
- queues: MpscQueues<Self>,
+/// Scheduler state shared between threads.
+struct Shared {
+ /// Remote run queue
+ queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,
+
/// Unpark the blocked thread
unpark: Box<dyn Unpark>,
}
-unsafe impl Send for SchedulerPriv {}
-unsafe impl Sync for SchedulerPriv {}
+/// Thread-local context
+struct Context {
+ /// Shared scheduler state
+ shared: Arc<Shared>,
-/// Local state
-#[derive(Debug)]
-struct LocalState<P> {
- /// Current tick
- tick: u8,
-
- /// Thread park handle
- park: P,
+ /// Local queue
+ tasks: RefCell<Tasks>,
}
+/// Initial queue capacity
+const INITIAL_CAPACITY: usize = 64;
+
/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;
-thread_local! {
- static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null())
-}
+/// How often to check the remote queue first.
+const REMOTE_FIRST_INTERVAL: u8 = 31;
+
+// Tracks the current BasicScheduler
+scoped_thread_local!(static CURRENT: Context);
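
The thread-local bookkeeping above replaces the old raw-pointer ACTIVE cell with a scoped thread-local holding the running Context. A minimal sketch (not part of this change) of what scoped_thread_local! provides, storing a *const u32 instead of a Context reference and using illustrative set/with names: the value is only reachable while set is on the stack, and a drop guard restores the previous pointer even if the closure panics.

    use std::cell::Cell;
    use std::ptr;

    thread_local! {
        // Raw pointer published only for the duration of `set`.
        static CURRENT: Cell<*const u32> = Cell::new(ptr::null());
    }

    fn set<R>(value: &u32, f: impl FnOnce() -> R) -> R {
        struct Reset(*const u32);

        impl Drop for Reset {
            fn drop(&mut self) {
                // Runs on return and on unwind, restoring the previous pointer.
                CURRENT.with(|c| c.set(self.0));
            }
        }

        let _reset = CURRENT.with(|c| {
            let prev = c.get();
            c.set(value as *const u32);
            Reset(prev)
        });

        f()
    }

    fn with<R>(f: impl FnOnce(Option<&u32>) -> R) -> R {
        CURRENT.with(|c| {
            // safety: non-null only while the `set` frame that published the
            // pointer is still on this thread's stack.
            f(unsafe { c.get().as_ref() })
        })
    }

    fn main() {
        with(|v| assert!(v.is_none()));
        set(&7, || with(|v| assert_eq!(v.copied(), Some(7))));
    }
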
impl<P> BasicScheduler<P>
where
P: Park,
{
pub(crate) fn new(park: P) -> BasicScheduler<P> {
- let unpark = park.unpark();
+ let unpark = Box::new(park.unpark());
BasicScheduler {
- scheduler: Arc::new(SchedulerPriv {
- queues: MpscQueues::new(),
- unpark: Box::new(unpark),
+ tasks: Some(Tasks {
+ owned: LinkedList::new(),
+ queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
- local: LocalState { tick: 0, park },
+ spawner: Spawner {
+ shared: Arc::new(Shared {
+ queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+ unpark: unpark as Box<dyn Unpark>,
+ }),
+ },
+ tick: 0,
+ park,
}
}
- pub(crate) fn spawner(&self) -> Spawner {
- Spawner {
- scheduler: self.scheduler.clone(),
- }
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
}
/// Spawns a future onto the thread pool
@@ -83,74 +113,146 @@ where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
- let (task, handle) = task::joinable(future);
- self.scheduler.schedule(task, true);
- handle
+ self.spawner.spawn(future)
}
- pub(crate) fn block_on<F>(&mut self, mut future: F) -> F::Output
+ pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
{
- use crate::runtime;
- use std::pin::Pin;
- use std::task::Context;
- use std::task::Poll::Ready;
+ enter(self, |scheduler, context| {
+ let _enter = runtime::enter();
+ let waker = waker_ref(&scheduler.spawner.shared);
+ let mut cx = std::task::Context::from_waker(&waker);
- let local = &mut self.local;
- let scheduler = &*self.scheduler;
+ pin!(future);
- struct Guard {
- old: *const SchedulerPriv,
- }
+ 'outer: loop {
+ if let Ready(v) = future.as_mut().poll(&mut cx) {
+ return v;
+ }
- impl Drop for Guard {
- fn drop(&mut self) {
- ACTIVE.with(|cell| cell.set(self.old));
- }
- }
+ for _ in 0..MAX_TASKS_PER_TICK {
+ // Get and increment the current tick
+ let tick = scheduler.tick;
+ scheduler.tick = scheduler.tick.wrapping_add(1);
+
+ let next = if tick % REMOTE_FIRST_INTERVAL == 0 {
+ scheduler
+ .spawner
+ .pop()
+ .or_else(|| context.tasks.borrow_mut().queue.pop_front())
+ } else {
+ context
+ .tasks
+ .borrow_mut()
+ .queue
+ .pop_front()
+ .or_else(|| scheduler.spawner.pop())
+ };
+
+ match next {
+ Some(task) => task.run(),
+ None => {
+ // Park until the thread is signaled
+ scheduler.park.park().ok().expect("failed to park");
+
+ // Try polling the `block_on` future next
+ continue 'outer;
+ }
+ }
+ }
- // Track the current scheduler
- let _guard = ACTIVE.with(|cell| {
- let guard = Guard { old: cell.get() };
+ // Yield to the park, this drives the timer and pulls any pending
+ // I/O events.
+ scheduler
+ .park
+ .park_timeout(Duration::from_millis(0))
+ .ok()
+ .expect("failed to park");
+ }
+ })
+ }
+}
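
The block_on loop above interleaves two queues. A minimal model (not from this change) of the pop policy, with plain VecDeque<u32> standing in for the task queues: most ticks favour the thread-local queue, but every REMOTE_FIRST_INTERVAL-th tick the remote queue is drained first so cross-thread wakeups cannot be starved by a busy local queue.

    use std::collections::VecDeque;

    const REMOTE_FIRST_INTERVAL: u8 = 31;

    fn next_task(tick: u8, local: &mut VecDeque<u32>, remote: &mut VecDeque<u32>) -> Option<u32> {
        if tick % REMOTE_FIRST_INTERVAL == 0 {
            // Periodically give the remote queue priority.
            remote.pop_front().or_else(|| local.pop_front())
        } else {
            // Otherwise prefer tasks notified from the scheduler thread itself.
            local.pop_front().or_else(|| remote.pop_front())
        }
    }

    fn main() {
        let mut local: VecDeque<u32> = (0..3).collect();
        let mut remote: VecDeque<u32> = (100..103).collect();

        // Tick 0 hits the remote-first branch, tick 1 the local-first branch.
        assert_eq!(next_task(0, &mut local, &mut remote), Some(100));
        assert_eq!(next_task(1, &mut local, &mut remote), Some(0));
    }
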
- cell.set(scheduler as *const SchedulerPriv);
+/// Enter the scheduler context. This sets the queue and other necessary
+/// scheduler state in the thread-local
+fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
+where
+ F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
+ P: Park,
+{
+ // Ensures the run queue is placed back in the `BasicScheduler` instance
+// once `block_on` returns.
+ struct Guard<'a, P: Park> {
+ context: Option<Context>,
+ scheduler: &'a mut BasicScheduler<P>,
+ }
- guard
- });
+ impl<P: Park> Drop for Guard<'_, P> {
+ fn drop(&mut self) {
+ let Context { tasks, .. } = self.context.take().expect("context missing");
+ self.scheduler.tasks = Some(tasks.into_inner());
+ }
+ }
- let mut _enter = runtime::enter();
+ // Remove `tasks` from `self` and place it in a `Context`.
+ let tasks = scheduler.tasks.take().expect("invalid state");
- let raw_waker = RawWaker::new(
- scheduler as *const SchedulerPriv as *const (),
- &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop),
- );
+ let guard = Guard {
+ context: Some(Context {
+ shared: scheduler.spawner.shared.clone(),
+ tasks: RefCell::new(tasks),
+ }),
+ scheduler,
+ };
- let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) });
- let mut cx = Context::from_waker(&waker);
+ let context = guard.context.as_ref().unwrap();
+ let scheduler = &mut *guard.scheduler;
- // `block_on` takes ownership of `f`. Once it is pinned here, the
- // original `f` binding can no longer be accessed, making the
- // pinning safe.
- let mut future = unsafe { Pin::new_unchecked(&mut future) };
+ CURRENT.set(context, || f(scheduler, context))
+}
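
enter moves the run queue out of the scheduler for the duration of the closure and relies on a guard to put it back. A minimal sketch of that take-and-restore pattern (illustrative types only, a Vec<&str> standing in for Tasks): the Drop impl runs on normal return and on unwind, so the queue always ends up back in the scheduler.

    struct Scheduler {
        // `None` only while the queue is checked out by `enter`.
        tasks: Option<Vec<&'static str>>,
    }

    fn enter<R>(scheduler: &mut Scheduler, f: impl FnOnce(&mut Vec<&'static str>) -> R) -> R {
        struct Restore<'a> {
            tasks: Option<Vec<&'static str>>,
            scheduler: &'a mut Scheduler,
        }

        impl Drop for Restore<'_> {
            fn drop(&mut self) {
                // Puts the queue back even if `f` panics.
                self.scheduler.tasks = self.tasks.take();
            }
        }

        let tasks = scheduler.tasks.take().expect("already entered");
        let mut guard = Restore { tasks: Some(tasks), scheduler };
        f(guard.tasks.as_mut().unwrap())
    }

    fn main() {
        let mut s = Scheduler { tasks: Some(vec!["a"]) };
        enter(&mut s, |queue| queue.push("b"));
        assert_eq!(s.tasks.as_ref().unwrap().len(), 2);
    }
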
- loop {
- if let Ready(v) = future.as_mut().poll(&mut cx) {
- return v;
+impl<P> Drop for BasicScheduler<P>
+where
+ P: Park,
+{
+ fn drop(&mut self) {
+ enter(self, |scheduler, context| {
+ // Loop required here to ensure borrow is dropped between iterations
+ #[allow(clippy::while_let_loop)]
+ loop {
+ let task = match context.tasks.borrow_mut().owned.pop_back() {
+ Some(task) => task,
+ None => break,
+ };
+
+ task.shutdown();
}
- scheduler.tick(local);
+ // Drain local queue
+ for task in context.tasks.borrow_mut().queue.drain(..) {
+ task.shutdown();
+ }
- // Maintenance work
- unsafe {
- // safety: this function is safe to call only from the
- // thread the basic scheduler is running on (which we are).
- scheduler.queues.drain_pending_drop();
+ // Drain remote queue
+ for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) {
+ task.shutdown();
}
- }
+
+ assert!(context.tasks.borrow().owned.is_empty());
+ });
+ }
+}
+
+impl<P: Park> fmt::Debug for BasicScheduler<P> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BasicScheduler").finish()
}
}
+// ===== impl Spawner =====
+
impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
@@ -159,177 +261,66 @@ impl Spawner {
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
- self.scheduler.schedule(task, true);
+ self.shared.schedule(task);
handle
}
-}
-
-// === impl SchedulerPriv ===
-
-impl SchedulerPriv {
- fn tick(&self, local: &mut LocalState<impl Park>) {
- for _ in 0..MAX_TASKS_PER_TICK {
- // Get the current tick
- let tick = local.tick;
-
- // Increment the tick
- local.tick = tick.wrapping_add(1);
- let next = unsafe {
- // safety: this function is safe to call only from the
- // thread the basic scheduler is running on. The `LocalState`
- // parameter to this method implies that we are on that thread.
- self.queues.next_task(tick)
- };
-
- let task = match next {
- Some(task) => task,
- None => {
- local.park.park().ok().expect("failed to park");
- return;
- }
- };
-
- if let Some(task) = task.run(&mut || Some(self.into())) {
- unsafe {
- // safety: this function is safe to call only from the
- // thread the basic scheduler is running on. The `LocalState`
- // parameter to this method implies that we are on that thread.
- self.queues.push_local(task);
- }
- }
- }
-
- local
- .park
- .park_timeout(Duration::from_millis(0))
- .ok()
- .expect("failed to park");
- }
-
- /// Schedule the provided task on the scheduler.
- ///
- /// If this scheduler is the `ACTIVE` scheduler, enqueue this task on the local queue, otherwise
- /// the task is enqueued on the remote queue.
- fn schedule(&self, task: Task<Self>, spawn: bool) {
- let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
- if is_current {
- unsafe {
- // safety: this function is safe to call only from the
- // thread the basic scheduler is running on. If `is_current` is
- // then we are on that thread.
- self.queues.push_local(task)
- };
- } else {
- let mut lock = self.queues.remote();
- lock.schedule(task, spawn);
-
- // while locked, call unpark
- self.unpark.unpark();
-
- drop(lock);
- }
+ fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
+ self.shared.queue.lock().unwrap().pop_front()
}
}
-impl Schedule for SchedulerPriv {
- fn bind(&self, task: &Task<Self>) {
- unsafe {
- // safety: `Queues::add_task` is only safe to call from the thread
- // that owns the queues (the thread the scheduler is running on).
- // `Scheduler::bind` is called when polling a task that
- // doesn't have a scheduler set. We will only poll new tasks from
- // the thread that the scheduler is running on. Therefore, this is
- // safe to call.
- self.queues.add_task(task);
- }
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Spawner").finish()
}
+}
- fn release(&self, task: Task<Self>) {
- self.queues.release_remote(task);
- }
+// ===== impl Shared =====
- fn release_local(&self, task: &Task<Self>) {
- unsafe {
- // safety: `Scheduler::release_local` is only called from the
- // thread that the scheduler is running on. The `Schedule` trait's
- // contract is that releasing a task from another thread should call
- // `release` rather than `release_local`.
- self.queues.release_local(task);
- }
+impl Schedule for Arc<Shared> {
+ fn bind(task: Task<Self>) -> Arc<Shared> {
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+ cx.tasks.borrow_mut().owned.push_front(task);
+ cx.shared.clone()
+ })
}
- fn schedule(&self, task: Task<Self>) {
- SchedulerPriv::schedule(self, task, false);
- }
-}
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+ use std::ptr::NonNull;
-impl ScheduleSendOnly for SchedulerPriv {}
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
-impl<P> Drop for BasicScheduler<P>
-where
- P: Park,
-{
- fn drop(&mut self) {
- unsafe {
- // safety: the `Drop` impl owns the scheduler's queues. these fields
- // will only be accessed when running the scheduler, and it can no
- // longer be run, since we are in the process of dropping it.
-
- // Shut down the task queues.
- self.scheduler.queues.shutdown();
- }
-
- // Wait until all tasks have been released.
- loop {
+ // safety: the task is inserted in the list in `bind`.
unsafe {
- self.scheduler.queues.drain_pending_drop();
- self.scheduler.queues.drain_queues();
-
- if !self.scheduler.queues.has_tasks_remaining() {
- break;
- }
-
- self.local.park.park().ok().expect("park failed");
+ let ptr = NonNull::from(task.header());
+ cx.tasks.borrow_mut().owned.remove(ptr)
}
- }
+ })
}
-}
-impl fmt::Debug for SchedulerPriv {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Scheduler")
- .field("queues", &self.queues)
- .finish()
+ fn schedule(&self, task: task::Notified<Self>) {
+ CURRENT.with(|maybe_cx| match maybe_cx {
+ Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
+ cx.tasks.borrow_mut().queue.push_back(task);
+ }
+ _ => {
+ self.queue.lock().unwrap().push_back(task);
+ self.unpark.unpark();
+ }
+ });
}
}
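
The schedule impl above routes a notified task by comparing Arc handles against the thread-local context. A minimal sketch of that routing decision (not this crate's API: u32 messages stand in for tasks and a plain thread_local! stands in for the scoped CURRENT):

    use std::cell::RefCell;
    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    type Msg = u32; // stand-in for task::Notified<Arc<Shared>>

    struct Shared {
        // Remote queue: reachable from any thread.
        remote: Mutex<VecDeque<Msg>>,
    }

    thread_local! {
        // Handle and local queue of the scheduler running on this thread, if any.
        static ACTIVE: RefCell<Option<(Arc<Shared>, VecDeque<Msg>)>> = RefCell::new(None);
    }

    fn schedule(shared: &Arc<Shared>, msg: Msg) {
        ACTIVE.with(|active| match active.borrow_mut().as_mut() {
            // Same scheduler, same thread: push straight onto the local queue.
            Some((current, local)) if Arc::ptr_eq(shared, current) => local.push_back(msg),
            // Any other thread (or scheduler): go through the mutex and, in the
            // real code, unpark the blocked scheduler thread.
            _ => shared.remote.lock().unwrap().push_back(msg),
        });
    }

    fn main() {
        let shared = Arc::new(Shared { remote: Mutex::new(VecDeque::new()) });

        // No scheduler active on this thread yet: lands in the remote queue.
        schedule(&shared, 1);
        assert_eq!(shared.remote.lock().unwrap().len(), 1);

        // Pretend the scheduler is now running here: lands in the local queue.
        ACTIVE.with(|a| *a.borrow_mut() = Some((shared.clone(), VecDeque::new())));
        schedule(&shared, 2);
        ACTIVE.with(|a| assert_eq!(a.borrow().as_ref().unwrap().1.len(), 1));
    }
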
-unsafe fn sched_clone_waker(ptr: *const ()) -> RawWaker {
- let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv));
-
- #[allow(clippy::redundant_clone)]
- let s2 = s1.clone();
-
- RawWaker::new(
- &**s2 as *const SchedulerPriv as *const (),
- &RawWakerVTable::new(sched_clone_waker, sched_wake, sched_wake_by_ref, sched_drop),
- )
-}
-
-unsafe fn sched_wake(ptr: *const ()) {
- let scheduler = Arc::from_raw(ptr as *const SchedulerPriv);
- scheduler.unpark.unpark();
-}
-
-unsafe fn sched_wake_by_ref(ptr: *const ()) {
- let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv));
- scheduler.unpark.unpark();
-}
-
-unsafe fn sched_drop(ptr: *const ()) {
- let _ = Arc::from_raw(ptr as *const SchedulerPriv);
-}
+impl Wake for Shared {
+ fn wake(self: Arc<Self>) {
+ Wake::wake_by_ref(&self)
+ }
-unsafe fn sched_noop(_ptr: *const ()) {
- unreachable!();
+ /// Wake by reference
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.unpark.unpark();
+ }
}
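
Waking this scheduler is nothing more than unparking the blocked thread so the park call in block_on returns and the loop polls again. A self-contained sketch of the same idea (not this crate's code) using std::thread parking and the standard library's std::task::Wake, which plays the role of the util::Wake trait used above:

    use std::future::Future;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};
    use std::thread::{self, Thread};

    // Waking unparks the thread blocked in `block_on`, mirroring `Shared::unpark`.
    struct ThreadWaker(Thread);

    impl Wake for ThreadWaker {
        fn wake(self: Arc<Self>) {
            self.0.unpark();
        }
    }

    fn block_on<F: Future>(fut: F) -> F::Output {
        let mut fut = Box::pin(fut);
        let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
        let mut cx = Context::from_waker(&waker);

        loop {
            match fut.as_mut().poll(&mut cx) {
                Poll::Ready(v) => return v,
                // Sleep until some clone of the waker calls `unpark`.
                Poll::Pending => thread::park(),
            }
        }
    }

    fn main() {
        assert_eq!(block_on(async { 40 + 2 }), 42);
    }
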