diff options
author | Jon Gjengset <jon@thesquareplanet.com> | 2020-02-04 11:27:18 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-04 11:27:18 -0500 |
commit | 55b5e1b6adcc3f85a35d16444e496b203648c99d (patch) | |
tree | ccdee17dc2c5341eea5d318e02412e450c91626a /tokio/src/runtime/blocking | |
parent | 513671f8dece002191feb4f2b1a97bd66306350c (diff) |
Fix #2119 and failing state assertion (#2212)
Add a test for #2119 and failing state assertion,
and a fix to go with it.
Co-authored-by: Carl Lerche <me@carllerche.com>
Diffstat (limited to 'tokio/src/runtime/blocking')
-rw-r--r-- | tokio/src/runtime/blocking/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/pool.rs | 22 |
2 files changed, 17 insertions, 7 deletions
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index ff400b33..5c808335 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -5,7 +5,7 @@ cfg_blocking_impl! { mod pool; - pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; + pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner}; mod schedule; mod shutdown; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 1784312d..0b9d2209 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -65,10 +65,21 @@ where let rt = Handle::current(); let (task, handle) = task::joinable(BlockingTask::new(func)); - rt.blocking_spawner.spawn(task, &rt); + let _ = rt.blocking_spawner.spawn(task, &rt); handle } +#[allow(dead_code)] +pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()> +where + F: FnOnce() -> R + Send + 'static, +{ + let rt = Handle::current(); + + let (task, _handle) = task::joinable(BlockingTask::new(func)); + rt.blocking_spawner.spawn(task, &rt) +} + // ===== impl BlockingPool ===== impl BlockingPool { @@ -137,7 +148,7 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - fn spawn(&self, task: Task, rt: &Handle) { + fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { let shutdown_tx = { let mut shared = self.inner.shared.lock().unwrap(); @@ -146,7 +157,7 @@ impl Spawner { task.shutdown(); // no need to even push this task; it would never get picked up - return; + return Err(()); } shared.queue.push_back(task); @@ -178,6 +189,8 @@ impl Spawner { if let Some(shutdown_tx) = shutdown_tx { self.spawn_thread(shutdown_tx, rt); } + + Ok(()) } fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { @@ -217,9 +230,6 @@ impl Inner { run_task(task); shared = self.shared.lock().unwrap(); - if shared.shutdown { - break; // Need to increment idle before we exit - } } // IDLE |