diff options
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/schedule.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/task.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/handle.rs | 78 |
5 files changed, 83 insertions, 5 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 5c808335..0b36a75f 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -9,7 +9,7 @@ cfg_blocking_impl! { mod schedule; mod shutdown; - mod task; + pub(crate) mod task; use crate::runtime::Builder; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index a3b208d1..40d417b1 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -148,7 +148,7 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { + pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { let shutdown_tx = { let mut shared = self.inner.shared.lock().unwrap(); diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index e10778d5..4e044ab2 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -6,7 +6,7 @@ use crate::runtime::task::{self, Task}; /// /// We avoid storing the task by forgetting it in `bind` and re-materializing it /// in `release. -pub(super) struct NoopSchedule; +pub(crate) struct NoopSchedule; impl task::Schedule for NoopSchedule { fn bind(_task: Task<Self>) -> NoopSchedule { diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs index f98b8549..e0ae6e4e 100644 --- a/tokio/src/runtime/blocking/task.rs +++ b/tokio/src/runtime/blocking/task.rs @@ -3,13 +3,13 @@ use std::pin::Pin; use std::task::{Context, Poll}; /// Converts a function to a future that completes on poll -pub(super) struct BlockingTask<T> { +pub(crate) struct BlockingTask<T> { func: Option<T>, } impl<T> BlockingTask<T> { /// Initializes a new blocking task from the given function - pub(super) fn new(func: T) -> BlockingTask<T> { + pub(crate) fn new(func: T) -> BlockingTask<T> { BlockingTask { func: Some(func) } } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index a5761b6a..c356f053 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,6 +1,11 @@ use crate::runtime::{blocking, context, io, time, Spawner}; use std::{error, fmt}; +cfg_blocking! { + use crate::runtime::task; + use crate::runtime::blocking::task::BlockingTask; +} + cfg_rt_core! { use crate::task::JoinHandle; @@ -263,6 +268,79 @@ cfg_rt_core! { } } +cfg_blocking! { + impl Handle { + /// Runs the provided closure on a thread where blocking is acceptable. + /// + /// In general, issuing a blocking call or performing a lot of compute in a + /// future without yielding is not okay, as it may prevent the executor from + /// driving other futures forward. This function runs the provided closure + /// on a thread dedicated to blocking operations. See the [CPU-bound tasks + /// and blocking code][blocking] section for more information. + /// + /// Tokio will spawn more blocking threads when they are requested through + /// this function until the upper limit configured on the [`Builder`] is + /// reached. This limit is very large by default, because `spawn_blocking` is + /// often used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using `spawn_blocking`, you + /// should keep this large upper limit in mind; to run your CPU-bound + /// computations on only a few threads, you should use a separate thread + /// pool such as [rayon] rather than configuring the number of blocking + /// threads. + /// + /// This function is intended for non-async operations that eventually + /// finish on their own. If you want to spawn an ordinary thread, you should + /// use [`thread::spawn`] instead. + /// + /// Closures spawned using `spawn_blocking` cannot be cancelled. When you + /// shut down the executor, it will wait indefinitely for all blocking + /// operations to finish. You can use [`shutdown_timeout`] to stop waiting + /// for them after a certain timeout. Be aware that this will still not + /// cancel the tasks — they are simply allowed to keep running after the + /// method returns. + /// + /// Note that if you are using the [basic scheduler], this function will + /// still spawn additional threads for blocking operations. The basic + /// scheduler's single thread is only used for asynchronous code. + /// + /// [`Builder`]: struct@crate::runtime::Builder + /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code + /// [rayon]: https://docs.rs/rayon + /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler + /// [`thread::spawn`]: fn@std::thread::spawn + /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{ + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// let handle = rt.handle(); + /// + /// let res = handle.spawn_blocking(move || { + /// // do some compute-heavy work or call synchronous code + /// "done computing" + /// }).await?; + /// + /// assert_eq!(res, "done computing"); + /// # Ok(()) + /// # } + /// ``` + pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (task, handle) = task::joinable(BlockingTask::new(f)); + let _ = self.blocking_spawner.spawn(task, self); + handle + } + } +} + /// Error returned by `try_current` when no Runtime has been started pub struct TryCurrentError(()); |