From d70c928d88dff9e3e8d673b8ee02bce131598550 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 1 Nov 2019 13:18:52 -0700 Subject: runtime: merge multi & single threaded runtimes (#1716) Simplify Tokio's runtime construct by combining both Runtime variants into a single type. The execution style can be controlled by a configuration setting on `Builder`. The implication of this change is that there is no longer any way to spawn `!Send` futures. This, however, is a temporary limitation. A different strategy will be employed for supporting `!Send` futures. Included in this patch is a rework of `task::JoinHandle` to support using this type from both the thread-pool and current-thread executors. --- tokio-macros/src/lib.rs | 21 +- tokio-test/src/lib.rs | 14 +- tokio/Cargo.toml | 2 - tokio/src/executor/blocking/mod.rs | 1 + tokio/src/executor/current_thread/mod.rs | 1003 ++++++------------------ tokio/src/executor/current_thread/scheduler.rs | 808 ------------------- tokio/src/executor/global.rs | 43 +- tokio/src/executor/loom/mod.rs | 6 +- tokio/src/executor/loom/std/causal_cell.rs | 1 + tokio/src/executor/loom/std/mod.rs | 20 +- tokio/src/executor/mod.rs | 8 +- tokio/src/executor/task/core.rs | 29 +- tokio/src/executor/task/error.rs | 22 +- tokio/src/executor/task/harness.rs | 61 +- tokio/src/executor/task/join.rs | 25 +- tokio/src/executor/task/list.rs | 9 +- tokio/src/executor/task/mod.rs | 52 +- tokio/src/executor/task/raw.rs | 40 +- tokio/src/executor/task/stack.rs | 9 +- tokio/src/executor/task/tests/task.rs | 6 +- tokio/src/executor/task/waker.rs | 6 +- tokio/src/executor/tests/mock_schedule.rs | 2 +- tokio/src/executor/thread_pool/join.rs | 42 - tokio/src/executor/thread_pool/mod.rs | 6 - tokio/src/executor/thread_pool/pool.rs | 3 +- tokio/src/executor/thread_pool/queue/global.rs | 16 +- tokio/src/executor/thread_pool/set.rs | 6 +- tokio/src/executor/thread_pool/spawner.rs | 3 +- tokio/src/executor/thread_pool/tests/queue.rs | 2 +- tokio/src/runtime/builder.rs | 382 +++++++++ tokio/src/runtime/current_thread/builder.rs | 91 --- tokio/src/runtime/current_thread/mod.rs | 67 -- tokio/src/runtime/current_thread/runtime.rs | 206 ----- tokio/src/runtime/mod.rs | 204 ++++- tokio/src/runtime/spawner.rs | 77 ++ tokio/src/runtime/threadpool/builder.rs | 288 ------- tokio/src/runtime/threadpool/mod.rs | 202 ----- tokio/src/runtime/threadpool/spawner.rs | 59 -- tokio/src/signal/registry.rs | 8 +- tokio/src/signal/windows.rs | 13 +- tokio/tests/clock.rs | 20 +- tokio/tests/current_thread.rs | 781 ------------------ tokio/tests/process_issue_42.rs | 5 +- tokio/tests/rt_current_thread.rs | 339 ++++++++ tokio/tests/rt_thread_pool.rs | 197 +++++ tokio/tests/runtime_current_thread.rs | 137 ---- tokio/tests/runtime_threaded.rs | 188 ----- tokio/tests/signal_drop_rt.rs | 13 +- tokio/tests/signal_multi_rt.rs | 11 +- tokio/tests/timer_hammer.rs | 13 +- tokio/tests/timer_rt.rs | 7 +- 51 files changed, 1756 insertions(+), 3818 deletions(-) delete mode 100644 tokio/src/executor/current_thread/scheduler.rs delete mode 100644 tokio/src/executor/thread_pool/join.rs create mode 100644 tokio/src/runtime/builder.rs delete mode 100644 tokio/src/runtime/current_thread/builder.rs delete mode 100644 tokio/src/runtime/current_thread/mod.rs delete mode 100644 tokio/src/runtime/current_thread/runtime.rs create mode 100644 tokio/src/runtime/spawner.rs delete mode 100644 tokio/src/runtime/threadpool/builder.rs delete mode 100644 tokio/src/runtime/threadpool/mod.rs delete mode 100644 tokio/src/runtime/threadpool/spawner.rs delete mode 100644 tokio/tests/current_thread.rs create mode 100644 tokio/tests/rt_current_thread.rs create mode 100644 tokio/tests/rt_thread_pool.rs delete mode 100644 tokio/tests/runtime_current_thread.rs delete mode 100644 tokio/tests/runtime_threaded.rs diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 00c0cc34..1cfc14c3 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -98,7 +98,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { } let result = match runtime { - RuntimeType::Multi => quote! { + RuntimeType::Multi | RuntimeType::Auto => quote! { #(#attrs)* fn #name(#inputs) #ret { tokio::runtime::Runtime::new().unwrap().block_on(async { #body }) @@ -107,14 +107,11 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { RuntimeType::Single => quote! { #(#attrs)* fn #name(#inputs) #ret { - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(async { #body }) - } - }, - RuntimeType::Auto => quote! { - #(#attrs)* - fn #name() #ret { - let mut rt = tokio::runtime::__main::Runtime::new().unwrap(); - rt.block_on(async { #body }) + tokio::runtime::Builder::new() + .current_thread() + .build() + .unwrap() + .block_on(async { #body }) } }, }; @@ -211,7 +208,11 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { #[test] #(#attrs)* fn #name() #ret { - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(async { #body }) + tokio::runtime::Builder::new() + .current_thread() + .build() + .unwrap() + .block_on(async { #body }) } }, }; diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 58499bbb..749112d8 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -26,15 +26,9 @@ pub mod task; /// /// [runtime-block-on]: https://docs.rs/tokio/0.2.0-alpha.2/tokio/runtime/current_thread/struct.Runtime.html#method.block_on pub fn block_on(future: F) -> F::Output { - let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); - rt.block_on(future) -} + use tokio::runtime; -/* -#[doc(hidden)] -pub mod codegen { - pub mod futures { - pub use futures::*; - } + let mut rt = runtime::Builder::new().current_thread().build().unwrap(); + + rt.block_on(future) } -*/ diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 48b27a39..b01d16af 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -47,7 +47,6 @@ net-full = ["tcp", "udp", "uds"] net-driver = ["io-traits", "mio", "blocking", "lazy_static"] rt-current-thread = [ "executor-core", - "crossbeam-channel", "timer", "sync", "net-driver", @@ -95,7 +94,6 @@ futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "chann # Everything else is optional... bytes = { version = "0.4", optional = true } -crossbeam-channel = { version = "0.3.8", optional = true } fnv = { version = "1.0.6", optional = true } iovec = { version = "0.1", optional = true } lazy_static = { version = "1.0.2", optional = true } diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs index 2ad573d8..92dc1c36 100644 --- a/tokio/src/executor/blocking/mod.rs +++ b/tokio/src/executor/blocking/mod.rs @@ -221,6 +221,7 @@ impl Pool { } } +#[derive(Debug)] pub(crate) struct PoolWaiter(Arc); impl From for PoolWaiter { diff --git a/tokio/src/executor/current_thread/mod.rs b/tokio/src/executor/current_thread/mod.rs index 62619f2a..8ee4da82 100644 --- a/tokio/src/executor/current_thread/mod.rs +++ b/tokio/src/executor/current_thread/mod.rs @@ -1,873 +1,380 @@ -//! A single-threaded executor which executes tasks on the same thread from which -//! they are spawned. -//! -//! [`CurrentThread`] is the main type of this crate. It executes tasks on the -//! current thread. The easiest way to start a new [`CurrentThread`] executor -//! is to call [`block_on_all`] with an initial task to seed the executor. All -//! tasks that are being managed by a [`CurrentThread`] executor are able to -//! spawn additional tasks by calling [`spawn`]. -//! -//! Application authors will not use this crate directly. Instead, they will use -//! the `tokio` crate. Library authors should only depend on -//! `tokio-current-thread` if they are building a custom task executor. -//! -//! [`CurrentThread`]: struct.CurrentThread.html -//! [`spawn`]: fn.spawn.html -//! [`block_on_all`]: fn.block_on_all.html - -mod scheduler; -use self::scheduler::{Scheduler, TickArgs}; - -#[cfg(feature = "blocking")] -use crate::executor::blocking::{Pool, PoolWaiter}; -use crate::executor::park::{Park, ParkThread, Unpark}; -use crate::executor::{EnterError, Executor, SpawnError, TypedExecutor}; - -use std::cell::Cell; -use std::error::Error; +use crate::executor::park::{Park, Unpark}; +use crate::executor::task::{self, JoinHandle, Schedule, Task}; +use crate::executor::Executor; + +use std::cell::UnsafeCell; +use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::{atomic, Arc}; -use std::task::{Context, Poll, Waker}; -use std::thread; -use std::time::{Duration, Instant}; +use std::mem::ManuallyDrop; +use std::sync::{Arc, Mutex}; +use std::task::{RawWaker, RawWakerVTable, Waker}; +use std::time::Duration; /// Executes tasks on the current thread -pub struct CurrentThread { - /// Execute futures and receive unpark notifications. - scheduler: Scheduler, - - /// Current number of futures being executed. - /// - /// The LSB is used to indicate that the runtime is preparing to shut down. - /// Thus, to get the actual number of pending futures, `>>1`. - num_futures: Arc, - - /// Thread park handle - park: P, - - /// Handle for spawning new futures from other threads - spawn_handle: Handle, - - /// Receiver for futures spawned from other threads - spawn_receiver: crossbeam_channel::Receiver + Send + 'static>>>, - - /// Handle to pool for handling blocking tasks - #[cfg(feature = "blocking")] - blocking: PoolWaiter, - - /// The thread-local ID assigned to this executor. - id: u64, -} - -/// Executes futures on the current thread. -/// -/// All futures executed using this executor will be executed on the current -/// thread. As such, `run` will wait for these futures to complete before -/// returning. -/// -/// For more details, see the [module level](index.html) documentation. -#[derive(Debug, Clone)] -pub struct TaskExecutor { - // Prevent the handle from moving across threads. - _p: ::std::marker::PhantomData>, -} - -/// Returned by the `turn` function. #[derive(Debug)] -pub struct Turn { - polled: bool, -} +pub(crate) struct CurrentThread

+where + P: Park, +{ + /// Scheduler component + scheduler: Arc, -impl Turn { - /// `true` if any futures were polled at all and `false` otherwise. - pub fn has_polled(&self) -> bool { - self.polled - } + /// Local state + local: Local

, } -/// A `CurrentThread` instance bound to a supplied execution context. -pub struct Entered<'a, P: Park> { - executor: &'a mut CurrentThread

, +#[derive(Debug, Clone)] +pub(crate) struct Spawner { + scheduler: Arc, } -/// Error returned by the `run` function. -#[derive(Debug)] -pub struct RunError { - _p: (), -} +/// The scheduler component. +pub(super) struct Scheduler { + /// List of all active tasks spawned onto this executor. + /// + /// # Safety + /// + /// Must only be accessed from the primary thread + owned_tasks: UnsafeCell>, -impl fmt::Display for RunError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Run error") - } -} + /// Local run queue. + /// + /// Tasks notified from the current thread are pushed into this queue. + /// + /// # Safety + /// + /// References should not be handed out. Only call `push` / `pop` functions. + /// Only call from the owning thread. + local_queue: UnsafeCell>>, -impl Error for RunError {} + /// Remote run queue. + /// + /// Tasks notified from another thread are pushed into this queue. + remote_queue: Mutex, -/// Error returned by the `run_timeout` function. -#[derive(Debug)] -pub struct RunTimeoutError { - timeout: bool, -} + /// Tasks pending drop + pending_drop: task::TransferStack, -impl fmt::Display for RunTimeoutError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = if self.timeout { - "Run timeout error (timeout)" - } else { - "Run timeout error (not timeout)" - }; - write!(fmt, "{}", descr) - } + /// Unpark the blocked thread + unpark: Box, } -impl Error for RunTimeoutError {} +unsafe impl Send for Scheduler {} +unsafe impl Sync for Scheduler {} -/// Error returned by the `turn` function. +/// Local state #[derive(Debug)] -pub struct TurnError { - _p: (), -} +struct Local

{ + /// Current tick + tick: u8, -impl fmt::Display for TurnError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Turn error") - } + /// Thread park handle + park: P, } -impl Error for TurnError {} - -/// Error returned by the `block_on` function. #[derive(Debug)] -pub struct BlockError { - inner: Option, -} +struct RemoteQueue { + /// FIFO list of tasks + queue: VecDeque>, -impl fmt::Display for BlockError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Block error") - } + /// `true` when a task can be pushed into the queue, false otherwise. + open: bool, } -impl Error for BlockError {} +/// Max number of tasks to poll per tick. +const MAX_TASKS_PER_TICK: usize = 61; -/// This is mostly split out to make the borrow checker happy. -struct Borrow<'a, U> { - spawner: BorrowSpawner<'a, U>, - #[cfg(feature = "blocking")] - blocking: &'a PoolWaiter, -} - -/// As is this. -struct BorrowSpawner<'a, U> { - id: u64, - num_futures: &'a atomic::AtomicUsize, - scheduler: &'a mut Scheduler, -} - -trait SpawnLocal { - fn spawn_local(&mut self, future: Pin>>, already_counted: bool); -} - -struct CurrentRunner { - spawn: Cell>, - id: Cell>, -} +/// How often to check the remote queue first +const CHECK_REMOTE_INTERVAL: u8 = 13; -thread_local! { - /// Current thread's task runner. This is set in `TaskRunner::with` - static CURRENT: CurrentRunner = CurrentRunner { - spawn: Cell::new(None), - id: Cell::new(None), - } -} - -thread_local! { - /// Unique ID to assign to each new executor launched on this thread. - /// - /// The unique ID is used to determine if the currently running executor matches the one - /// referred to by a `Handle` so that direct task dispatch can be used. - static EXECUTOR_ID: Cell = Cell::new(0) -} - -/// Run the executor bootstrapping the execution with the provided future. -/// -/// This creates a new [`CurrentThread`] executor, spawns the provided future, -/// and blocks the current thread until the provided future and **all** -/// subsequently spawned futures complete. In other words: -/// -/// * If the provided bootstrap future does **not** spawn any additional tasks, -/// `block_on_all` returns once `future` completes. -/// * If the provided bootstrap future **does** spawn additional tasks, then -/// `block_on_all` returns once **all** spawned futures complete. -/// -/// See [module level][mod] documentation for more details. -/// -/// [`CurrentThread`]: struct.CurrentThread.html -/// [mod]: index.html -pub fn block_on_all(future: F) -> F::Output +impl

CurrentThread

where - F: Future, + P: Park, { - let mut current_thread = CurrentThread::new(); - - let ret = current_thread.block_on(future); - current_thread.run().unwrap(); - ret -} - -/// Executes a future on the current thread. -/// -/// The provided future must complete or be canceled before `run` will return. -/// -/// Unlike [`tokio::spawn`], this function will always spawn on a -/// `CurrentThread` executor and is able to spawn futures that are not `Send`. -/// -/// # Panics -/// -/// This function can only be invoked from the context of a `run` call; any -/// other use will result in a panic. -/// -/// [`tokio::spawn`]: ../fn.spawn.html -pub fn spawn(future: F) -where - F: Future + 'static, -{ - TaskExecutor::current() - .spawn_local(Box::pin(future)) - .unwrap(); -} - -// ===== impl CurrentThread ===== - -impl CurrentThread { - /// Create a new instance of `CurrentThread`. - pub fn new() -> Self { - CurrentThread::new_with_park(ParkThread::new()) - } -} - -impl CurrentThread

{ - /// Create a new instance of `CurrentThread` backed by the given park - /// handle. - pub fn new_with_park(park: P) -> Self { + pub(crate) fn new(park: P) -> CurrentThread

{ let unpark = park.unpark(); - let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded(); - let thread = thread::current().id(); - let id = EXECUTOR_ID.with(|idc| { - let id = idc.get(); - idc.set(id + 1); - id - }); - - let scheduler = Scheduler::new(unpark); - let waker = scheduler.waker(); - - let num_futures = Arc::new(atomic::AtomicUsize::new(0)); - CurrentThread { - scheduler, - num_futures: num_futures.clone(), - park, - id, - spawn_handle: Handle { - sender: spawn_sender, - num_futures, - waker, - thread, - id, - }, - spawn_receiver, - - #[cfg(feature = "blocking")] - blocking: PoolWaiter::from(Pool::default()), + scheduler: Arc::new(Scheduler { + owned_tasks: UnsafeCell::new(task::OwnedList::new()), + local_queue: UnsafeCell::new(VecDeque::with_capacity(64)), + remote_queue: Mutex::new(RemoteQueue { + queue: VecDeque::with_capacity(64), + open: true, + }), + pending_drop: task::TransferStack::new(), + unpark: Box::new(unpark), + }), + local: Local { tick: 0, park }, } } - /// Returns `true` if the executor is currently idle. - /// - /// An idle executor is defined by not currently having any spawned tasks. - /// - /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`, - /// this method may return `true` even though there are more futures to be executed. - pub fn is_idle(&self) -> bool { - self.num_futures.load(atomic::Ordering::SeqCst) <= 1 + pub(crate) fn spawner(&self) -> Spawner { + Spawner { + scheduler: self.scheduler.clone(), + } } - /// Spawn the future on the executor. - /// - /// This internally queues the future to be executed once `run` is called. - pub fn spawn(&mut self, future: F) -> &mut Self + /// Spawn a future onto the thread pool + pub(crate) fn spawn(&self, future: F) -> JoinHandle where - F: Future + 'static, + F: Future + Send + 'static, + F::Output: Send + 'static, { - self.borrow().spawner.spawn_local(Box::pin(future), false); - self + let (task, handle) = task::joinable(future); + self.scheduler.schedule(task); + handle } - /// Synchronously waits for the provided `future` to complete. - /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. - /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. - /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution. - pub fn block_on(&mut self, future: F) -> F::Output + pub(crate) fn block_on(&mut self, mut future: F) -> F::Output where F: Future, { - let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().block_on(future) - } - - /// Run the executor to completion, blocking the thread until **all** - /// spawned futures have completed. - pub fn run(&mut self) -> Result<(), RunError> { - let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().run() - } - - /// Run the executor to completion, blocking the thread until all - /// spawned futures have completed **or** `duration` time has elapsed. - pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { - let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().run_timeout(duration) - } - - /// Perform a single iteration of the event loop. - /// - /// This function blocks the current thread even if the executor is idle. - pub fn turn(&mut self, duration: Option) -> Result { - let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().turn(duration) - } - - /// Bind `CurrentThread` instance with an execution context. - fn enter(&mut self) -> Entered<'_, P> { - Entered { executor: self } - } + use std::pin::Pin; + use std::task::Context; + use std::task::Poll::Ready; - /// Returns a reference to the underlying `Park` instance. - pub fn get_park(&self) -> &P { - &self.park - } + let local = &mut self.local; + let scheduler = &*self.scheduler; - /// Returns a mutable reference to the underlying `Park` instance. - pub fn get_park_mut(&mut self) -> &mut P { - &mut self.park - } + crate::executor::global::with_current_thread(scheduler, || { + let mut _enter = + crate::executor::enter().expect("attempting to block while on a Tokio executor"); - fn borrow(&mut self) -> Borrow<'_, P::Unpark> { - Borrow { - spawner: BorrowSpawner { - id: self.id, - scheduler: &mut self.scheduler, - num_futures: &*self.num_futures, - }, - #[cfg(feature = "blocking")] - blocking: &self.blocking, - } - } + let raw_waker = RawWaker::new( + scheduler as *const Scheduler as *const (), + &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop), + ); - /// Get a new handle to spawn futures on the executor - /// - /// Different to the executor itself, the handle can be sent to different - /// threads and can be used to spawn futures on the executor. - pub fn handle(&self) -> Handle { - self.spawn_handle.clone() - } -} + let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) }); + let mut cx = Context::from_waker(&waker); -impl Drop for CurrentThread

{ - fn drop(&mut self) { - // Signal to Handles that no more futures can be spawned by setting LSB. - // - // NOTE: this isn't technically necessary since the send on the mpsc will fail once the - // receiver is dropped, but it's useful to illustrate how clean shutdown will be - // implemented (e.g., by setting the LSB). - let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst); - - // TODO: We currently ignore any pending futures at the time we shut down. - // - // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`) - // which sets LSB (as above) do make Handle::spawn stop working, and then runs until - // num_futures.load() == 1. - let _ = pending; - - // We will wait for any blocking ops by virtue of dropping `blocking`. - } -} + // `block_on` takes ownership of `f`. Once it is pinned here, the + // original `f` binding can no longer be accessed, making the + // pinning safe. + let mut future = unsafe { Pin::new_unchecked(&mut future) }; -impl Executor for CurrentThread { - fn spawn( - &mut self, - future: Pin + Send>>, - ) -> Result<(), SpawnError> { - self.borrow().spawner.spawn_local(future, false); - Ok(()) - } -} + loop { + if let Ready(v) = future.as_mut().poll(&mut cx) { + return v; + } -impl TypedExecutor for CurrentThread -where - T: Future + 'static, -{ - fn spawn(&mut self, future: T) -> Result<(), SpawnError> { - self.borrow().spawner.spawn_local(Box::pin(future), false); - Ok(()) - } -} + scheduler.tick(local); -impl fmt::Debug for CurrentThread

{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("CurrentThread") - .field("scheduler", &self.scheduler) - .field( - "num_futures", - &self.num_futures.load(atomic::Ordering::SeqCst), - ) - .finish() - } -} - -impl Default for CurrentThread

{ - fn default() -> Self { - CurrentThread::new_with_park(P::default()) + // Maintenance work + scheduler.drain_pending_drop(); + } + }) } } -// ===== impl Entered ===== - -impl Entered<'_, P> { - /// Spawn the future on the executor. - /// - /// This internally queues the future to be executed once `run` is called. - pub fn spawn(&mut self, future: F) -> &mut Self +impl Spawner { + /// Spawn a future onto the thread pool + pub(crate) fn spawn(&self, future: F) -> JoinHandle where - F: Future + 'static, + F: Future + Send + 'static, + F::Output: Send + 'static, { - self.executor - .borrow() - .spawner - .spawn_local(Box::pin(future), false); - self + let (task, handle) = task::joinable(future); + self.scheduler.schedule(task); + handle } +} - /// Synchronously waits for the provided `future` to complete. - /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. - /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. - /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution. - /// - /// # Panics - /// - /// This function will panic if the `Park` call returns an error. - pub fn block_on(&mut self, mut future: F) -> F::Output - where - F: Future, - { - // Safety: we shadow the original `future`, so it will never move - // again. - let mut future = unsafe { Pin::new_unchecked(&mut future) }; - let waker = self.executor.scheduler.waker(); - let mut cx = Context::from_waker(&waker); - - loop { - let res = self - .executor - .borrow() - .enter(|| future.as_mut().poll(&mut cx)); - - match res { - Poll::Ready(e) => return e, - Poll::Pending => {} - } +impl Scheduler { + fn tick(&self, local: &mut Local) { + for _ in 0..MAX_TASKS_PER_TICK { + // Get the current tick + let tick = local.tick; - self.tick(); + // Increment the tick + local.tick = tick.wrapping_add(1); - if self.executor.park.park().is_err() { - panic!("block_on park failed"); + let task = match self.next_task(tick) { + Some(task) => task, + None => { + local.park.park().ok().expect("failed to park"); + return; + } + }; + + if let Some(task) = task.run(&mut || Some(self.into())) { + unsafe { + self.schedule_local(task); + } } } - } - - /// Run the executor to completion, blocking the thread until **all** - /// spawned futures have completed. - pub fn run(&mut self) -> Result<(), RunError> { - self.run_timeout2(None).map_err(|_| RunError { _p: () }) - } - /// Run the executor to completion, blocking the thread until all - /// spawned futures have completed **or** `duration` time has elapsed. - pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { - self.run_timeout2(Some(duration)) + local + .park + .park_timeout(Duration::from_millis(0)) + .ok() + .expect("failed to park"); } - /// Perform a single iteration of the event loop. - /// - /// This function blocks the current thread even if the executor is idle. - pub fn turn(&mut self, duration: Option) -> Result { - let res = if self.executor.scheduler.has_pending_futures() { - self.executor.park.park_timeout(Duration::from_millis(0)) - } else { - match duration { - Some(duration) => self.executor.park.park_timeout(duration), - None => self.executor.park.park(), + fn drain_pending_drop(&self) { + for task in self.pending_drop.drain() { + unsafe { + (*self.owned_tasks.get()).remove(&task); } - }; - - if res.is_err() { - return Err(TurnError { _p: () }); + drop(task); } - - let polled = self.tick(); - - Ok(Turn { polled }) - } - - /// Returns a reference to the underlying `Park` instance. - pub fn get_park(&self) -> &P { - &self.executor.park } - /// Returns a mutable reference to the underlying `Park` instance. - pub fn get_park_mut(&mut self) -> &mut P { - &mut self.executor.park + /// # Safety + /// + /// Must be called from the same thread that holds the `CurrentThread` + /// value. + pub(super) unsafe fn spawn_background(&self, future: F) + where + F: Future + Send + 'static, + { + let task = task::background(future); + self.schedule_local(task); } - fn run_timeout2(&mut self, dur: Option) -> Result<(), RunTimeoutError> { - if self.executor.is_idle() { - // Nothing to do - return Ok(()); - } - - let mut time = dur.map(|dur| (Instant::now() + dur, dur)); - - loop { - self.tick(); - - if self.executor.is_idle() { - return Ok(()); - } - - match time { - Some((until, rem)) => { - if self.executor.park.park_timeout(rem).is_err() { - return Err(RunTimeoutError::new(false)); - } - - let now = Instant::now(); - - if now >= until { - return Err(RunTimeoutError::new(true)); - } - - time = Some((until, until - now)); - } - None => { - if self.executor.park.park().is_err() { - return Err(RunTimeoutError::new(false)); - } - } - } - } + unsafe fn schedule_local(&self, task: Task) { + (*self.local_queue.get()).push_front(task); } - /// Returns `true` if any futures were processed - fn tick(&mut self) -> bool { - // Spawn any futures that were spawned from other threads by manually - // looping over the receiver stream - - // FIXME: Slightly ugly but needed to make the borrow checker happy - let (mut borrow, spawn_receiver) = ( - Borrow { - spawner: BorrowSpawner { - id: self.executor.id, - scheduler: &mut self.executor.scheduler, - num_futures: &*self.executor.num_futures, - }, - #[cfg(feature = "blocking")] - blocking: &self.executor.blocking, - }, - &mut self.executor.spawn_receiver, - ); - - while let Ok(future) = spawn_receiver.try_recv() { - borrow.spawner.spawn_local(future, true); + fn next_task(&self, tick: u8) -> Option> { + if 0 == tick % CHECK_REMOTE_INTERVAL { + self.next_remote_task().or_else(|| self.next_local_task()) + } else { + self.next_local_task().or_else(|| self.next_remote_task()) } - - // After any pending futures were scheduled, do the actual tick - borrow.spawner.scheduler.tick(TickArgs { - id: borrow.spawner.id, - num_futures: borrow.spawner.num_futures, - #[cfg(feature = "blocking")] - blocking: borrow.blocking, - }) } -} -impl fmt::Debug for Entered<'_, P> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Entered") - .field("executor", &self.executor) - .finish() + fn next_local_task(&self) -> Option> { + unsafe { (*self.local_queue.get()).pop_front() } } -} - -// ===== impl Handle ===== - -/// Handle to spawn a future on the corresponding `CurrentThread` instance -#[derive(Clone)] -pub struct Handle { - sender: crossbeam_channel::Sender + Send + 'static>>>, - num_futures: Arc, - /// Waker to the Scheduler - waker: Waker, - thread: thread::ThreadId, - /// The thread-local ID assigned to this Handle's executor. - id: u64, -} - -// Manual implementation because the Sender does not implement Debug -impl fmt::Debug for Handle { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Handle") - .field("shut_down", &self.is_shut_down()) - .finish() + fn next_remote_task(&self) -> Option> { + self.remote_queue.lock().unwrap().queue.pop_front() } } -impl Handle { - /// Spawn a future onto the `CurrentThread` instance corresponding to this handle - /// - /// # Panics - /// - /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` - /// instance of the `Handle` does not exist anymore. - pub fn spawn(&self, future: F) -> Result<(), SpawnError> - where - F: Future + Send + 'static, - { - if thread::current().id() == self.thread { - let mut e = TaskExecutor::current(); - if e.id() == Some(self.id) { - return e.spawn_local(Box::pin(future)); - } - } - - // NOTE: += 2 since LSB is the shutdown bit - let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); - if pending % 2 == 1 { - // Bring the count back so we still know when the Runtime is idle. - self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst); - - return Err(SpawnError::shutdown()); +impl Schedule for Scheduler { + fn bind(&self, task: &Task) { + unsafe { + (*self.owned_tasks.get()).insert(task); } + } - self.sender - .send(Box::pin(future)) - .expect("CurrentThread does not exist anymore"); - self.waker.wake_by_ref(); - Ok(()) + fn release(&self, task: Task) { + self.pending_drop.push(task); } - /// Provides a best effort **hint** to whether or not `spawn` will succeed. - /// - /// This function may return both false positives **and** false negatives. - /// If `status` returns `Ok`, then a call to `spawn` will *probably* - /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will - /// *probably* fail, but may succeed. - /// - /// This allows a caller to avoid creating the task if the call to `spawn` - /// has a high likelihood of failing. - pub fn status(&self) -> Result<(), SpawnError> { - if self.is_shut_down() { - return Err(SpawnError::shutdown()); + fn release_local(&self, task: &Task) { + unsafe { + (*self.owned_tasks.get()).remove(task); } - - Ok(()) } - fn is_shut_down(&self) -> bool { - // LSB of "num_futures" is the shutdown bit - let num_futures = self.num_futures.load(atomic::Ordering::SeqCst); - num_futures % 2 == 1 - } -} + fn schedule(&self, task: Task) { + use crate::executor::global; -// ===== impl TaskExecutor ===== + if global::current_thread_is_current(self) { + unsafe { self.schedule_local(task) }; + } else { + let mut lock = self.remote_queue.lock().unwrap(); -impl TaskExecutor { - /// Returns an executor that executes futures on the current thread. - /// - /// The user of `TaskExecutor` must ensure that when a future is submitted, - /// that it is done within the context of a call to `run`. - /// - /// For more details, see the [module level](index.html) documentation. - pub fn current() -> TaskExecutor { - TaskExecutor { - _p: ::std::marker::PhantomData, - } - } + if lock.open { + lock.queue.push_back(task); + } else { + task.shutdown(); + } - /// Get the current executor's thread-local ID. - fn id(&self) -> Option { - CURRENT.with(|current| current.id.get()) - } + // while locked, call unpark + self.unpark.unpark(); - /// Spawn a future onto the current `CurrentThread` instance. - pub fn spawn_local( - &mut self, - future: Pin>>, - ) -> Result<(), SpawnError> { - CURRENT.with(|current| match current.spawn.get() { - Some(spawn) => { - unsafe { (*spawn).spawn_local(future, false) }; - Ok(()) - } - None => Err(SpawnError::shutdown()), - }) + drop(lock); + } } } -impl Executor for TaskExecutor { +impl Executor for &Scheduler { fn spawn( &mut self, - future: Pin + Send>>, - ) -> Result<(), SpawnError> { - self.spawn_local(future) + future: std::pin::Pin + Send>>, + ) -> Result<(), crate::executor::SpawnError> { + // Safety: This implementation should only be called by `global.rs` from + // the thread local. + // + // TODO: Delete this implementation. + unsafe { + Scheduler::spawn_background(self, future); + } + Ok(()) } } -impl TypedExecutor for TaskExecutor +impl

Drop for CurrentThread

where - F: Future + 'static, + P: Park, { - fn spawn(&mut self, future: F) -> Result<(), SpawnError> { - self.spawn_local(Box::pin(future)) - } -} - -// ===== impl Borrow ===== - -impl Borrow<'_, U> { - fn enter(&mut self, f: F) -> R - where - F: FnOnce() -> R, - { - CURRENT.with(|current| { - current.id.set(Some(self.spawner.id)); - - let Borrow { - ref mut spawner, - #[cfg(all(feature = "blocking", not(loom)))] - ref blocking, - .. - } = self; - - current.set_spawn(spawner, || { - #[cfg(all(feature = "blocking", not(loom)))] - let res = crate::executor::blocking::with_pool(blocking, || f()); - #[cfg(any(not(feature = "blocking"), loom))] - let res = f(); - res - }) - }) - } -} + fn drop(&mut self) { + // Close the remote queue + let mut lock = self.scheduler.remote_queue.lock().unwrap(); + lock.open = false; -impl SpawnLocal for BorrowSpawner<'_, U> { - fn spawn_local(&mut self, future: Pin>>, already_counted: bool) { - if !already_counted { - // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down. - // NOTE: += 2 since LSB is the shutdown bit - self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); + while let Some(task) = lock.queue.pop_front() { + task.shutdown(); } - self.scheduler.schedule(future); - } -} -// ===== impl CurrentRunner ===== + drop(lock); -impl CurrentRunner { - fn set_spawn(&self, spawn: &mut dyn SpawnLocal, f: F) -> R - where - F: FnOnce() -> R, - { - struct Reset<'a>(&'a CurrentRunner); - - impl Drop for Reset<'_> { - fn drop(&mut self) { - self.0.spawn.set(None); - self.0.id.set(None); - } + // Drain all local tasks + while let Some(task) = self.scheduler.next_local_task() { + task.shutdown(); } - let _reset = Reset(self); + // Release owned tasks + unsafe { + (*self.scheduler.owned_tasks.get()).shutdown(); + } - let spawn = unsafe { hide_lt(spawn as *mut dyn SpawnLocal) }; - self.spawn.set(Some(spawn)); + self.scheduler.drain_pending_drop(); - f() + // Wait until all tasks have been released. + while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } { + self.local.park.park().ok().expect("park failed"); + self.scheduler.drain_pending_drop(); + } } } -unsafe fn hide_lt<'a>(p: *mut (dyn SpawnLocal + 'a)) -> *mut (dyn SpawnLocal + 'static) { - use std::mem; - // false positive: https://github.com/rust-lang/rust-clippy/issues/2906 - #[allow(clippy::transmute_ptr_to_ptr)] - mem::transmute(p) +impl fmt::Debug for Scheduler { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Scheduler").finish() + } } -// ===== impl RunTimeoutError ===== +unsafe fn sched_clone_waker(ptr: *const ()) -> RawWaker { + let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const Scheduler)); + let s2 = s1.clone(); -impl RunTimeoutError { - fn new(timeout: bool) -> Self { - RunTimeoutError { timeout } - } - - /// Returns `true` if the error was caused by the operation timing out. - pub fn is_timeout(&self) -> bool { - self.timeout - } + RawWaker::new( + &**s2 as *const Scheduler as *const (), + &RawWakerVTable::new(sched_clone_waker, sched_wake, sched_wake_by_ref, sched_drop), + ) } -impl From for RunTimeoutError { - fn from(_: EnterError) -> Self { - RunTimeoutError::new(false) - } +unsafe fn sched_wake(ptr: *const ()) { + let scheduler = Arc::from_raw(ptr as *const Scheduler); + scheduler.unpark.unpark(); } -// ===== impl BlockError ===== +unsafe fn sched_wake_by_ref(ptr: *const ()) { + let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const Scheduler)); + scheduler.unpark.unpark(); +} -impl BlockError { - /// Returns the error yielded by the future being blocked on - pub fn into_inner(self) -> Option { - self.inner - } +unsafe fn sched_drop(ptr: *const ()) { + let _ = Arc::from_raw(ptr as *const Scheduler); } -impl From for BlockError { - fn from(_: EnterError) -> Self { - BlockError { inner: None } - } +unsafe fn sched_noop(_ptr: *const ()) { + unreachable!(); } diff --git a/tokio/src/executor/current_thread/scheduler.rs b/tokio/src/executor/current_thread/scheduler.rs deleted file mode 100644 index ba14ee88..00000000 --- a/tokio/src/executor/current_thread/scheduler.rs +++ /dev/null @@ -1,808 +0,0 @@ -use crate::executor::current_thread::{Borrow, BorrowSpawner}; -use crate::executor::park::Unpark; - -use std::cell::UnsafeCell; -use std::fmt::{self, Debug}; -use std::future::Future; -use std::mem; -use std::pin::Pin; -use std::ptr; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; -use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize}; -use std::sync::{Arc, Weak}; -use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -use std::thread; -use std::usize; - -/// A generic task-aware scheduler. -/// -/// This is used both by `FuturesUnordered` and the current-thread executor. -pub(crate) struct Scheduler { - inner: Arc>, - nodes: List, -} - -// A linked-list of nodes -struct List { - len: usize, - head: *const Node, - tail: *const Node, -} - -// Scheduler is implemented using two linked lists. The first linked list tracks -// all items managed by a `Scheduler`. This list is stored on the `Scheduler` -// struct and is **not** thread safe. The second linked list is an -// implementation of the intrusive MPSC queue algorithm described by -// 1024cores.net and is stored on `Inner`. This linked list can push items to -// the back concurrently but only one consumer may pop from the front. To -// enforce this requirement, all popping will be performed via fns on -// `Scheduler` that take `&mut self`. -// -// When a item is submitted to the set a node is allocated and inserted in -// both linked lists. This means that all insertion operations **must** be -// originated from `Scheduler` with `&mut self` The next call to `tick` will -// (eventually) see this node and call `poll` on the item. -// -// Nodes are wrapped in `Arc` cells which manage the lifetime of the node. -// However, `Arc` handles are sometimes cast to `*const Node` pointers. -// Specifically, when a node is stored in at least one of the two lists -// described above, this represents a logical `Arc` handle. This is how -// `Scheduler` maintains its reference to all nodes it manages. Each -// `NotifyHandle` instance is an `Arc` as well. -// -// When `Scheduler` drops, it clears the linked list of all nodes that it -// manages. When doing so, it must attempt to decrement the reference count (by -// dropping an Arc handle). However, it can **only** decrement the reference -// count if the node is not currently stored in the mpsc channel. If the node -// **is** "queued" in the mpsc channel, then the arc reference count cannot be -// decremented. Once the node is popped from the mpsc channel, then the final -// arc reference count can be decremented, thus freeing the node. - -struct Inner { - // Thread unpark handle - unpark: U, - - // Tick number - tick_num: AtomicUsize, - - // Head/tail of the readiness queue - head_readiness: AtomicPtr>, - tail_readiness: UnsafeCell<*const Node>, - - // Used as part of the mpsc queue algorithm - stub: Arc>, -} - -unsafe impl Send for Inner {} -unsafe impl Sync for Inner {} - -struct Node { - // The item - item: UnsafeCell>, - - // The tick at which this node was notified - notified_at: AtomicUsize, - - // Next pointer for linked list tracking all active nodes - next_all: UnsafeCell<*const Node>, - - // Previous node in linked list tracking all active nodes - prev_all: UnsafeCell<*const Node>, - - // Next pointer in readiness queue - next_readiness: AtomicPtr>, - - // Whether or not this node is currently in the mpsc queue. - queued: AtomicBool, - - // Queue that we'll be enqueued to when notified - queue: Weak>, -} - -/// Returned by `Inner::dequeue`, representing either a dequeue success (with -/// the dequeued node), an empty list, or an inconsistent state. -/// -/// The inconsistent state is described in more detail at [1024cores], but -/// roughly indicates that a node will be ready to dequeue sometime shortly in -/// the future and the caller should try again soon. -/// -/// [1024cores]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue -enum Dequeue { - Data(*const Node), - Empty, - Yield, - Inconsistent, -} - -/// Wraps a spawned boxed future -struct Task(Pin>>); - -/// A task that is scheduled. `turn` must be called -pub(crate) struct Scheduled<'a, U> { - task: &'a mut Task, - node: &'a Arc>, - done: &'a mut bool, -} - -pub(super) struct TickArgs<'a> { - pub(super) id: u64, - pub(super) num_futures: &'a AtomicUsize, - #[cfg(feature = "blocking")] - pub(super) blocking: &'a crate::executor::blocking::PoolWaiter, -} - -impl Scheduler -where - U: Unpark, -{ - /// Constructs a new, empty `Scheduler` - /// - /// The returned `Scheduler` does not contain any items and, in this - /// state, `Scheduler::poll` will return `Ok(Async::Ready(None))`. - pub(crate) fn new(unpark: U) -> Self { - let stub = Arc::new(Node { - item: UnsafeCell::new(None), - notified_at: AtomicUsize::new(0), - next_all: UnsafeCell::new(ptr::null()), - prev_all: UnsafeCell::new(ptr::null()), - next_readiness: AtomicPtr::new(ptr::null_mut()), - queued: AtomicBool::new(true), - queue: Weak::new(), - }); - let stub_ptr = &*stub as *const Node; - let inner = Arc::new(Inner { - unpark, - tick_num: AtomicUsize::new(0), - head_readiness: AtomicPtr::new(stub_ptr as *mut _), - tail_readiness: UnsafeCell::new(stub_ptr), - stub, - }); - - Scheduler { - inner, - nodes: List::new(), - } - } - - pub(crate) fn waker(&self) -> Waker { - waker_inner(self.inner.clone()) - } - - pub(crate) fn schedule(&mut self, item: Pin>>) { - // Get the current scheduler tick - let tick_num = self.inner.tick_num.load(SeqCst); - - let node = Arc::new(Node { - item: UnsafeCell::new(Some(Task::new(item))), - notified_at: AtomicUsize::new(tick_num), - next_all: UnsafeCell::new(ptr::null_mut()), - prev_all: UnsafeCell::new(ptr::null_mut()), - next_readiness: AtomicPtr::new(ptr::null_mut()), - queued: AtomicBool::new(true), - queue: Arc::downgrade(&self.inner), - }); - - // Right now our node has a strong reference count of 1. We transfer - // ownership of this reference count to our internal linked list - // and we'll reclaim ownership through the `unlink` function below. - let ptr = self.nodes.push_back(node); - - // We'll need to get the item "into the system" to start tracking it, - // e.g. getting its unpark notifications going to us tracking which - // items are ready. To do that we unconditionally enqueue it for - // polling here. - self.inner.enqueue(ptr); - } - - /// Returns `true` if there are currently any pending futures - pub(crate) fn has_pending_futures(&mut self) -> bool { - // See function definition for why the unsafe is needed and - // correctly used here - unsafe { self.inner.has_pending_futures() } - } - - /// Advance the scheduler state, returning `true` if any futures were - /// processed. - /// - /// This function should be called whenever the caller is notified via a - /// wakeup. - pub(super) fn tick(&mut self, args: TickArgs<'_>) -> bool { - let mut ret = false; - let tick = self.inner.tick_num.fetch_add(1, SeqCst).wrapping_add(1); - - loop { - let node = match unsafe { self.inner.dequeue(Some(tick)) } { - Dequeue::Empty => { - return ret; - } - Dequeue::Yield => { - self.inner.unpark.unpark(); - return ret; - } - Dequeue::Inconsistent => { - thread::yield_now(); - continue; - } - Dequeue::Data(node) => node, - }; - - ret = true; - - debug_assert!(node != self.inner.stub()); - - unsafe { - if (*(*node).item.get()).is_none() { - // The node has already been released. However, while it was - // being released, another thread notified it, which - // resulted in it getting pushed into the mpsc channel. - // - // In this case, we just decrement the ref count. - let node = ptr2arc(node); - assert!((*node.next_all.get()).is_null()); - assert!((*node.prev_all.get()).is_null()); - continue; - }; - - // We're going to need to be very careful if the `poll` - // function below panics. We need to (a) not leak memory and - // (b) ensure that we still don't have any use-after-frees. To - // manage this we do a few things: - // - // * This "bomb" here will call `release_node` if dropped - // abnormally. That way we'll be sure the memory management - // of the `node` is managed correctly. - // - // * We unlink the node from our internal queue to preemptively - // assume is is complete (will return Ready or panic), in - // which case we'll want to discard it regardless. - // - struct Bomb<'a, U: Unpark> { - borrow: &'a mut Borrow<'a, U>, - node: Option>>, - } - - impl Drop for Bomb<'_, U> { - fn drop(&mut self) { - if let Some(node) = self.node.take() { - self.borrow.enter(|| release_node(node)) - } - } - } - - let node = self.nodes.remove(node); - - let mut borrow = Borrow { - spawner: BorrowSpawner { - id: args.id, - scheduler: self, - num_futures: args.num_futures, - }, - #[cfg(feature = "blocking")] - blocking: args.blocking, - }; - - let mut bomb = Bomb { - node: Some(node), - borrow: &mut borrow, - }; - - let mut done = false; - - // Now that the bomb holds the node, create a new scope. This - // scope ensures that the borrow will go out of scope before we - // mutate the node pointer in `bomb` again - { - let node = bomb.node.as_ref().unwrap(); - - // Get a reference to the inner future. We already ensured - // that the item `is_some`. - let item = (*node.item.get()).as_mut().unwrap(); - - // Unset queued flag... this must be done before - // polling. This ensures that the item gets - // rescheduled if it is notified **during** a call - // to `poll`. - let prev = (*node).queued.swap(false, SeqCst); - assert!(prev); - - // Poll the underlying item with the appropriate `notify` - // implementation. This is where a large bit of the unsafety - // starts to stem from internally. The `notify` instance itself - // is basically just our `Arc` and tracks the mpsc - // queue of ready items. - // - // Critically though `Node` won't actually access `Task`, the - // item, while it's floating around inside of `Task` - // instances. These structs will basically just use `T` to size - // the internal allocation, appropriately accessing fields and - // deallocating the node if need be. - let borrow = &mut *bomb.borrow; - - let mut scheduled = Scheduled { - task: item, - node: bomb.node.as_ref().unwrap(), - done: &mut done, - }; - - if borrow.enter(|| scheduled.tick()) { - // we have a borrow of the Runtime, so we know it's not shut down - borrow.spawner.num_futures.fetch_sub(2, SeqCst); - } - } - - if !done { - // The future is not done, push it back into the "all - // node" list. - let node = bomb.node.take().unwrap(); - bomb.borrow.spawner.scheduler.nodes.push_back(node); - } - } - } - } -} - -impl Scheduled<'_, U> { - /// Polls the task, returns `true` if the task has completed. - pub(crate) fn tick(&mut self) -> bool { - let waker = unsafe { - // Safety: we don't hold this waker ref longer than - // this `tick` function - waker_ref(self.node) - }; - let mut cx = Context::from_waker(&waker); - let ret = match self.task.0.as_mut().poll(&mut cx) { - Poll::Ready(()) => true, - Poll::Pending => false, - }; - - *self.done = ret; - ret - } -} - -impl Task { - pub(crate) fn new(future: Pin + 'static>>) -> Self { - Task(future) - } -} - -impl fmt::Debug for Task { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Task").finish() - } -} - -fn release_node(node: Arc>) { - // The item is done, try to reset the queued flag. This will prevent - // `notify` from doing any work in the item - let prev = node.queued.swap(true, SeqCst); - - // Drop the item, even if it hasn't finished yet. This is safe - // because we're dropping the item on the thread that owns - // `Scheduler`, which correctly tracks T's lifetimes and such. - unsafe { - drop((*node.item.get()).take()); - } - - // If the queued flag was previously set then it means that this node - // is still in our internal mpsc queue. We then transfer ownership - // of our reference count to the mpsc queue, and it'll come along and - // free it later, noticing that the item is `None`. - // - // If, however, the queued flag was *not* set then we're safe to - // release our reference count on the internal node. The queued flag - // was set above so all item `enqueue` operations will not actually - // enqueue the node, so our node will never see the mpsc queue again. - // The node itself will be deallocated once all reference counts have - // been dropped by the various owning tasks elsewhere. - if prev { - mem::forget(node); - } -} - -impl Debug for Scheduler { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Scheduler {{ ... }}") - } -} - -impl Drop for Scheduler { - fn drop(&mut self) { - // When a `Scheduler` is dropped we want to drop all items associated - // with it. At the same time though there may be tons of `Task` handles - // flying around which contain `Node` references inside them. We'll - // let those naturally get deallocated when the `Task` itself goes out - // of scope or gets notified. - while let Some(node) = self.nodes.pop_front() { - release_node(node); - } - - // Note that at this point we could still have a bunch of nodes in the - // mpsc queue. None of those nodes, however, have items associated - // with them so they're safe to destroy on any thread. At this point - // the `Scheduler` struct, the owner of the one strong reference - // to `Inner` will drop the strong reference. At that point - // whichever thread releases the strong refcount last (be it this - // thread or some other thread as part of an `upgrade`) will clear out - // the mpsc queue and free all remaining nodes. - // - // While that freeing operation isn't guaranteed to happen here, it's - // guaranteed to happen "promptly" as no more "blocking work" will - // happen while there's a strong refcount held. - } -} - -impl Inner { - /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. - fn enqueue(&self, node: *const Node) { - unsafe { - debug_assert!((*node).queued.load(Relaxed)); - - // This action does not require any coordination - (*node).next_readiness.store(ptr::null_mut(), Relaxed); - - // Note that these atomic orderings come from 1024cores - let node = node as *mut _; - let prev = self.head_readiness.swap(node, AcqRel); - (*prev).next_readiness.store(node, Release); - } - } - - /// Returns `true` if there are currently any pending futures - /// - /// See `dequeue` for an explanation why this function is unsafe. - unsafe fn has_pending_futures(&self) -> bool { - let tail = *self.tail_readiness.get(); - let next = (*tail).next_readiness.load(Acquire); - - if tail == self.stub() && next.is_null() { - return false; - } - - true - } - - /// The dequeue function from the 1024cores intrusive MPSC queue algorithm - /// - /// Note that this unsafe as it required mutual exclusion (only one thread - /// can call this) to be guaranteed elsewhere. - unsafe fn dequeue(&self, tick: Option) -> Dequeue { - let mut tail = *self.tail_readiness.get(); - let mut next = (*tail).next_readiness.load(Acquire); - - if tail == self.stub() { - if next.is_null() { - return Dequeue::Empty; - } - - *self.tail_readiness.get() = next; - tail = next; - next = (*next).next_readiness.load(Acquire); - } - - if let Some(tick) = tick { - let actual = (*tail).notified_at.load(SeqCst); - - // Only dequeue if the node was not scheduled during the current - // tick. - if actual == tick { - // Only doing the check above **should** be