diff options
Diffstat (limited to 'tokio/src/runtime/basic_scheduler.rs')
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 461 |
1 files changed, 226 insertions, 235 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index f625920d..a494f9e3 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,80 +1,110 @@ use crate::park::{Park, Unpark}; -use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, ScheduleSendOnly, Task}; +use crate::runtime; +use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::util::linked_list::LinkedList; +use crate::util::{waker_ref, Wake}; -use std::cell::Cell; +use std::cell::RefCell; +use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::mem::ManuallyDrop; -use std::ptr; -use std::sync::Arc; -use std::task::{RawWaker, RawWakerVTable, Waker}; +use std::sync::{Arc, Mutex}; +use std::task::Poll::Ready; use std::time::Duration; /// Executes tasks on the current thread -#[derive(Debug)] pub(crate) struct BasicScheduler<P> where P: Park, { - /// Scheduler component - scheduler: Arc<SchedulerPriv>, + /// Scheduler run queue + /// + /// When the scheduler is executed, the queue is removed from `self` and + /// moved into `Context`. + /// + /// This indirection is to allow `BasicScheduler` to be `Send`. + tasks: Option<Tasks>, - /// Local state - local: LocalState<P>, + /// Sendable task spawner + spawner: Spawner, + + /// Current tick + tick: u8, + + /// Thread park handle + park: P, } -#[derive(Debug, Clone)] +#[derive(Clone)] pub(crate) struct Spawner { - scheduler: Arc<SchedulerPriv>, + shared: Arc<Shared>, +} + +struct Tasks { + /// Collection of all active tasks spawned onto this executor. + owned: LinkedList<Task<Arc<Shared>>>, + + /// Local run queue. + /// + /// Tasks notified from the current thread are pushed into this queue. + queue: VecDeque<task::Notified<Arc<Shared>>>, } -/// The scheduler component. -pub(super) struct SchedulerPriv { - queues: MpscQueues<Self>, +/// Scheduler state shared between threads. +struct Shared { + /// Remote run queue + queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>, + /// Unpark the blocked thread unpark: Box<dyn Unpark>, } -unsafe impl Send for SchedulerPriv {} -unsafe impl Sync for SchedulerPriv {} +/// Thread-local context +struct Context { + /// Shared scheduler state + shared: Arc<Shared>, -/// Local state -#[derive(Debug)] -struct LocalState<P> { - /// Current tick - tick: u8, - - /// Thread park handle - park: P, + /// Local queue + tasks: RefCell<Tasks>, } +/// Initial queue capacity +const INITIAL_CAPACITY: usize = 64; + /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -thread_local! { - static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null()) -} +/// How often ot check the remote queue first +const REMOTE_FIRST_INTERVAL: u8 = 31; + +// Tracks the current BasicScheduler +scoped_thread_local!(static CURRENT: Context); impl<P> BasicScheduler<P> where P: Park, { pub(crate) fn new(park: P) -> BasicScheduler<P> { - let unpark = park.unpark(); + let unpark = Box::new(park.unpark()); BasicScheduler { - scheduler: Arc::new(SchedulerPriv { - queues: MpscQueues::new(), - unpark: Box::new(unpark), + tasks: Some(Tasks { + owned: LinkedList::new(), + queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), - local: LocalState { tick: 0, park }, + spawner: Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + unpark: unpark as Box<dyn Unpark>, + }), + }, + tick: 0, + park, } } - pub(crate) fn spawner(&self) -> Spawner { - Spawner { - scheduler: self.scheduler.clone(), - } + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner } /// Spawns a future onto the thread pool @@ -83,74 +113,146 @@ where F: Future + Send + 'static, F::Output: Send + 'static, { - let (task, handle) = task::joinable(future); - self.scheduler.schedule(task, true); - handle + self.spawner.spawn(future) } - pub(crate) fn block_on<F>(&mut self, mut future: F) -> F::Output + pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output where F: Future, { - use crate::runtime; - use std::pin::Pin; - use std::task::Context; - use std::task::Poll::Ready; + enter(self, |scheduler, context| { + let _enter = runtime::enter(); + let waker = waker_ref(&scheduler.spawner.shared); + let mut cx = std::task::Context::from_waker(&waker); - let local = &mut self.local; - let scheduler = &*self.scheduler; + pin!(future); - struct Guard { - old: *const SchedulerPriv, - } + 'outer: loop { + if let Ready(v) = future.as_mut().poll(&mut cx) { + return v; + } - impl Drop for Guard { - fn drop(&mut self) { - ACTIVE.with(|cell| cell.set(self.old)); - } - } + for _ in 0..MAX_TASKS_PER_TICK { + // Get and increment the current tick + let tick = scheduler.tick; + scheduler.tick = scheduler.tick.wrapping_add(1); + + let next = if tick % REMOTE_FIRST_INTERVAL == 0 { + scheduler + .spawner + .pop() + .or_else(|| context.tasks.borrow_mut().queue.pop_front()) + } else { + context + .tasks + .borrow_mut() + .queue + .pop_front() + .or_else(|| scheduler.spawner.pop()) + }; + + match next { + Some(task) => task.run(), + None => { + // Park until the thread is signaled + scheduler.park.park().ok().expect("failed to park"); + + // Try polling the `block_on` future next + continue 'outer; + } + } + } - // Track the current scheduler - let _guard = ACTIVE.with(|cell| { - let guard = Guard { old: cell.get() }; + // Yield to the park, this drives the timer and pulls any pending + // I/O events. + scheduler + .park + .park_timeout(Duration::from_millis(0)) + .ok() + .expect("failed to park"); + } + }) + } +} - cell.set(scheduler as *const SchedulerPriv); +/// Enter the scheduler context. This sets the queue and other necessary +/// scheduler state in the thread-local +fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R +where + F: FnOnce(&mut BasicScheduler<P>, &Context) -> R, + P: Park, +{ + // Ensures the run queue is placed back in the `BasicScheduler` instance + // once `block_on` returns.` + struct Guard<'a, P: Park> { + context: Option<Context>, + scheduler: &'a mut BasicScheduler<P>, + } - guard - }); + impl<P: Park> Drop for Guard<'_, P> { + fn drop(&mut self) { + let Context { tasks, .. } = self.context.take().expect("context missing"); + self.scheduler.tasks = Some(tasks.into_inner()); + } + } - let mut _enter = runtime::enter(); + // Remove `tasks` from `self` and place it in a `Context`. + let tasks = scheduler.tasks.take().expect("invalid state"); - let raw_waker = RawWaker::new( - scheduler as *const SchedulerPriv as *const (), - &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop), - ); + let guard = Guard { + context: Some(Context { + shared: scheduler.spawner.shared.clone(), + tasks: RefCell::new(tasks), + }), + scheduler, + }; - let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) }); - let mut cx = Context::from_waker(&waker); + let context = guard.context.as_ref().unwrap(); + let scheduler = &mut *guard.scheduler; - // `block_on` takes ownership of `f`. Once it is pinned here, the - // original `f` binding can no longer be accessed, making the - // pinning safe. - let mut future = unsafe { Pin::new_unchecked(&mut future) }; + CURRENT.set(context, || f(scheduler, context)) +} - loop { - if let Ready(v) = future.as_mut().poll(&mut cx) { - return v; +impl<P> Drop for BasicScheduler<P> +where + P: Park, +{ + fn drop(&mut self) { + enter(self, |scheduler, context| { + // Loop required here to ensure borrow is dropped between iterations + #[allow(clippy::while_let_loop)] + loop { + let task = match context.tasks.borrow_mut().owned.pop_back() { + Some(task) => task, + None => break, + }; + + task.shutdown(); } - scheduler.tick(local); + // Drain local queue + for task in context.tasks.borrow_mut().queue.drain(..) { + task.shutdown(); + } - // Maintenance work - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on (which we are). - scheduler.queues.drain_pending_drop(); + // Drain remote queue + for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) { + task.shutdown(); } - } + + assert!(context.tasks.borrow().owned.is_empty()); + }); + } +} + +impl<P: Park> fmt::Debug for BasicScheduler<P> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BasicScheduler").finish() } } +// ===== impl Spawner ===== + impl Spawner { /// Spawns a future onto the thread pool pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> @@ -159,177 +261,66 @@ impl Spawner { F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.scheduler.schedule(task, true); + self.shared.schedule(task); handle } -} - -// === impl SchedulerPriv === - -impl SchedulerPriv { - fn tick(&self, local: &mut LocalState<impl Park>) { - for _ in 0..MAX_TASKS_PER_TICK { - // Get the current tick - let tick = local.tick; - - // Increment the tick - local.tick = tick.wrapping_add(1); - let next = unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on. The `LocalState` - // parameter to this method implies that we are on that thread. - self.queues.next_task(tick) - }; - - let task = match next { - Some(task) => task, - None => { - local.park.park().ok().expect("failed to park"); - return; - } - }; - - if let Some(task) = task.run(&mut || Some(self.into())) { - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on. The `LocalState` - // parameter to this method implies that we are on that thread. - self.queues.push_local(task); - } - } - } - - local - .park - .park_timeout(Duration::from_millis(0)) - .ok() - .expect("failed to park"); - } - - /// Schedule the provided task on the scheduler. - /// - /// If this scheduler is the `ACTIVE` scheduler, enqueue this task on the local queue, otherwise - /// the task is enqueued on the remote queue. - fn schedule(&self, task: Task<Self>, spawn: bool) { - let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); - if is_current { - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on. If `is_current` is - // then we are on that thread. - self.queues.push_local(task) - }; - } else { - let mut lock = self.queues.remote(); - lock.schedule(task, spawn); - - // while locked, call unpark - self.unpark.unpark(); - - drop(lock); - } + fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { + self.shared.queue.lock().unwrap().pop_front() } } -impl Schedule for SchedulerPriv { - fn bind(&self, task: &Task<Self>) { - unsafe { - // safety: `Queues::add_task` is only safe to call from the thread - // that owns the queues (the thread the scheduler is running on). - // `Scheduler::bind` is called when polling a task that - // doesn't have a scheduler set. We will only poll new tasks from - // the thread that the scheduler is running on. Therefore, this is - // safe to call. - self.queues.add_task(task); - } +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Spawner").finish() } +} - fn release(&self, task: Task<Self>) { - self.queues.release_remote(task); - } +// ===== impl Shared ===== - fn release_local(&self, task: &Task<Self>) { - unsafe { - // safety: `Scheduler::release_local` is only called from the - // thread that the scheduler is running on. The `Schedule` trait's - // contract is that releasing a task from another thread should call - // `release` rather than `release_local`. - self.queues.release_local(task); - } +impl Schedule for Arc<Shared> { + fn bind(task: Task<Self>) -> Arc<Shared> { + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + cx.tasks.borrow_mut().owned.push_front(task); + cx.shared.clone() + }) } - fn schedule(&self, task: Task<Self>) { - SchedulerPriv::schedule(self, task, false); - } -} + fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { + use std::ptr::NonNull; -impl ScheduleSendOnly for SchedulerPriv {} + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); -impl<P> Drop for BasicScheduler<P> -where - P: Park, -{ - fn drop(&mut self) { - unsafe { - // safety: the `Drop` impl owns the scheduler's queues. these fields - // will only be accessed when running the scheduler, and it can no - // longer be run, since we are in the process of dropping it. - - // Shut down the task queues. - self.scheduler.queues.shutdown(); - } - - // Wait until all tasks have been released. - loop { + // safety: the task is inserted in the list in `bind`. unsafe { - self.scheduler.queues.drain_pending_drop(); - self.scheduler.queues.drain_queues(); - - if !self.scheduler.queues.has_tasks_remaining() { - break; - } - - self.local.park.park().ok().expect("park failed"); + let ptr = NonNull::from(task.header()); + cx.tasks.borrow_mut().owned.remove(ptr) } - } + }) } -} -impl fmt::Debug for SchedulerPriv { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Scheduler") - .field("queues", &self.queues) - .finish() + fn schedule(&self, task: task::Notified<Self>) { + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.shared) => { + cx.tasks.borrow_mut().queue.push_back(task); + } + _ => { + self.queue.lock().unwrap().push_back(task); + self.unpark.unpark(); + } + }); } } -unsafe fn sched_clone_waker(ptr: *const ()) -> RawWaker { - let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv)); - - #[allow(clippy::redundant_clone)] - let s2 = s1.clone(); - - RawWaker::new( - &**s2 as *const SchedulerPriv as *const (), - &RawWakerVTable::new(sched_clone_waker, sched_wake, sched_wake_by_ref, sched_drop), - ) -} - -unsafe fn sched_wake(ptr: *const ()) { - let scheduler = Arc::from_raw(ptr as *const SchedulerPriv); - scheduler.unpark.unpark(); -} - -unsafe fn sched_wake_by_ref(ptr: *const ()) { - let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv)); - scheduler.unpark.unpark(); -} - -unsafe fn sched_drop(ptr: *const ()) { - let _ = Arc::from_raw(ptr as *const SchedulerPriv); -} +impl Wake for Shared { + fn wake(self: Arc<Self>) { + Wake::wake_by_ref(&self) + } -unsafe fn sched_noop(_ptr: *const ()) { - unreachable!(); + /// Wake by reference + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.unpark.unpark(); + } } |