summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tokio-util/src/context.rs154
1 files changed, 133 insertions, 21 deletions
diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs
index 990c0f14..a7a5e029 100644
--- a/tokio-util/src/context.rs
+++ b/tokio-util/src/context.rs
@@ -12,21 +12,123 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
-use tokio::runtime::Runtime;
+use tokio::runtime::{Handle, Runtime};
pin_project! {
- /// `TokioContext` allows connecting a custom executor with the tokio runtime.
+ /// `TokioContext` allows running futures that must be inside Tokio's
+ /// context on a non-Tokio runtime.
///
- /// It contains a `Handle` to the runtime. A handle to the runtime can be
- /// obtain by calling the `Runtime::handle()` method.
- pub struct TokioContext<'a, F> {
+ /// It contains a [`Handle`] to the runtime. A handle to the runtime can be
+ /// obtain by calling the [`Runtime::handle()`] method.
+ ///
+ /// Note that the `TokioContext` wrapper only works if the `Runtime` it is
+ /// connected to has not yet been destroyed. You must keep the `Runtime`
+ /// alive until the future has finished executing.
+ ///
+ /// **Warning:** If `TokioContext` is used together with a [current thread]
+ /// runtime, that runtime must be inside a call to `block_on` for the
+ /// wrapped future to work. For this reason, it is recommended to use a
+ /// [multi thread] runtime, even if you configure it to only spawn one
+ /// worker thread.
+ ///
+ /// # Examples
+ ///
+ /// This example creates two runtimes, but only [enables time] on one of
+ /// them. It then uses the context of the runtime with the timer enabled to
+ /// execute a [`sleep`] future on the runtime with timing disabled.
+ /// ```
+ /// use tokio::time::{sleep, Duration};
+ /// use tokio_util::context::RuntimeExt;
+ ///
+ /// // This runtime has timers enabled.
+ /// let rt = tokio::runtime::Builder::new_multi_thread()
+ /// .enable_all()
+ /// .build()
+ /// .unwrap();
+ ///
+ /// // This runtime has timers disabled.
+ /// let rt2 = tokio::runtime::Builder::new_multi_thread()
+ /// .build()
+ /// .unwrap();
+ ///
+ /// // Wrap the sleep future in the context of rt.
+ /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
+ ///
+ /// // Execute the future on rt2.
+ /// rt2.block_on(fut);
+ /// ```
+ ///
+ /// [`Handle`]: struct@tokio::runtime::Handle
+ /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle
+ /// [`RuntimeExt`]: trait@crate::context::RuntimeExt
+ /// [`new_static`]: fn@Self::new_static
+ /// [`sleep`]: fn@tokio::time::sleep
+ /// [current thread]: fn@tokio::runtime::Builder::new_current_thread
+ /// [enables time]: fn@tokio::runtime::Builder::enable_time
+ /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread
+ pub struct TokioContext<F> {
#[pin]
inner: F,
- handle: &'a Runtime,
+ handle: Handle,
+ }
+}
+
+impl<F> TokioContext<F> {
+ /// Associate the provided future with the context of the runtime behind
+ /// the provided `Handle`.
+ ///
+ /// This constructor uses a `'static` lifetime to opt-out of checking that
+ /// the runtime still exists.
+ ///
+ /// # Examples
+ ///
+ /// This is the same as the example above, but uses the `new` constructor
+ /// rather than [`RuntimeExt::wrap`].
+ ///
+ /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap
+ ///
+ /// ```
+ /// use tokio::time::{sleep, Duration};
+ /// use tokio_util::context::TokioContext;
+ ///
+ /// // This runtime has timers enabled.
+ /// let rt = tokio::runtime::Builder::new_multi_thread()
+ /// .enable_all()
+ /// .build()
+ /// .unwrap();
+ ///
+ /// // This runtime has timers disabled.
+ /// let rt2 = tokio::runtime::Builder::new_multi_thread()
+ /// .build()
+ /// .unwrap();
+ ///
+ /// let fut = TokioContext::new(
+ /// async { sleep(Duration::from_millis(2)).await },
+ /// rt.handle().clone(),
+ /// );
+ ///
+ /// // Execute the future on rt2.
+ /// rt2.block_on(fut);
+ /// ```
+ pub fn new(future: F, handle: Handle) -> TokioContext<F> {
+ TokioContext {
+ inner: future,
+ handle,
+ }
+ }
+
+ /// Obtain a reference to the handle inside this `TokioContext`.
+ pub fn handle(&self) -> &Handle {
+ &self.handle
+ }
+
+ /// Remove the association between the Tokio runtime and the wrapped future.
+ pub fn into_inner(self) -> F {
+ self.inner
}
}
-impl<F: Future> Future for TokioContext<'_, F> {
+impl<F: Future> Future for TokioContext<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -39,40 +141,50 @@ impl<F: Future> Future for TokioContext<'_, F> {
}
}
-/// Trait extension that simplifies bundling a `Handle` with a `Future`.
+/// Extension trait that simplifies bundling a `Handle` with a `Future`.
pub trait RuntimeExt {
- /// Convenience method that takes a Future and returns a `TokioContext`.
+ /// Create a [`TokioContext`] that wraps the provided future and runs it in
+ /// this runtime's context.
///
- /// # Example: calling Tokio Runtime from a custom ThreadPool
+ /// # Examples
///
- /// ```no_run
- /// use tokio_util::context::RuntimeExt;
+ /// This example creates two runtimes, but only [enables time] on one of
+ /// them. It then uses the context of the runtime with the timer enabled to
+ /// execute a [`sleep`] future on the runtime with timing disabled.
+ ///
+ /// ```
/// use tokio::time::{sleep, Duration};
+ /// use tokio_util::context::RuntimeExt;
///
+ /// // This runtime has timers enabled.
/// let rt = tokio::runtime::Builder::new_multi_thread()
/// .enable_all()
/// .build()
/// .unwrap();
///
+ /// // This runtime has timers disabled.
/// let rt2 = tokio::runtime::Builder::new_multi_thread()
/// .build()
/// .unwrap();
///
- /// let fut = sleep(Duration::from_millis(2));
+ /// // Wrap the sleep future in the context of rt.
+ /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
///
- /// rt.block_on(
- /// rt2
- /// .wrap(async { sleep(Duration::from_millis(2)).await }),
- /// );
- ///```
- fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F>;
+ /// // Execute the future on rt2.
+ /// rt2.block_on(fut);
+ /// ```
+ ///
+ /// [`TokioContext`]: struct@crate::context::TokioContext
+ /// [`sleep`]: fn@tokio::time::sleep
+ /// [enables time]: fn@tokio::runtime::Builder::enable_time
+ fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
}
impl RuntimeExt for Runtime {
- fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F> {
+ fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
TokioContext {
inner: fut,
- handle: self,
+ handle: self.handle().clone(),
}
}
}