summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-12-17 21:24:26 -0800
committerGitHub <noreply@github.com>2019-12-17 21:24:26 -0800
commit83cd754bc80dc8718b65fd32f54e53b4d7ba8332 (patch)
treee36b9b429a1f552510f85e983cbede3b70fcf5ed /tokio
parent17e424112d53385142aa430641910c384c4cbe5a (diff)
rt: fix blocking pool shutdown logic (#1978)
The blocking task queue was not explicitly drained as part of the blocking pool shutdown logic. It was originally assumed that the contents of the queue would be dropped when the blocking pool structure is dropped. However, tasks must be explicitly shutdown, so we must drain the queue can call `shutdown` on each task. Fixes #1970, #1946
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/runtime/blocking/pool.rs12
-rw-r--r--tokio/src/runtime/tests/loom_blocking.rs31
-rw-r--r--tokio/src/runtime/tests/mod.rs3
3 files changed, 45 insertions, 1 deletions
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 3e7d401b..a25f1549 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -303,7 +303,9 @@ impl Inner {
break;
}
- if timeout_result.timed_out() {
+ // Even if the condvar "timed out", if the pool is entering the
+ // shutdown phase, we want to perform the cleanup logic.
+ if !shared.shutdown && timeout_result.timed_out() {
break 'main;
}
@@ -311,6 +313,14 @@ impl Inner {
}
if shared.shutdown {
+ // Drain the queue
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ task.shutdown();
+
+ shared = self.shared.lock().unwrap();
+ }
+
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs
new file mode 100644
index 00000000..85e6fb12
--- /dev/null
+++ b/tokio/src/runtime/tests/loom_blocking.rs
@@ -0,0 +1,31 @@
+use crate::runtime::{self, Runtime};
+
+use std::sync::Arc;
+
+#[test]
+fn blocking_shutdown() {
+ loom::model(|| {
+ let v = Arc::new(());
+
+ let rt = mk_runtime(1);
+ rt.enter(|| {
+ for _ in 0..2 {
+ let v = v.clone();
+ crate::task::spawn_blocking(move || {
+ assert!(1 < Arc::strong_count(&v));
+ });
+ }
+ });
+
+ drop(rt);
+ assert_eq!(1, Arc::strong_count(&v));
+ });
+}
+
+fn mk_runtime(num_threads: usize) -> Runtime {
+ runtime::Builder::new()
+ .threaded_scheduler()
+ .num_threads(num_threads)
+ .build()
+ .unwrap()
+}
diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs
index a1910a44..da592f76 100644
--- a/tokio/src/runtime/tests/mod.rs
+++ b/tokio/src/runtime/tests/mod.rs
@@ -2,3 +2,6 @@
#[cfg(loom)]
pub(crate) mod loom_oneshot;
+
+#[cfg(loom)]
+pub(crate) mod loom_blocking;