Diffstat (limited to 'tokio/src/runtime/basic_scheduler.rs')
-rw-r--r--  tokio/src/runtime/basic_scheduler.rs  365
1 file changed, 365 insertions(+), 0 deletions(-)
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
new file mode 100644
index 00000000..a4991c1e
--- /dev/null
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -0,0 +1,365 @@
+use crate::runtime::park::{Park, Unpark};
+use crate::task::{self, JoinHandle, Schedule, Task};
+
+use std::cell::UnsafeCell;
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::mem::ManuallyDrop;
+use std::sync::{Arc, Mutex};
+use std::task::{RawWaker, RawWakerVTable, Waker};
+use std::time::Duration;
+
+/// Executes tasks on the current thread
+#[derive(Debug)]
+pub(crate) struct BasicScheduler<P>
+where
+ P: Park,
+{
+ /// Scheduler component
+ scheduler: Arc<SchedulerPriv>,
+
+ /// Local state
+ local: LocalState<P>,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct Spawner {
+ scheduler: Arc<SchedulerPriv>,
+}
+
+/// The scheduler component.
+pub(super) struct SchedulerPriv {
+ /// List of all active tasks spawned onto this executor.
+ ///
+ /// # Safety
+ ///
+ /// Must only be accessed from the primary thread.
+ owned_tasks: UnsafeCell<task::OwnedList<Self>>,
+
+ /// Local run queue.
+ ///
+ /// Tasks notified from the current thread are pushed into this queue.
+ ///
+ /// # Safety
+ ///
+ /// References should not be handed out. Only call `push` / `pop` functions.
+ /// Only call from the owning thread.
+ local_queue: UnsafeCell<VecDeque<Task<SchedulerPriv>>>,
+
+ /// Remote run queue.
+ ///
+ /// Tasks notified from another thread are pushed into this queue.
+ remote_queue: Mutex<RemoteQueue>,
+
+ /// Tasks pending drop
+ pending_drop: task::TransferStack<Self>,
+
+ /// Unpark the blocked thread
+ unpark: Box<dyn Unpark>,
+}
+
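+// Safety: the `UnsafeCell` fields above are only accessed from the thread
+// that owns the scheduler, as documented on each field.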
+unsafe impl Send for SchedulerPriv {}
+unsafe impl Sync for SchedulerPriv {}
+
+/// Local state
+#[derive(Debug)]
+struct LocalState<P> {
+ /// Current tick
+ tick: u8,
+
+ /// Thread park handle
+ park: P,
+}
+
+#[derive(Debug)]
+struct RemoteQueue {
+ /// FIFO list of tasks
+ queue: VecDeque<Task<SchedulerPriv>>,
+
+ /// `true` if a task can be pushed into the queue, `false` otherwise.
+ open: bool,
+}
+
+/// Max number of tasks to poll per tick.
+const MAX_TASKS_PER_TICK: usize = 61;
+
+ /// How often to check the remote queue before the local queue, in ticks
+const CHECK_REMOTE_INTERVAL: u8 = 13;
+
+impl<P> BasicScheduler<P>
+where
+ P: Park,
+{
+ pub(crate) fn new(park: P) -> BasicScheduler<P> {
+ let unpark = park.unpark();
+
+ BasicScheduler {
+ scheduler: Arc::new(SchedulerPriv {
+ owned_tasks: UnsafeCell::new(task::OwnedList::new()),
+ local_queue: UnsafeCell::new(VecDeque::with_capacity(64)),
+ remote_queue: Mutex::new(RemoteQueue {
+ queue: VecDeque::with_capacity(64),
+ open: true,
+ }),
+ pending_drop: task::TransferStack::new(),
+ unpark: Box::new(unpark),
+ }),
+ local: LocalState { tick: 0, park },
+ }
+ }
+
+ pub(crate) fn spawner(&self) -> Spawner {
+ Spawner {
+ scheduler: self.scheduler.clone(),
+ }
+ }
+
+ /// Spawn a future onto the scheduler
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ let (task, handle) = task::joinable(future);
+ self.scheduler.schedule(task);
+ handle
+ }
+
+ pub(crate) fn block_on<F>(&mut self, mut future: F) -> F::Output
+ where
+ F: Future,
+ {
+ use crate::runtime;
+ use std::pin::Pin;
+ use std::task::Context;
+ use std::task::Poll::Ready;
+
+ let local = &mut self.local;
+ let scheduler = &*self.scheduler;
+
+ runtime::global::with_basic_scheduler(scheduler, || {
+ let mut _enter = runtime::enter();
+
+ let raw_waker = RawWaker::new(
+ scheduler as *const SchedulerPriv as *const (),
+ &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop),
+ );
+
+ let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) });
+ let mut cx = Context::from_waker(&waker);
+
+ // `block_on` takes ownership of `future`. Once it is pinned here, the
+ // original `future` binding can no longer be accessed, making the
+ // pinning safe.
+ let mut future = unsafe { Pin::new_unchecked(&mut future) };
+
+ loop {
+ if let Ready(v) = future.as_mut().poll(&mut cx) {
+ return v;
+ }
+
+ scheduler.tick(local);
+
+ // Maintenance: free tasks that are pending drop
+ scheduler.drain_pending_drop();
+ }
+ })
+ }
+}
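
For reference, the poll-park loop in `block_on` above can be reproduced with
nothing but the standard library. A minimal sketch, using std's `Wake` trait
in place of the hand-rolled vtable this file uses (`mini_block_on` and
`ThreadUnpark` are illustrative names, not tokio APIs):

    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};
    use std::thread::{self, Thread};

    /// Unparks the captured thread when the future is woken.
    struct ThreadUnpark(Thread);

    impl Wake for ThreadUnpark {
        fn wake(self: Arc<Self>) {
            self.0.unpark();
        }
    }

    fn mini_block_on<F: Future>(mut future: F) -> F::Output {
        // Pinning is safe: `future` is shadowed and never moved again.
        let mut future = unsafe { Pin::new_unchecked(&mut future) };
        let waker = Waker::from(Arc::new(ThreadUnpark(thread::current())));
        let mut cx = Context::from_waker(&waker);

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(v) => return v,
                // Nothing to do until a wakeup arrives; block the thread.
                Poll::Pending => thread::park(),
            }
        }
    }
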
+
+impl Spawner {
+ /// Spawn a future onto the scheduler
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ let (task, handle) = task::joinable(future);
+ self.scheduler.schedule(task);
+ handle
+ }
+}
+
+impl SchedulerPriv {
+ fn tick(&self, local: &mut LocalState<impl Park>) {
+ for _ in 0..MAX_TASKS_PER_TICK {
+ // Get the current tick
+ let tick = local.tick;
+
+ // Increment the tick
+ local.tick = tick.wrapping_add(1);
+
+ let task = match self.next_task(tick) {
+ Some(task) => task,
+ None => {
+ local.park.park().ok().expect("failed to park");
+ return;
+ }
+ };
+
+ if let Some(task) = task.run(&mut || Some(self.into())) {
+ unsafe {
+ self.schedule_local(task);
+ }
+ }
+ }
+
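+ // The tick budget was used up with tasks still pending, so yield to
+ // the park handle without blocking before returning to the caller.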
+ local
+ .park
+ .park_timeout(Duration::from_millis(0))
+ .ok()
+ .expect("failed to park");
+ }
+
+ fn drain_pending_drop(&self) {
+ for task in self.pending_drop.drain() {
+ unsafe {
+ (*self.owned_tasks.get()).remove(&task);
+ }
+ drop(task);
+ }
+ }
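
The `pending_drop` / `drain_pending_drop` pair is a deferred-drop handoff:
threads that release a task while they do not own the scheduler park it in a
shared stack, and the owning thread drops it at a safe point. A minimal sketch
of the same pattern, with a `Mutex<Vec<T>>` standing in for the lock-free
`TransferStack` (`PendingDrop` is an illustrative name):

    use std::sync::Mutex;

    struct PendingDrop<T>(Mutex<Vec<T>>);

    impl<T> PendingDrop<T> {
        /// Called from any thread: park a value for later disposal.
        fn push(&self, value: T) {
            self.0.lock().unwrap().push(value);
        }

        /// Called from the owning thread: take and drop everything parked.
        fn drain(&self) {
            let values = std::mem::take(&mut *self.0.lock().unwrap());
            drop(values);
        }
    }
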
+
+ /// # Safety
+ ///
+ /// Must be called from the same thread that holds the `BasicScheduler`
+ /// value.
+ pub(super) unsafe fn spawn_background<F>(&self, future: F)
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ let task = task::background(future);
+ self.schedule_local(task);
+ }
+
+ unsafe fn schedule_local(&self, task: Task<Self>) {
+ (*self.local_queue.get()).push_front(task);
+ }
+
+ fn next_task(&self, tick: u8) -> Option<Task<Self>> {
+ if tick % CHECK_REMOTE_INTERVAL == 0 {
+ self.next_remote_task().or_else(|| self.next_local_task())
+ } else {
+ self.next_local_task().or_else(|| self.next_remote_task())
+ }
+ }
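
The tick check above is a simple fairness device: most ticks prefer the local
queue, but every `CHECK_REMOTE_INTERVAL`-th tick the remote queue is consulted
first, so tasks woken from other threads cannot be starved by a busy local
queue. A minimal sketch of the same selection logic over plain `VecDeque`s
(`pick` is an illustrative name):

    use std::collections::VecDeque;

    const CHECK_REMOTE_INTERVAL: u8 = 13;

    fn pick<T>(tick: u8, local: &mut VecDeque<T>, remote: &mut VecDeque<T>) -> Option<T> {
        if tick % CHECK_REMOTE_INTERVAL == 0 {
            // Periodically give remote wakeups priority.
            remote.pop_front().or_else(|| local.pop_front())
        } else {
            // Fast path: run locally-notified tasks first.
            local.pop_front().or_else(|| remote.pop_front())
        }
    }
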
+
+ fn next_local_task(&self) -> Option<Task<Self>> {
+ unsafe { (*self.local_queue.get()).pop_front() }
+ }
+
+ fn next_remote_task(&self) -> Option<Task<Self>> {
+ self.remote_queue.lock().unwrap().queue.pop_front()
+ }
+}
+
+impl Schedule for SchedulerPriv {
+ fn bind(&self, task: &Task<Self>) {
+ unsafe {
+ (*self.owned_tasks.get()).insert(task);
+ }
+ }
+
+ fn release(&self, task: Task<Self>) {
+ self.pending_drop.push(task);
+ }
+
+ fn release_local(&self, task: &Task<Self>) {
+ unsafe {
+ (*self.owned_tasks.get()).remove(task);
+ }
+ }
+
+ fn schedule(&self, task: Task<Self>) {
+ use crate::runtime::global;
+
+ if global::basic_scheduler_is_current(self) {
+ unsafe { self.schedule_local(task) };
+ } else {
+ let mut lock = self.remote_queue.lock().unwrap();
+
+ if lock.open {
+ lock.queue.push_back(task);
+ } else {
+ task.shutdown();
+ }
+
+ // Unpark the scheduler thread while the lock is still held
+ self.unpark.unpark();
+
+ drop(lock);
+ }
+ }
+}
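
`schedule` is the single dispatch point: on the scheduler's own thread the
task goes straight into the unsynchronized local queue, while any other thread
must go through the mutex-guarded remote queue, whose `open` flag lets
shutdown reject new work instead of leaking it. A minimal sketch of that
open/closed handoff (`RemoteHandle` is an illustrative name):

    use std::collections::VecDeque;
    use std::sync::Mutex;

    struct RemoteHandle<T> {
        queue: Mutex<(VecDeque<T>, bool)>, // (tasks, open)
    }

    impl<T> RemoteHandle<T> {
        /// Push from another thread; hands the value back if already closed.
        fn push(&self, value: T) -> Result<(), T> {
            let mut lock = self.queue.lock().unwrap();
            if lock.1 {
                lock.0.push_back(value);
                Ok(())
            } else {
                // Closed: the caller must shut the task down instead.
                Err(value)
            }
        }

        /// Close the queue and drain whatever was already submitted.
        fn close(&self) -> VecDeque<T> {
            let mut lock = self.queue.lock().unwrap();
            lock.1 = false;
            std::mem::take(&mut lock.0)
        }
    }
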
+
+impl<P> Drop for BasicScheduler<P>
+where
+ P: Park,
+{
+ fn drop(&mut self) {
+ // Close the remote queue
+ let mut lock = self.scheduler.remote_queue.lock().unwrap();
+ lock.open = false;
+
+ while let Some(task) = lock.queue.pop_front() {
+ task.shutdown();
+ }
+
+ drop(lock);
+
+ // Drain all local tasks
+ while let Some(task) = self.scheduler.next_local_task() {
+ task.shutdown();
+ }
+
+ // Release owned tasks
+ unsafe {
+ (*self.scheduler.owned_tasks.get()).shutdown();
+ }
+
+ self.scheduler.drain_pending_drop();
+
+ // Wait until all tasks have been released.
+ while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } {
+ self.local.park.park().ok().expect("failed to park");
+ self.scheduler.drain_pending_drop();
+ }
+ }
+}
+
+impl fmt::Debug for SchedulerPriv {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Scheduler").finish()
+ }
+}
+
+unsafe fn sched_clone_waker(ptr: *const ()) -> RawWaker {
+ let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv));
+
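+ // Cloning bumps the strong count; the new `RawWaker` below owns that
+ // increment and releases it via `sched_drop`.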
+ #[allow(clippy::redundant_clone)]
+ let s2 = s1.clone();
+
+ RawWaker::new(
+ &**s2 as *const SchedulerPriv as *const (),
+ &RawWakerVTable::new(sched_clone_waker, sched_wake, sched_wake_by_ref, sched_drop),
+ )
+}
+
+unsafe fn sched_wake(ptr: *const ()) {
+ let scheduler = Arc::from_raw(ptr as *const SchedulerPriv);
+ scheduler.unpark.unpark();
+}
+
+unsafe fn sched_wake_by_ref(ptr: *const ()) {
+ let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv));
+ scheduler.unpark.unpark();
+}
+
+unsafe fn sched_drop(ptr: *const ()) {
+ let _ = Arc::from_raw(ptr as *const SchedulerPriv);
+}
+
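+// Used for the `wake` and `drop` slots of the root `block_on` waker. That
+// waker is wrapped in `ManuallyDrop` and only ever used by reference, so
+// neither slot can be reached.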
+unsafe fn sched_noop(_ptr: *const ()) {
+ unreachable!();
+}
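
All four vtable functions follow the same `Arc` discipline: the data pointer
carries one strong reference, `Arc::from_raw` reclaims ownership of that
reference, and any function that must not release it wraps the result in
`ManuallyDrop`. A minimal sketch of the two access modes (`borrow_arc` and
`consume_arc` are illustrative names):

    use std::mem::ManuallyDrop;
    use std::sync::Arc;

    /// By-reference access: reconstruct the `Arc` without giving up the
    /// strong count it represents (used by `wake_by_ref`-style slots).
    unsafe fn borrow_arc<T>(ptr: *const ()) -> ManuallyDrop<Arc<T>> {
        ManuallyDrop::new(Arc::from_raw(ptr as *const T))
    }

    /// Consuming access: take the strong count back, so dropping the
    /// returned `Arc` releases it (used by `wake` and `drop` slots).
    unsafe fn consume_arc<T>(ptr: *const ()) -> Arc<T> {
        Arc::from_raw(ptr as *const T)
    }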