summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarc-Antoine Perennou <Marc-Antoine@Perennou.com>2020-11-05 13:00:13 +0100
committerGitHub <noreply@github.com>2020-11-05 13:00:13 +0100
commit0b3918bce956567cccc617213a56c339a5a21d6f (patch)
treefbbb4f81f135282e9f4f9b39c0ad79200cb05e32
parente309da0beeb48a0c2ecd0f15eb600f871ec98e19 (diff)
rt: bring back a public Handle type (#3076)
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com> Co-authored-by: Alice Ryhl <alice@ryhl.io> Co-authored-by: Carl Lerche <me@carllerche.com>
-rw-r--r--tokio/src/runtime/basic_scheduler.rs9
-rw-r--r--tokio/src/runtime/handle.rs166
-rw-r--r--tokio/src/runtime/mod.rs46
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs9
4 files changed, 178 insertions, 52 deletions
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 5ca84671..860eab40 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -125,15 +125,6 @@ impl<P: Park> BasicScheduler<P> {
&self.spawner
}
- /// Spawns a future onto the thread pool
- pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- self.spawner.spawn(future)
- }
-
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
pin!(future);
diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs
index 72b9c065..138d13b2 100644
--- a/tokio/src/runtime/handle.rs
+++ b/tokio/src/runtime/handle.rs
@@ -1,6 +1,9 @@
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
-use crate::runtime::{blocking, driver, Spawner};
+use crate::runtime::{blocking, context, driver, Spawner};
+
+use std::future::Future;
+use std::{error, fmt};
/// Handle to the runtime.
///
@@ -9,7 +12,7 @@ use crate::runtime::{blocking, driver, Spawner};
///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
-pub(crate) struct Handle {
+pub struct Handle {
pub(super) spawner: Spawner,
/// Handles to the I/O drivers
@@ -28,21 +31,143 @@ pub(crate) struct Handle {
pub(super) blocking_spawner: blocking::Spawner,
}
+/// Runtime context guard.
+///
+/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits
+/// the runtime context on drop.
+///
+/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
+#[derive(Debug)]
+pub struct EnterGuard<'a> {
+ handle: &'a Handle,
+ guard: context::EnterGuard,
+}
+
impl Handle {
- // /// Enter the runtime context. This allows you to construct types that must
- // /// have an executor available on creation such as [`Sleep`] or [`TcpStream`].
- // /// It will also allow you to call methods such as [`tokio::spawn`].
- // pub(crate) fn enter<F, R>(&self, f: F) -> R
- // where
- // F: FnOnce() -> R,
- // {
- // context::enter(self.clone(), f)
- // }
+ /// Enter the runtime context. This allows you to construct types that must
+ /// have an executor available on creation such as [`Sleep`] or [`TcpStream`].
+ /// It will also allow you to call methods such as [`tokio::spawn`].
+ ///
+ /// [`Sleep`]: struct@crate::time::Sleep
+ /// [`TcpStream`]: struct@crate::net::TcpStream
+ /// [`tokio::spawn`]: fn@crate::spawn
+ pub fn enter(&self) -> EnterGuard<'_> {
+ EnterGuard {
+ handle: self,
+ guard: context::enter(self.clone()),
+ }
+ }
+
+ /// Returns a `Handle` view over the currently running `Runtime`
+ ///
+ /// # Panic
+ ///
+ /// This will panic if called outside the context of a Tokio runtime. That means that you must
+ /// call this on one of the threads **being run by the runtime**. Calling this from within a
+ /// thread created by `std::thread::spawn` (for example) will cause a panic.
+ ///
+ /// # Examples
+ ///
+ /// This can be used to obtain the handle of the surrounding runtime from an async
+ /// block or function running on that runtime.
+ ///
+ /// ```
+ /// # use std::thread;
+ /// # use tokio::runtime::Runtime;
+ /// # fn dox() {
+ /// # let rt = Runtime::new().unwrap();
+ /// # rt.spawn(async {
+ /// use tokio::runtime::Handle;
+ ///
+ /// // Inside an async block or function.
+ /// let handle = Handle::current();
+ /// handle.spawn(async {
+ /// println!("now running in the existing Runtime");
+ /// });
+ ///
+ /// # let handle =
+ /// thread::spawn(move || {
+ /// // Notice that the handle is created outside of this thread and then moved in
+ /// handle.spawn(async { /* ... */ })
+ /// // This next line would cause a panic
+ /// // let handle2 = Handle::current();
+ /// });
+ /// # handle.join().unwrap();
+ /// # });
+ /// # }
+ /// ```
+ pub fn current() -> Self {
+ context::current().expect("not currently running on the Tokio runtime.")
+ }
+
+ /// Returns a Handle view over the currently running Runtime
+ ///
+ /// Returns an error if no Runtime has been started
+ ///
+ /// Contrary to `current`, this never panics
+ pub fn try_current() -> Result<Self, TryCurrentError> {
+ context::current().ok_or(TryCurrentError(()))
+ }
+
+ /// Spawn a future onto the Tokio runtime.
+ ///
+ /// This spawns the given future onto the runtime's executor, usually a
+ /// thread pool. The thread pool is then responsible for polling the future
+ /// until it completes.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// [mod]: index.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// # fn dox() {
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ /// // Get a handle from this runtime
+ /// let handle = rt.handle();
+ ///
+ /// // Spawn a future onto the runtime using the handle
+ /// handle.spawn(async {
+ /// println!("now running on a worker thread");
+ /// });
+ /// # }
+ /// ```
+ #[cfg_attr(tokio_track_caller, track_caller)]
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ #[cfg(feature = "tracing")]
+ let future = crate::util::trace::task(future, "task");
+ self.spawner.spawn(future)
+ }
/// Run the provided function on an executor dedicated to blocking
/// operations.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// # fn dox() {
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ /// // Get a handle from this runtime
+ /// let handle = rt.handle();
+ ///
+ /// // Spawn a blocking function onto the runtime using the handle
+ /// handle.spawn_blocking(|| {
+ /// println!("now running on a worker thread");
+ /// });
+ /// # }
#[cfg_attr(tokio_track_caller, track_caller)]
- pub(crate) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
+ pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@@ -76,3 +201,20 @@ impl Handle {
handle
}
}
+
+/// Error returned by `try_current` when no Runtime has been started
+pub struct TryCurrentError(());
+
+impl fmt::Debug for TryCurrentError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryCurrentError").finish()
+ }
+}
+
+impl fmt::Display for TryCurrentError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("no tokio Runtime has been initialized")
+ }
+}
+
+impl error::Error for TryCurrentError {}
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index f85344db..d7f068ec 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -198,7 +198,7 @@ cfg_rt! {
use self::enter::enter;
mod handle;
- use handle::Handle;
+ pub use handle::{EnterGuard, Handle};
mod spawner;
use self::spawner::Spawner;
@@ -272,16 +272,6 @@ cfg_rt! {
blocking_pool: BlockingPool,
}
- /// Runtime context guard.
- ///
- /// Returned by [`Runtime::enter`], the context guard exits the runtime
- /// context on drop.
- #[derive(Debug)]
- pub struct EnterGuard<'a> {
- rt: &'a Runtime,
- guard: context::EnterGuard,
- }
-
/// The runtime executor is either a thread-pool or a current-thread executor.
#[derive(Debug)]
enum Kind {
@@ -332,6 +322,27 @@ cfg_rt! {
Builder::new_multi_thread().enable_all().build()
}
+ /// Return a handle to the runtime's spawner.
+ ///
+ /// The returned handle can be used to spawn tasks that run on this runtime, and can
+ /// be cloned to allow moving the `Handle` to other threads.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// let handle = rt.handle();
+ ///
+ /// // Use the handle...
+ /// ```
+ pub fn handle(&self) -> &Handle {
+ &self.handle
+ }
+
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
@@ -363,13 +374,7 @@ cfg_rt! {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
- #[cfg(feature = "tracing")]
- let future = crate::util::trace::task(future, "task");
- match &self.kind {
- #[cfg(feature = "rt-multi-thread")]
- Kind::ThreadPool(exec) => exec.spawn(future),
- Kind::CurrentThread(exec) => exec.spawn(future),
- }
+ self.handle.spawn(future)
}
/// Run the provided function on an executor dedicated to blocking operations.
@@ -481,10 +486,7 @@ cfg_rt! {
/// }
/// ```
pub fn enter(&self) -> EnterGuard<'_> {
- EnterGuard {
- rt: self,
- guard: context::enter(self.handle.clone()),
- }
+ self.handle.enter()
}
/// Shutdown the runtime, waiting for at most `duration` for all spawned
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index e39695a9..47f8ee34 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -59,15 +59,6 @@ impl ThreadPool {
&self.spawner
}
- /// Spawns a task
- pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- self.spawner.spawn(future)
- }
-
/// Blocks the current thread waiting for the future to complete.
///
/// The future will execute on the current thread, but all spawned tasks