summaryrefslogtreecommitdiffstats
path: root/tokio/src/executor/global.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/executor/global.rs')
-rw-r--r--tokio/src/executor/global.rs233
1 files changed, 233 insertions, 0 deletions
diff --git a/tokio/src/executor/global.rs b/tokio/src/executor/global.rs
new file mode 100644
index 00000000..c60d4511
--- /dev/null
+++ b/tokio/src/executor/global.rs
@@ -0,0 +1,233 @@
+#[cfg(feature = "rt-full")]
+use crate::executor::thread_pool::ThreadPool;
+use crate::executor::{Executor, SpawnError};
+
+use std::cell::Cell;
+use std::future::Future;
+use std::pin::Pin;
+
+/// Executes futures on the default executor for the current execution context.
+///
+/// `DefaultExecutor` implements `Executor` and can be used to spawn futures
+/// without referencing a specific executor.
+///
+/// When an executor starts, it sets the `DefaultExecutor` handle to point to an
+/// executor (usually itself) that is used to spawn new tasks.
+///
+/// The current `DefaultExecutor` reference is tracked using a thread-local
+/// variable and is set using `tokio::executor::with_default`
+#[derive(Debug, Clone)]
+pub struct DefaultExecutor {
+ _dummy: (),
+}
+
+impl DefaultExecutor {
+ /// Returns a handle to the default executor for the current context.
+ ///
+ /// Futures may be spawned onto the default executor using this handle.
+ ///
+ /// The returned handle will reference whichever executor is configured as
+ /// the default **at the time `spawn` is called**. This enables
+ /// `DefaultExecutor::current()` to be called before an execution context is
+ /// setup, then passed **into** an execution context before it is used.
+ ///
+ /// This is also true for sending the handle across threads, so calling
+ /// `DefaultExecutor::current()` on thread A and then sending the result to
+ /// thread B will _not_ reference the default executor that was set on thread A.
+ pub fn current() -> DefaultExecutor {
+ DefaultExecutor { _dummy: () }
+ }
+
+ #[inline]
+ fn with_current<F: FnOnce(&mut dyn Executor) -> R, R>(f: F) -> Option<R> {
+ EXECUTOR.with(|current_executor| match current_executor.get() {
+ State::Ready(executor_ptr) => {
+ let executor = unsafe { &mut *executor_ptr };
+ Some(f(executor))
+ }
+ #[cfg(feature = "rt-full")]
+ State::ThreadPool(threadpool_ptr) => {
+ let mut thread_pool = unsafe { &*threadpool_ptr };
+ Some(f(&mut thread_pool))
+ }
+ State::Empty => None,
+ })
+ }
+}
+
+#[derive(Clone, Copy)]
+enum State {
+ // default executor not defined
+ Empty,
+
+ // default executor is a thread pool instance.
+ #[cfg(feature = "rt-full")]
+ ThreadPool(*const ThreadPool),
+
+ // default executor is set to a custom executor.
+ Ready(*mut dyn Executor),
+}
+
+thread_local! {
+ /// Thread-local tracking the current executor
+ static EXECUTOR: Cell<State> = Cell::new(State::Empty)
+}
+
+// ===== impl DefaultExecutor =====
+
+impl super::Executor for DefaultExecutor {
+ fn spawn(
+ &mut self,
+ future: Pin<Box<dyn Future<Output = ()> + Send>>,
+ ) -> Result<(), SpawnError> {
+ DefaultExecutor::with_current(|executor| executor.spawn(future))
+ .unwrap_or_else(|| Err(SpawnError::shutdown()))
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ DefaultExecutor::with_current(|executor| executor.status())
+ .unwrap_or_else(|| Err(SpawnError::shutdown()))
+ }
+}
+
+impl<T> super::TypedExecutor<T> for DefaultExecutor
+where
+ T: Future<Output = ()> + Send + 'static,
+{
+ fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
+ super::Executor::spawn(self, Box::pin(future))
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ super::Executor::status(self)
+ }
+}
+
+// ===== global spawn fns =====
+
+/// Spawns a future on the default executor.
+///
+/// In order for a future to do work, it must be spawned on an executor. The
+/// `spawn` function is the easiest way to do this. It spawns a future on the
+/// [default executor] for the current execution context (tracked using a
+/// thread-local variable).
+///
+/// The default executor is **usually** a thread pool.
+///
+/// # Examples
+///
+/// In this example, a server is started and `spawn` is used to start a new task
+/// that processes each received connection.
+///
+/// ```
+/// use tokio::net::TcpListener;
+///
+/// # async fn process<T>(_t: T) {}
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+///
+/// loop {
+/// let (socket, _) = listener.accept().await?;
+///
+/// tokio::spawn(async move {
+/// // Process each socket concurrently.
+/// process(socket).await
+/// });
+/// }
+/// # }
+/// ```
+///
+/// [default executor]: struct.DefaultExecutor.html
+///
+/// # Panics
+///
+/// This function will panic if the default executor is not set or if spawning
+/// onto the default executor returns an error. To avoid the panic, use
+/// [`DefaultExecutor`].
+///
+/// [`DefaultExecutor`]: struct.DefaultExecutor.html
+pub fn spawn<T>(future: T)
+where
+ T: Future<Output = ()> + Send + 'static,
+{
+ EXECUTOR.with(|current_executor| match current_executor.get() {
+ State::Ready(executor_ptr) => {
+ let executor = unsafe { &mut *executor_ptr };
+ executor.spawn(Box::pin(future)).unwrap();
+ }
+ #[cfg(feature = "rt-full")]
+ State::ThreadPool(threadpool_ptr) => {
+ let thread_pool = unsafe { &*threadpool_ptr };
+ thread_pool.spawn_background(future);
+ }
+ State::Empty => panic!("must be called from the context of Tokio runtime"),
+ })
+}
+
+#[cfg(feature = "rt-full")]
+pub(crate) fn with_threadpool<F, R>(thread_pool: &ThreadPool, f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ with_state(State::ThreadPool(thread_pool as *const ThreadPool), f)
+}
+
+/// Set the default executor for the duration of the closure
+///
+/// If a default executor is already set, it will be restored when the closure returns or if it
+/// panics.
+pub fn with_default<T, F, R>(executor: &mut T, f: F) -> R
+where
+ T: Executor,
+ F: FnOnce() -> R,
+{
+ // While scary, this is safe. The function takes a
+ // `&mut Executor`, which guarantees that the reference lives for the
+ // duration of `with_default`.
+ //
+ // Because we are always clearing the TLS value at the end of the
+ // function, we can cast the reference to 'static which thread-local
+ // cells require.
+ let executor = unsafe { hide_lt(executor as &mut _ as *mut _) };
+ with_state(State::Ready(executor), f)
+}
+
+fn with_state<F, R>(state: State, f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ EXECUTOR.with(|cell| {
+ let was = cell.replace(State::Empty);
+
+ // Ensure that the executor is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset<'a>(&'a Cell<State>, State);
+
+ impl Drop for Reset<'_> {
+ fn drop(&mut self) {
+ self.0.set(self.1);
+ }
+ }
+
+ let _reset = Reset(cell, was);
+
+ if let State::Ready(executor) = state {
+ let executor = unsafe { &mut *executor };
+
+ if executor.status().is_err() {
+ panic!("executor not active; is this because `with_default` is called with `DefaultExecutor`?");
+ }
+ }
+
+ cell.set(state);
+
+ f()
+ })
+}
+
+unsafe fn hide_lt<'a>(p: *mut (dyn Executor + 'a)) -> *mut (dyn Executor + 'static) {
+ use std::mem;
+ // false positive: https://github.com/rust-lang/rust-clippy/issues/2906
+ #[allow(clippy::transmute_ptr_to_ptr)]
+ mem::transmute(p)
+}