diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-01 13:18:52 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-01 13:18:52 -0700 |
commit | d70c928d88dff9e3e8d673b8ee02bce131598550 (patch) | |
tree | 6b079db2f80bd61764203a32ffe48769d18c1386 | |
parent | 742d89b0f333150f6a550ae7840235851f4eb069 (diff) |
runtime: merge multi & single threaded runtimes (#1716)
Simplify Tokio's runtime construct by combining both Runtime variants
into a single type. The execution style can be controlled by a
configuration setting on `Builder`.
The implication of this change is that there is no longer any way to
spawn `!Send` futures. This, however, is a temporary limitation. A
different strategy will be employed for supporting `!Send` futures.
Included in this patch is a rework of `task::JoinHandle` to support
using this type from both the thread-pool and current-thread executors.
49 files changed, 1523 insertions, 3585 deletions
diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 00c0cc34..1cfc14c3 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -98,7 +98,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { } let result = match runtime { - RuntimeType::Multi => quote! { + RuntimeType::Multi | RuntimeType::Auto => quote! { #(#attrs)* fn #name(#inputs) #ret { tokio::runtime::Runtime::new().unwrap().block_on(async { #body }) @@ -107,14 +107,11 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { RuntimeType::Single => quote! { #(#attrs)* fn #name(#inputs) #ret { - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(async { #body }) - } - }, - RuntimeType::Auto => quote! { - #(#attrs)* - fn #name() #ret { - let mut rt = tokio::runtime::__main::Runtime::new().unwrap(); - rt.block_on(async { #body }) + tokio::runtime::Builder::new() + .current_thread() + .build() + .unwrap() + .block_on(async { #body }) } }, }; @@ -211,7 +208,11 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { #[test] #(#attrs)* fn #name() #ret { - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(async { #body }) + tokio::runtime::Builder::new() + .current_thread() + .build() + .unwrap() + .block_on(async { #body }) } }, }; diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 58499bbb..749112d8 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -26,15 +26,9 @@ pub mod task; /// /// [runtime-block-on]: https://docs.rs/tokio/0.2.0-alpha.2/tokio/runtime/current_thread/struct.Runtime.html#method.block_on pub fn block_on<F: std::future::Future>(future: F) -> F::Output { - let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); - rt.block_on(future) -} + use tokio::runtime; -/* -#[doc(hidden)] -pub mod codegen { - pub mod futures { - pub use futures::*; - } + let mut rt = runtime::Builder::new().current_thread().build().unwrap(); + + rt.block_on(future) } -*/ diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 48b27a39..b01d16af 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -47,7 +47,6 @@ net-full = ["tcp", "udp", "uds"] net-driver = ["io-traits", "mio", "blocking", "lazy_static"] rt-current-thread = [ "executor-core", - "crossbeam-channel", "timer", "sync", "net-driver", @@ -95,7 +94,6 @@ futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "chann # Everything else is optional... bytes = { version = "0.4", optional = true } -crossbeam-channel = { version = "0.3.8", optional = true } fnv = { version = "1.0.6", optional = true } iovec = { version = "0.1", optional = true } lazy_static = { version = "1.0.2", optional = true } diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs index 2ad573d8..92dc1c36 100644 --- a/tokio/src/executor/blocking/mod.rs +++ b/tokio/src/executor/blocking/mod.rs @@ -221,6 +221,7 @@ impl Pool { } } +#[derive(Debug)] pub(crate) struct PoolWaiter(Arc<Pool>); impl From<Pool> for PoolWaiter { diff --git a/tokio/src/executor/current_thread/mod.rs b/tokio/src/executor/current_thread/mod.rs index 62619f2a..8ee4da82 100644 --- a/tokio/src/executor/current_thread/mod.rs +++ b/tokio/src/executor/current_thread/mod.rs @@ -1,873 +1,380 @@ -//! A single-threaded executor which executes tasks on the same thread from which -//! they are spawned. -//! -//! [`CurrentThread`] is the main type of this crate. It executes tasks on the -//! current thread. The easiest way to start a new [`CurrentThread`] executor -//! is to call [`block_on_all`] with an initial task to seed the executor. All -//! tasks that are being managed by a [`CurrentThread`] executor are able to -//! spawn additional tasks by calling [`spawn`]. -//! -//! Application authors will not use this crate directly. Instead, they will use -//! the `tokio` crate. Library authors should only depend on -//! `tokio-current-thread` if they are building a custom task executor. -//! -//! [`CurrentThread`]: struct.CurrentThread.html -//! [`spawn`]: fn.spawn.html -//! [`block_on_all`]: fn.block_on_all.html - -mod scheduler; -use self::scheduler::{Scheduler, TickArgs}; - -#[cfg(feature = "blocking")] -use crate::executor::blocking::{Pool, PoolWaiter}; -use crate::executor::park::{Park, ParkThread, Unpark}; -use crate::executor::{EnterError, Executor, SpawnError, TypedExecutor}; - -use std::cell::Cell; -use std::error::Error; +use crate::executor::park::{Park, Unpark}; +use crate::executor::task::{self, JoinHandle, Schedule, Task}; +use crate::executor::Executor; + +use std::cell::UnsafeCell; +use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::{atomic, Arc}; -use std::task::{Context, Poll, Waker}; -use std::thread; -use std::time::{Duration, Instant}; +use std::mem::ManuallyDrop; +use std::sync::{Arc, Mutex}; +use std::task::{RawWaker, RawWakerVTable, Waker}; +use std::time::Duration; /// Executes tasks on the current thread -pub struct CurrentThread<P: Park = ParkThread> { - /// Execute futures and receive unpark notifications. - scheduler: Scheduler<P::Unpark>, - - /// Current number of futures being executed. - /// - /// The LSB is used to indicate that the runtime is preparing to shut down. - /// Thus, to get the actual number of pending futures, `>>1`. - num_futures: Arc<atomic::AtomicUsize>, - - /// Thread park handle - park: P, - - /// Handle for spawning new futures from other threads - spawn_handle: Handle, - - /// Receiver for futures spawned from other threads - spawn_receiver: crossbeam_channel::Receiver<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>, - - /// Handle to pool for handling blocking tasks - #[cfg(feature = "blocking")] - blocking: PoolWaiter, - - /// The thread-local ID assigned to this executor. - id: u64, -} - -/// Executes futures on the current thread. -/// -/// All futures executed using this executor will be executed on the current -/// thread. As such, `run` will wait for these futures to complete before -/// returning. -/// -/// For more details, see the [module level](index.html) documentation. -#[derive(Debug, Clone)] -pub struct TaskExecutor { - // Prevent the handle from moving across threads. - _p: ::std::marker::PhantomData<Rc<()>>, -} - -/// Returned by the `turn` function. #[derive(Debug)] -pub struct Turn { - polled: bool, -} +pub(crate) struct CurrentThread<P> +where + P: Park, +{ + /// Scheduler component + scheduler: Arc<Scheduler>, -impl Turn { - /// `true` if any futures were polled at all and `false` otherwise. - pub fn has_polled(&self) -> bool { - self.polled - } + /// Local state + local: Local<P>, } -/// A `CurrentThread` instance bound to a supplied execution context. -pub struct Entered<'a, P: Park> { - executor: &'a mut CurrentThread<P>, +#[derive(Debug, Clone)] +pub(crate) struct Spawner { + scheduler: Arc<Scheduler>, } -/// Error returned by the `run` function. -#[derive(Debug)] -pub struct RunError { - _p: (), -} +/// The scheduler component. +pub(super) struct Scheduler { + /// List of all active tasks spawned onto this executor. + /// + /// # Safety + /// + /// Must only be accessed from the primary thread + owned_tasks: UnsafeCell<task::OwnedList<Self>>, -impl fmt::Display for RunError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Run error") - } -} + /// Local run queue. + /// + /// Tasks notified from the current thread are pushed into this queue. + /// + /// # Safety + /// + /// References should not be handed out. Only call `push` / `pop` functions. + /// Only call from the owning thread. + local_queue: UnsafeCell<VecDeque<Task<Scheduler>>>, -impl Error for RunError {} + /// Remote run queue. + /// + /// Tasks notified from another thread are pushed into this queue. + remote_queue: Mutex<RemoteQueue>, -/// Error returned by the `run_timeout` function. -#[derive(Debug)] -pub struct RunTimeoutError { - timeout: bool, -} + /// Tasks pending drop + pending_drop: task::TransferStack<Self>, -impl fmt::Display for RunTimeoutError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = if self.timeout { - "Run timeout error (timeout)" - } else { - "Run timeout error (not timeout)" - }; - write!(fmt, "{}", descr) - } + /// Unpark the blocked thread + unpark: Box<dyn Unpark>, } -impl Error for RunTimeoutError {} +unsafe impl Send for Scheduler {} +unsafe impl Sync for Scheduler {} -/// Error returned by the `turn` function. +/// Local state #[derive(Debug)] -pub struct TurnError { - _p: (), -} +struct Local<P> { + /// Current tick + tick: u8, -impl fmt::Display for TurnError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Turn error") - } + /// Thread park handle + park: P, } -impl Error for TurnError {} - -/// Error returned by the `block_on` function. #[derive(Debug)] -pub struct BlockError<T> { - inner: Option<T>, -} +struct RemoteQueue { + /// FIFO list of tasks + queue: VecDeque<Task<Scheduler>>, -impl<T> fmt::Display for BlockError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Block error") - } + /// `true` when a task can be pushed into the queue, false otherwise. + open: bool, } -impl<T: fmt::Debug> Error for BlockError<T> {} +/// Max number of tasks to poll per tick. +const MAX_TASKS_PER_TICK: usize = 61; -/// This is mostly split out to make the borrow checker happy. -struct Borrow<'a, U> { - spawner: BorrowSpawner<'a, U>, - #[cfg(feature = "blocking")] - blocking: &'a PoolWaiter, -} - -/// As is this. -struct BorrowSpawner<'a, U> { - id: u64, - num_futures: &'a atomic::AtomicUsize, - scheduler: &'a mut Scheduler<U>, -} - -trait SpawnLocal { - fn spawn_local(&mut self, future: Pin<Box<dyn Future<Output = ()>>>, already_counted: bool); -} - -struct CurrentRunner { - spawn: Cell<Option<*mut dyn SpawnLocal>>, - id: Cell<Option<u64>>, -} +/// How often to check the remote queue first +const CHECK_REMOTE_INTERVAL: u8 = 13; -thread_local! { - /// Current thread's task runner. This is set in `TaskRunner::with` - static CURRENT: CurrentRunner = CurrentRunner { - spawn: Cell::new(None), - id: Cell::new(None), - } -} - -thread_local! { - /// Unique ID to assign to each new executor launched on this thread. - /// - /// The unique ID is used to determine if the currently running executor matches the one - /// referred to by a `Handle` so that direct task dispatch can be used. - static EXECUTOR_ID: Cell<u64> = Cell::new(0) -} - -/// Run the executor bootstrapping the execution with the provided future. -/// -/// This creates a new [`CurrentThread`] executor, spawns the provided future, -/// and blocks the current thread until the provided future and **all** -/// subsequently spawned futures complete. In other words: -/// -/// * If the provided bootstrap future does **not** spawn any additional tasks, -/// `block_on_all` returns once `future` completes. -/// * If the provided bootstrap future **does** spawn additional tasks, then -/// `block_on_all` returns once **all** spawned futures complete. -/// -/// See [module level][mod] documentation for more details. -/// -/// [`CurrentThread`]: struct.CurrentThread.html -/// [mod]: index.html -pub fn block_on_all<F>(future: F) -> F::Output +impl<P> CurrentThread<P> where - F: Future, + P: Park, { - let mut current_thread = CurrentThread::new(); - - let ret = current_thread.block_on(future); - current_thread.run().unwrap(); - ret -} - -/// Executes a future on the current thread. -/// -/// The provided future must complete or be canceled before `run` will return. -/// -/// Unlike [`tokio::spawn`], this function will always spawn on a -/// `CurrentThread` executor and is able to spawn futures that are not `Send`. -/// -/// # Panics -/// -/// This function can only be invoked from the context of a `run` call; any -/// other use will result in a panic. -/// -/// [`tokio::spawn`]: ../fn.spawn.html -pub fn spawn<F>(future: F) -where - F: Future<Output = ()> + 'static, -{ - TaskExecutor::current() - .spawn_local(Box::pin(future)) - .unwrap(); -} - -// ===== impl CurrentThread ===== - -impl CurrentThread<ParkThread> { - /// Create a new instance of `CurrentThread`. - pub fn new() -> Self { - CurrentThread::new_with_park(ParkThread::new()) - } -} - -impl<P: Park> CurrentThread<P> { - /// Create a new instance of `CurrentThread` backed by the given park - /// handle. - pub fn new_with_park(park: P) -> Self { + pub(crate) fn new(park: P) -> CurrentThread<P> { let unpark = park.unpark(); - let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded(); - let thread = thread::current().id(); - let id = EXECUTOR_ID.with(|idc| { - let id = idc.get(); - idc.set(id + 1); - id - }); - - let scheduler = Scheduler::new(unpark); - let waker = scheduler.waker(); - - let num_futures = Arc::new(atomic::AtomicUsize::new(0)); - CurrentThread { - scheduler, - num_futures: num_futures.clone(), - park, - id, - spawn_handle: Handle { - sender: spawn_sender, - num_futures, - waker, - thread, - id, - }, - spawn_receiver, - - #[cfg(feature = "blocking")] - blocking: PoolWaiter::from(Pool::default()), + scheduler: Arc::new(Scheduler { + owned_tasks: UnsafeCell::new(task::OwnedList::new()), + local_queue: UnsafeCell::new(VecDeque::with_capacity(64)), + remote_queue: Mutex::new(RemoteQueue { + queue: VecDeque::with_capacity(64), + open: true, + }), + pending_drop: task::TransferStack::new(), + unpark: Box::new(unpark), + }), + local: Local { tick: 0, park }, } } - /// Returns `true` if the executor is currently idle. - /// - /// An idle executor is defined by not currently having any spawned tasks. - /// - /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`, - /// this method may return `true` even though there are more futures to be executed. - pub fn is_idle(&self) -> bool { - self.num_futures.load(atomic::Ordering::SeqCst) <= 1 + pub(crate) fn spawner(&self) -> Spawner { + Spawner { + scheduler: self.scheduler.clone(), + } } - /// Spawn the future on the executor. - /// - /// This internally queues the future to be executed once `run` is called. - pub fn spawn<F>(&mut self, future: F) -> &mut Self + /// Spawn a future onto the thread pool + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> where - F: Future<Output = ()> + 'static, + F: Future + Send + 'static, + F::Output: Send + 'static, { - self.borrow().spawner.spawn_local(Box::pin(future), false); - self + let (task, handle) = task::joinable(future); + self.scheduler.schedule(task); + handle } - /// Synchronously waits for the provided `future` to complete. - /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. - /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. - /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution. - pub fn block_on<F>(&mut self, future: F) -> F::Output + pub(crate) fn block_on<F>(&mut self, mut future: F) -> F::Output where F: Future, { - let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().block_on(future) - } - - /// Run the executor to completion, blocking the thread until **all** - /// spawned futures have completed. - pub fn run(&mut self) -> Result<(), RunError> { - let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().run() - } - - /// Run the executor to completion, blocking the thread until all - /// spawned futures have completed **or** `duration` time has elapsed. - pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { - let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().run_timeout(duration) - } - - /// Perform a single iteration of the event loop. - /// - /// This function blocks the current thread even if the executor is idle. - pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> { - let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`"); - self.enter().turn(duration) - } - - /// Bind `CurrentThread` instance with an execution context. - fn enter(&mut self) -> Entered<'_, P> { - Entered { executor: self } - } + use std::pin::Pin; + use std::task::Context; + use std::task::Poll::Ready; - /// Returns a reference to the underlying `Park` instance. - pub fn get_park(&self) -> &P { - &self.park - } + let local = &mut self.local; + let scheduler = &*self.scheduler; - /// Returns a mutable reference to the underlying `Park` instance. - pub fn get_park_mut(&mut self) -> &mut P { - &mut self.park - } + crate::executor::global::with_current_thread(scheduler, || { + let mut _enter = + crate::executor::enter().expect("attempting to block while on a Tokio executor"); - fn borrow(&mut self) -> Borrow<'_, P::Unpark> { - Borrow { - spawner: BorrowSpawner { - id: self.id, - scheduler: &mut self.scheduler, - num_futures: &*self.num_futures, - }, - #[cfg(feature = "blocking")] - blocking: &self.blocking, - } - } + let raw_waker = RawWaker::new( + scheduler as *const Scheduler as *const (), + &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop), + ); - /// Get a new handle to spawn futures on the executor - /// - /// Different to the executor itself, the handle can be sent to different - /// threads and can be used to spawn futures on the executor. - pub fn handle(&self) -> Handle { - self.spawn_handle.clone() - } -} + let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) }); + let mut cx = Context::from_waker(&waker); -impl<P: Park> Drop for CurrentThread<P> { - fn drop(&mut self) { - // Signal to Handles that no more futures can be spawned by setting LSB. - // - // NOTE: this isn't technically necessary since the send on the mpsc will fail once the - // receiver is dropped, but it's useful to illustrate how clean shutdown will be - // implemented (e.g., by setting the LSB). - let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst); - - // TODO: We currently ignore any pending futures at the time we shut down. - // - // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`) - // which sets LSB (as above) do make Handle::spawn stop working, and then runs until - // num_futures.load() == 1. - let _ = pending; - - // We will wait for any blocking ops by virtue of dropping `blocking`. - } -} + // `block_on` takes ownership of `f`. Once it is pinned here, the + // original `f` binding can no longer be accessed, making the + // pinning safe. + let mut future = unsafe { Pin::new_unchecked(&mut future) }; -impl Executor for CurrentThread { - fn spawn( - &mut self, - future: Pin<Box<dyn Future<Output = ()> + Send>>, - ) -> Result<(), SpawnError> { - self.borrow().spawner.spawn_local(future, false); - Ok(()) - } -} + loop { + if let Ready(v) = future.as_mut().poll(&mut cx) { + return v; + } -impl<T> TypedExecutor<T> for CurrentThread -where - T: Future<Output = ()> + 'static, -{ - fn spawn(&mut self, future: T) -> Result<(), SpawnError> { - self.borrow().spawner.spawn_local(Box::pin(future), false); - Ok(()) - } -} + scheduler.tick(local); -impl<P: Park> fmt::Debug for CurrentThread<P> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("CurrentThread") - .field("scheduler", &self.scheduler) - .field( - "num_futures", - &self.num_futures.load(atomic::Ordering::SeqCst), - ) - .finish() - } -} - -impl<P: Park + Default> Default for CurrentThread<P> { - fn default() -> Self { - CurrentThread::new_with_park(P::default()) + // Maintenance work + scheduler.drain_pending_drop(); + } |