summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAdam C. Foltzer <acfoltzer@acfoltzer.net>2020-05-07 16:24:24 -0700
committerGitHub <noreply@github.com>2020-05-07 16:24:24 -0700
commit07533a5255a6516b6e92c45e571a9ba497cb25d4 (patch)
tree3183522ed4b0fd921e48067c6f7b63a58f8e66c6
parent4748b2571fc02d5ebbfe59e457f0e8d8ef0eb5f3 (diff)
rt: add Handle::spawn_blocking method (#2501)
This follows a similar pattern to `Handle::spawn` to add the blocking spawn capabilities to `Handle`.
-rw-r--r--tokio/src/runtime/blocking/mod.rs2
-rw-r--r--tokio/src/runtime/blocking/pool.rs2
-rw-r--r--tokio/src/runtime/blocking/schedule.rs2
-rw-r--r--tokio/src/runtime/blocking/task.rs4
-rw-r--r--tokio/src/runtime/handle.rs78
5 files changed, 83 insertions, 5 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index 5c808335..0b36a75f 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -9,7 +9,7 @@ cfg_blocking_impl! {
mod schedule;
mod shutdown;
- mod task;
+ pub(crate) mod task;
use crate::runtime::Builder;
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index a3b208d1..40d417b1 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -148,7 +148,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====
impl Spawner {
- fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
+ pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
let mut shared = self.inner.shared.lock().unwrap();
diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs
index e10778d5..4e044ab2 100644
--- a/tokio/src/runtime/blocking/schedule.rs
+++ b/tokio/src/runtime/blocking/schedule.rs
@@ -6,7 +6,7 @@ use crate::runtime::task::{self, Task};
///
/// We avoid storing the task by forgetting it in `bind` and re-materializing it
/// in `release.
-pub(super) struct NoopSchedule;
+pub(crate) struct NoopSchedule;
impl task::Schedule for NoopSchedule {
fn bind(_task: Task<Self>) -> NoopSchedule {
diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs
index f98b8549..e0ae6e4e 100644
--- a/tokio/src/runtime/blocking/task.rs
+++ b/tokio/src/runtime/blocking/task.rs
@@ -3,13 +3,13 @@ use std::pin::Pin;
use std::task::{Context, Poll};
/// Converts a function to a future that completes on poll
-pub(super) struct BlockingTask<T> {
+pub(crate) struct BlockingTask<T> {
func: Option<T>,
}
impl<T> BlockingTask<T> {
/// Initializes a new blocking task from the given function
- pub(super) fn new(func: T) -> BlockingTask<T> {
+ pub(crate) fn new(func: T) -> BlockingTask<T> {
BlockingTask { func: Some(func) }
}
}
diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs
index a5761b6a..c356f053 100644
--- a/tokio/src/runtime/handle.rs
+++ b/tokio/src/runtime/handle.rs
@@ -1,6 +1,11 @@
use crate::runtime::{blocking, context, io, time, Spawner};
use std::{error, fmt};
+cfg_blocking! {
+ use crate::runtime::task;
+ use crate::runtime::blocking::task::BlockingTask;
+}
+
cfg_rt_core! {
use crate::task::JoinHandle;
@@ -263,6 +268,79 @@ cfg_rt_core! {
}
}
+cfg_blocking! {
+ impl Handle {
+ /// Runs the provided closure on a thread where blocking is acceptable.
+ ///
+ /// In general, issuing a blocking call or performing a lot of compute in a
+ /// future without yielding is not okay, as it may prevent the executor from
+ /// driving other futures forward. This function runs the provided closure
+ /// on a thread dedicated to blocking operations. See the [CPU-bound tasks
+ /// and blocking code][blocking] section for more information.
+ ///
+ /// Tokio will spawn more blocking threads when they are requested through
+ /// this function until the upper limit configured on the [`Builder`] is
+ /// reached. This limit is very large by default, because `spawn_blocking` is
+ /// often used for various kinds of IO operations that cannot be performed
+ /// asynchronously. When you run CPU-bound code using `spawn_blocking`, you
+ /// should keep this large upper limit in mind; to run your CPU-bound
+ /// computations on only a few threads, you should use a separate thread
+ /// pool such as [rayon] rather than configuring the number of blocking
+ /// threads.
+ ///
+ /// This function is intended for non-async operations that eventually
+ /// finish on their own. If you want to spawn an ordinary thread, you should
+ /// use [`thread::spawn`] instead.
+ ///
+ /// Closures spawned using `spawn_blocking` cannot be cancelled. When you
+ /// shut down the executor, it will wait indefinitely for all blocking
+ /// operations to finish. You can use [`shutdown_timeout`] to stop waiting
+ /// for them after a certain timeout. Be aware that this will still not
+ /// cancel the tasks — they are simply allowed to keep running after the
+ /// method returns.
+ ///
+ /// Note that if you are using the [basic scheduler], this function will
+ /// still spawn additional threads for blocking operations. The basic
+ /// scheduler's single thread is only used for asynchronous code.
+ ///
+ /// [`Builder`]: struct@crate::runtime::Builder
+ /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
+ /// [rayon]: https://docs.rs/rayon
+ /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler
+ /// [`thread::spawn`]: fn@std::thread::spawn
+ /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ /// let handle = rt.handle();
+ ///
+ /// let res = handle.spawn_blocking(move || {
+ /// // do some compute-heavy work or call synchronous code
+ /// "done computing"
+ /// }).await?;
+ ///
+ /// assert_eq!(res, "done computing");
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ let (task, handle) = task::joinable(BlockingTask::new(f));
+ let _ = self.blocking_spawner.spawn(task, self);
+ handle
+ }
+ }
+}
+
/// Error returned by `try_current` when no Runtime has been started
pub struct TryCurrentError(());