From 0b3918bce956567cccc617213a56c339a5a21d6f Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Thu, 5 Nov 2020 13:00:13 +0100 Subject: rt: bring back a public Handle type (#3076) Signed-off-by: Marc-Antoine Perennou Co-authored-by: Alice Ryhl Co-authored-by: Carl Lerche --- tokio/src/runtime/basic_scheduler.rs | 9 -- tokio/src/runtime/handle.rs | 166 ++++++++++++++++++++++++++++++++--- tokio/src/runtime/mod.rs | 46 +++++----- tokio/src/runtime/thread_pool/mod.rs | 9 -- 4 files changed, 178 insertions(+), 52 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 5ca84671..860eab40 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -125,15 +125,6 @@ impl BasicScheduler

{ &self.spawner } - /// Spawns a future onto the thread pool - pub(crate) fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawner.spawn(future) - } - pub(crate) fn block_on(&self, future: F) -> F::Output { pin!(future); diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 72b9c065..138d13b2 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,6 +1,9 @@ use crate::runtime::blocking::task::BlockingTask; use crate::runtime::task::{self, JoinHandle}; -use crate::runtime::{blocking, driver, Spawner}; +use crate::runtime::{blocking, context, driver, Spawner}; + +use std::future::Future; +use std::{error, fmt}; /// Handle to the runtime. /// @@ -9,7 +12,7 @@ use crate::runtime::{blocking, driver, Spawner}; /// /// [`Runtime::handle`]: crate::runtime::Runtime::handle() #[derive(Debug, Clone)] -pub(crate) struct Handle { +pub struct Handle { pub(super) spawner: Spawner, /// Handles to the I/O drivers @@ -28,21 +31,143 @@ pub(crate) struct Handle { pub(super) blocking_spawner: blocking::Spawner, } +/// Runtime context guard. +/// +/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits +/// the runtime context on drop. +/// +/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter +#[derive(Debug)] +pub struct EnterGuard<'a> { + handle: &'a Handle, + guard: context::EnterGuard, +} + impl Handle { - // /// Enter the runtime context. This allows you to construct types that must - // /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. - // /// It will also allow you to call methods such as [`tokio::spawn`]. - // pub(crate) fn enter(&self, f: F) -> R - // where - // F: FnOnce() -> R, - // { - // context::enter(self.clone(), f) - // } + /// Enter the runtime context. This allows you to construct types that must + /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. + /// It will also allow you to call methods such as [`tokio::spawn`]. + /// + /// [`Sleep`]: struct@crate::time::Sleep + /// [`TcpStream`]: struct@crate::net::TcpStream + /// [`tokio::spawn`]: fn@crate::spawn + pub fn enter(&self) -> EnterGuard<'_> { + EnterGuard { + handle: self, + guard: context::enter(self.clone()), + } + } + + /// Returns a `Handle` view over the currently running `Runtime` + /// + /// # Panic + /// + /// This will panic if called outside the context of a Tokio runtime. That means that you must + /// call this on one of the threads **being run by the runtime**. Calling this from within a + /// thread created by `std::thread::spawn` (for example) will cause a panic. + /// + /// # Examples + /// + /// This can be used to obtain the handle of the surrounding runtime from an async + /// block or function running on that runtime. + /// + /// ``` + /// # use std::thread; + /// # use tokio::runtime::Runtime; + /// # fn dox() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.spawn(async { + /// use tokio::runtime::Handle; + /// + /// // Inside an async block or function. + /// let handle = Handle::current(); + /// handle.spawn(async { + /// println!("now running in the existing Runtime"); + /// }); + /// + /// # let handle = + /// thread::spawn(move || { + /// // Notice that the handle is created outside of this thread and then moved in + /// handle.spawn(async { /* ... */ }) + /// // This next line would cause a panic + /// // let handle2 = Handle::current(); + /// }); + /// # handle.join().unwrap(); + /// # }); + /// # } + /// ``` + pub fn current() -> Self { + context::current().expect("not currently running on the Tokio runtime.") + } + + /// Returns a Handle view over the currently running Runtime + /// + /// Returns an error if no Runtime has been started + /// + /// Contrary to `current`, this never panics + pub fn try_current() -> Result { + context::current().ok_or(TryCurrentError(())) + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Spawn a future onto the runtime using the handle + /// handle.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + #[cfg(feature = "tracing")] + let future = crate::util::trace::task(future, "task"); + self.spawner.spawn(future) + } /// Run the provided function on an executor dedicated to blocking /// operations. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Spawn a blocking function onto the runtime using the handle + /// handle.spawn_blocking(|| { + /// println!("now running on a worker thread"); + /// }); + /// # } #[cfg_attr(tokio_track_caller, track_caller)] - pub(crate) fn spawn_blocking(&self, func: F) -> JoinHandle + pub fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -76,3 +201,20 @@ impl Handle { handle } } + +/// Error returned by `try_current` when no Runtime has been started +pub struct TryCurrentError(()); + +impl fmt::Debug for TryCurrentError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryCurrentError").finish() + } +} + +impl fmt::Display for TryCurrentError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("no tokio Runtime has been initialized") + } +} + +impl error::Error for TryCurrentError {} diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index f85344db..d7f068ec 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -198,7 +198,7 @@ cfg_rt! { use self::enter::enter; mod handle; - use handle::Handle; + pub use handle::{EnterGuard, Handle}; mod spawner; use self::spawner::Spawner; @@ -272,16 +272,6 @@ cfg_rt! { blocking_pool: BlockingPool, } - /// Runtime context guard. - /// - /// Returned by [`Runtime::enter`], the context guard exits the runtime - /// context on drop. - #[derive(Debug)] - pub struct EnterGuard<'a> { - rt: &'a Runtime, - guard: context::EnterGuard, - } - /// The runtime executor is either a thread-pool or a current-thread executor. #[derive(Debug)] enum Kind { @@ -332,6 +322,27 @@ cfg_rt! { Builder::new_multi_thread().enable_all().build() } + /// Return a handle to the runtime's spawner. + /// + /// The returned handle can be used to spawn tasks that run on this runtime, and can + /// be cloned to allow moving the `Handle` to other threads. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let handle = rt.handle(); + /// + /// // Use the handle... + /// ``` + pub fn handle(&self) -> &Handle { + &self.handle + } + /// Spawn a future onto the Tokio runtime. /// /// This spawns the given future onto the runtime's executor, usually a @@ -363,13 +374,7 @@ cfg_rt! { F: Future + Send + 'static, F::Output: Send + 'static, { - #[cfg(feature = "tracing")] - let future = crate::util::trace::task(future, "task"); - match &self.kind { - #[cfg(feature = "rt-multi-thread")] - Kind::ThreadPool(exec) => exec.spawn(future), - Kind::CurrentThread(exec) => exec.spawn(future), - } + self.handle.spawn(future) } /// Run the provided function on an executor dedicated to blocking operations. @@ -481,10 +486,7 @@ cfg_rt! { /// } /// ``` pub fn enter(&self) -> EnterGuard<'_> { - EnterGuard { - rt: self, - guard: context::enter(self.handle.clone()), - } + self.handle.enter() } /// Shutdown the runtime, waiting for at most `duration` for all spawned diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index e39695a9..47f8ee34 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -59,15 +59,6 @@ impl ThreadPool { &self.spawner } - /// Spawns a task - pub(crate) fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawner.spawn(future) - } - /// Blocks the current thread waiting for the future to complete. /// /// The future will execute on the current thread, but all spawned tasks -- cgit v1.2.3