From d9d909cb4c6d326423ee02fbcf6bbfe5553d2c0a Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Thu, 27 Aug 2020 17:33:43 +0200 Subject: util: Add `TokioContext` future (#2791) Co-authored-by: Lucio Franco --- tokio-util/src/context.rs | 78 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 tokio-util/src/context.rs (limited to 'tokio-util/src/context.rs') diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs new file mode 100644 index 00000000..f6289093 --- /dev/null +++ b/tokio-util/src/context.rs @@ -0,0 +1,78 @@ +//! Tokio context aware futures utilities. +//! +//! This module includes utilities around integrating tokio with other runtimes +//! by allowing the context to be attached to futures. This allows spawning +//! futures on other executors while still using tokio to drive them. This +//! can be useful if you need to use a tokio based library in an executor/runtime +//! that does not provide a tokio context. + +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::runtime::Handle; + +pin_project! { + /// `TokioContext` allows connecting a custom executor with the tokio runtime. + /// + /// It contains a `Handle` to the runtime. A handle to the runtime can be + /// obtain by calling the `Runtime::handle()` method. + pub struct TokioContext { + #[pin] + inner: F, + handle: Handle, + } +} + +impl Future for TokioContext { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + let handle = me.handle; + let fut = me.inner; + + handle.enter(|| fut.poll(cx)) + } +} + +/// Trait extension that simplifies bundling a `Handle` with a `Future`. +pub trait HandleExt { + /// Convenience method that takes a Future and returns a `TokioContext`. + /// + /// # Example: calling Tokio Runtime from a custom ThreadPool + /// + /// ```no_run + /// use tokio_util::context::HandleExt; + /// use tokio::time::{delay_for, Duration}; + /// + /// let mut rt = tokio::runtime::Builder::new() + /// .threaded_scheduler() + /// .enable_all() + /// .build().unwrap(); + /// + /// let rt2 = tokio::runtime::Builder::new() + /// .threaded_scheduler() + /// .build().unwrap(); + /// + /// let fut = delay_for(Duration::from_millis(2)); + /// + /// rt.block_on( + /// rt2 + /// .handle() + /// .wrap(async { delay_for(Duration::from_millis(2)).await }), + /// ); + ///``` + fn wrap(&self, fut: F) -> TokioContext; +} + +impl HandleExt for Handle { + fn wrap(&self, fut: F) -> TokioContext { + TokioContext { + inner: fut, + handle: self.clone(), + } + } +} -- cgit v1.2.3