diff options
-rw-r--r-- | tokio-util/src/context.rs | 154 |
1 files changed, 133 insertions, 21 deletions
diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 990c0f14..a7a5e029 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -12,21 +12,123 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::runtime::Runtime; +use tokio::runtime::{Handle, Runtime}; pin_project! { - /// `TokioContext` allows connecting a custom executor with the tokio runtime. + /// `TokioContext` allows running futures that must be inside Tokio's + /// context on a non-Tokio runtime. /// - /// It contains a `Handle` to the runtime. A handle to the runtime can be - /// obtain by calling the `Runtime::handle()` method. - pub struct TokioContext<'a, F> { + /// It contains a [`Handle`] to the runtime. A handle to the runtime can be + /// obtain by calling the [`Runtime::handle()`] method. + /// + /// Note that the `TokioContext` wrapper only works if the `Runtime` it is + /// connected to has not yet been destroyed. You must keep the `Runtime` + /// alive until the future has finished executing. + /// + /// **Warning:** If `TokioContext` is used together with a [current thread] + /// runtime, that runtime must be inside a call to `block_on` for the + /// wrapped future to work. For this reason, it is recommended to use a + /// [multi thread] runtime, even if you configure it to only spawn one + /// worker thread. + /// + /// # Examples + /// + /// This example creates two runtimes, but only [enables time] on one of + /// them. It then uses the context of the runtime with the timer enabled to + /// execute a [`sleep`] future on the runtime with timing disabled. + /// ``` + /// use tokio::time::{sleep, Duration}; + /// use tokio_util::context::RuntimeExt; + /// + /// // This runtime has timers enabled. + /// let rt = tokio::runtime::Builder::new_multi_thread() + /// .enable_all() + /// .build() + /// .unwrap(); + /// + /// // This runtime has timers disabled. + /// let rt2 = tokio::runtime::Builder::new_multi_thread() + /// .build() + /// .unwrap(); + /// + /// // Wrap the sleep future in the context of rt. + /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await }); + /// + /// // Execute the future on rt2. + /// rt2.block_on(fut); + /// ``` + /// + /// [`Handle`]: struct@tokio::runtime::Handle + /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle + /// [`RuntimeExt`]: trait@crate::context::RuntimeExt + /// [`new_static`]: fn@Self::new_static + /// [`sleep`]: fn@tokio::time::sleep + /// [current thread]: fn@tokio::runtime::Builder::new_current_thread + /// [enables time]: fn@tokio::runtime::Builder::enable_time + /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread + pub struct TokioContext<F> { #[pin] inner: F, - handle: &'a Runtime, + handle: Handle, + } +} + +impl<F> TokioContext<F> { + /// Associate the provided future with the context of the runtime behind + /// the provided `Handle`. + /// + /// This constructor uses a `'static` lifetime to opt-out of checking that + /// the runtime still exists. + /// + /// # Examples + /// + /// This is the same as the example above, but uses the `new` constructor + /// rather than [`RuntimeExt::wrap`]. + /// + /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap + /// + /// ``` + /// use tokio::time::{sleep, Duration}; + /// use tokio_util::context::TokioContext; + /// + /// // This runtime has timers enabled. + /// let rt = tokio::runtime::Builder::new_multi_thread() + /// .enable_all() + /// .build() + /// .unwrap(); + /// + /// // This runtime has timers disabled. + /// let rt2 = tokio::runtime::Builder::new_multi_thread() + /// .build() + /// .unwrap(); + /// + /// let fut = TokioContext::new( + /// async { sleep(Duration::from_millis(2)).await }, + /// rt.handle().clone(), + /// ); + /// + /// // Execute the future on rt2. + /// rt2.block_on(fut); + /// ``` + pub fn new(future: F, handle: Handle) -> TokioContext<F> { + TokioContext { + inner: future, + handle, + } + } + + /// Obtain a reference to the handle inside this `TokioContext`. + pub fn handle(&self) -> &Handle { + &self.handle + } + + /// Remove the association between the Tokio runtime and the wrapped future. + pub fn into_inner(self) -> F { + self.inner } } -impl<F: Future> Future for TokioContext<'_, F> { +impl<F: Future> Future for TokioContext<F> { type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { @@ -39,40 +141,50 @@ impl<F: Future> Future for TokioContext<'_, F> { } } -/// Trait extension that simplifies bundling a `Handle` with a `Future`. +/// Extension trait that simplifies bundling a `Handle` with a `Future`. pub trait RuntimeExt { - /// Convenience method that takes a Future and returns a `TokioContext`. + /// Create a [`TokioContext`] that wraps the provided future and runs it in + /// this runtime's context. /// - /// # Example: calling Tokio Runtime from a custom ThreadPool + /// # Examples /// - /// ```no_run - /// use tokio_util::context::RuntimeExt; + /// This example creates two runtimes, but only [enables time] on one of + /// them. It then uses the context of the runtime with the timer enabled to + /// execute a [`sleep`] future on the runtime with timing disabled. + /// + /// ``` /// use tokio::time::{sleep, Duration}; + /// use tokio_util::context::RuntimeExt; /// + /// // This runtime has timers enabled. /// let rt = tokio::runtime::Builder::new_multi_thread() /// .enable_all() /// .build() /// .unwrap(); /// + /// // This runtime has timers disabled. /// let rt2 = tokio::runtime::Builder::new_multi_thread() /// .build() /// .unwrap(); /// - /// let fut = sleep(Duration::from_millis(2)); + /// // Wrap the sleep future in the context of rt. + /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await }); /// - /// rt.block_on( - /// rt2 - /// .wrap(async { sleep(Duration::from_millis(2)).await }), - /// ); - ///``` - fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F>; + /// // Execute the future on rt2. + /// rt2.block_on(fut); + /// ``` + /// + /// [`TokioContext`]: struct@crate::context::TokioContext + /// [`sleep`]: fn@tokio::time::sleep + /// [enables time]: fn@tokio::runtime::Builder::enable_time + fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>; } impl RuntimeExt for Runtime { - fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F> { + fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> { TokioContext { inner: fut, - handle: self, + handle: self.handle().clone(), } } } |