//! Tokio context aware futures utilities. //! //! This module includes utilities around integrating tokio with other runtimes //! by allowing the context to be attached to futures. This allows spawning //! futures on other executors while still using tokio to drive them. This //! can be useful if you need to use a tokio based library in an executor/runtime //! that does not provide a tokio context. use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use tokio::runtime::{Handle, Runtime}; pin_project! { /// `TokioContext` allows running futures that must be inside Tokio's /// context on a non-Tokio runtime. /// /// It contains a [`Handle`] to the runtime. A handle to the runtime can be /// obtain by calling the [`Runtime::handle()`] method. /// /// Note that the `TokioContext` wrapper only works if the `Runtime` it is /// connected to has not yet been destroyed. You must keep the `Runtime` /// alive until the future has finished executing. /// /// **Warning:** If `TokioContext` is used together with a [current thread] /// runtime, that runtime must be inside a call to `block_on` for the /// wrapped future to work. For this reason, it is recommended to use a /// [multi thread] runtime, even if you configure it to only spawn one /// worker thread. /// /// # Examples /// /// This example creates two runtimes, but only [enables time] on one of /// them. It then uses the context of the runtime with the timer enabled to /// execute a [`sleep`] future on the runtime with timing disabled. /// ``` /// use tokio::time::{sleep, Duration}; /// use tokio_util::context::RuntimeExt; /// /// // This runtime has timers enabled. /// let rt = tokio::runtime::Builder::new_multi_thread() /// .enable_all() /// .build() /// .unwrap(); /// /// // This runtime has timers disabled. /// let rt2 = tokio::runtime::Builder::new_multi_thread() /// .build() /// .unwrap(); /// /// // Wrap the sleep future in the context of rt. /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await }); /// /// // Execute the future on rt2. /// rt2.block_on(fut); /// ``` /// /// [`Handle`]: struct@tokio::runtime::Handle /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle /// [`RuntimeExt`]: trait@crate::context::RuntimeExt /// [`new_static`]: fn@Self::new_static /// [`sleep`]: fn@tokio::time::sleep /// [current thread]: fn@tokio::runtime::Builder::new_current_thread /// [enables time]: fn@tokio::runtime::Builder::enable_time /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread pub struct TokioContext { #[pin] inner: F, handle: Handle, } } impl TokioContext { /// Associate the provided future with the context of the runtime behind /// the provided `Handle`. /// /// This constructor uses a `'static` lifetime to opt-out of checking that /// the runtime still exists. /// /// # Examples /// /// This is the same as the example above, but uses the `new` constructor /// rather than [`RuntimeExt::wrap`]. /// /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap /// /// ``` /// use tokio::time::{sleep, Duration}; /// use tokio_util::context::TokioContext; /// /// // This runtime has timers enabled. /// let rt = tokio::runtime::Builder::new_multi_thread() /// .enable_all() /// .build() /// .unwrap(); /// /// // This runtime has timers disabled. /// let rt2 = tokio::runtime::Builder::new_multi_thread() /// .build() /// .unwrap(); /// /// let fut = TokioContext::new( /// async { sleep(Duration::from_millis(2)).await }, /// rt.handle().clone(), /// ); /// /// // Execute the future on rt2. /// rt2.block_on(fut); /// ``` pub fn new(future: F, handle: Handle) -> TokioContext { TokioContext { inner: future, handle, } } /// Obtain a reference to the handle inside this `TokioContext`. pub fn handle(&self) -> &Handle { &self.handle } /// Remove the association between the Tokio runtime and the wrapped future. pub fn into_inner(self) -> F { self.inner } } impl Future for TokioContext { type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); let handle = me.handle; let fut = me.inner; let _enter = handle.enter(); fut.poll(cx) } } /// Extension trait that simplifies bundling a `Handle` with a `Future`. pub trait RuntimeExt { /// Create a [`TokioContext`] that wraps the provided future and runs it in /// this runtime's context. /// /// # Examples /// /// This example creates two runtimes, but only [enables time] on one of /// them. It then uses the context of the runtime with the timer enabled to /// execute a [`sleep`] future on the runtime with timing disabled. /// /// ``` /// use tokio::time::{sleep, Duration}; /// use tokio_util::context::RuntimeExt; /// /// // This runtime has timers enabled. /// let rt = tokio::runtime::Builder::new_multi_thread() /// .enable_all() /// .build() /// .unwrap(); /// /// // This runtime has timers disabled. /// let rt2 = tokio::runtime::Builder::new_multi_thread() /// .build() /// .unwrap(); /// /// // Wrap the sleep future in the context of rt. /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await }); /// /// // Execute the future on rt2. /// rt2.block_on(fut); /// ``` /// /// [`TokioContext`]: struct@crate::context::TokioContext /// [`sleep`]: fn@tokio::time::sleep /// [enables time]: fn@tokio::runtime::Builder::enable_time fn wrap(&self, fut: F) -> TokioContext; } impl RuntimeExt for Runtime { fn wrap(&self, fut: F) -> TokioContext { TokioContext { inner: fut, handle: self.handle().clone(), } } }