summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/blocking
diff options
context:
space:
mode:
authorJon Gjengset <jon@thesquareplanet.com>2020-02-04 11:27:18 -0500
committerGitHub <noreply@github.com>2020-02-04 11:27:18 -0500
commit55b5e1b6adcc3f85a35d16444e496b203648c99d (patch)
treeccdee17dc2c5341eea5d318e02412e450c91626a /tokio/src/runtime/blocking
parent513671f8dece002191feb4f2b1a97bd66306350c (diff)
Fix #2119 and failing state assertion (#2212)
Add a test for #2119 and failing state assertion, and a fix to go with it. Co-authored-by: Carl Lerche <me@carllerche.com>
Diffstat (limited to 'tokio/src/runtime/blocking')
-rw-r--r--tokio/src/runtime/blocking/mod.rs2
-rw-r--r--tokio/src/runtime/blocking/pool.rs22
2 files changed, 17 insertions, 7 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index ff400b33..5c808335 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -5,7 +5,7 @@
cfg_blocking_impl! {
mod pool;
- pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
+ pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner};
mod schedule;
mod shutdown;
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 1784312d..0b9d2209 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -65,10 +65,21 @@ where
let rt = Handle::current();
let (task, handle) = task::joinable(BlockingTask::new(func));
- rt.blocking_spawner.spawn(task, &rt);
+ let _ = rt.blocking_spawner.spawn(task, &rt);
handle
}
+#[allow(dead_code)]
+pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
+where
+ F: FnOnce() -> R + Send + 'static,
+{
+ let rt = Handle::current();
+
+ let (task, _handle) = task::joinable(BlockingTask::new(func));
+ rt.blocking_spawner.spawn(task, &rt)
+}
+
// ===== impl BlockingPool =====
impl BlockingPool {
@@ -137,7 +148,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====
impl Spawner {
- fn spawn(&self, task: Task, rt: &Handle) {
+ fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
let mut shared = self.inner.shared.lock().unwrap();
@@ -146,7 +157,7 @@ impl Spawner {
task.shutdown();
// no need to even push this task; it would never get picked up
- return;
+ return Err(());
}
shared.queue.push_back(task);
@@ -178,6 +189,8 @@ impl Spawner {
if let Some(shutdown_tx) = shutdown_tx {
self.spawn_thread(shutdown_tx, rt);
}
+
+ Ok(())
}
fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
@@ -217,9 +230,6 @@ impl Inner {
run_task(task);
shared = self.shared.lock().unwrap();
- if shared.shutdown {
- break; // Need to increment idle before we exit
- }
}
// IDLE