summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJohn-John Tedro <udoprog@tedro.se>2020-12-04 06:29:59 +0100
committerGitHub <noreply@github.com>2020-12-03 21:29:59 -0800
commita125ebd745f31098aa170cb1009ff0fe34508d37 (patch)
tree8dab5d17383a5f63f7554ec009cf6e1408c46d96
parent00500d1b35f00c68117d8f4e7320303e967e92e3 (diff)
rt: fix panic in task abort when off rt (#3159)
A call to `JoinHandle::abort` releases a task. When called from outside of the runtime, this panics due to the current implementation checking for a thread-local worker context. This change makes accessing the thread-local context optional under release, by falling back to remotely marking a task remotely as dropped. Behaving the same as if the core was stolen by another worker. Fixes #3157
-rw-r--r--tokio/src/runtime/thread_pool/worker.rs94
-rw-r--r--tokio/tests/task_abort.rs26
2 files changed, 83 insertions, 37 deletions
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index bc544c9b..400ddee3 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -629,52 +629,72 @@ impl task::Schedule for Arc<Worker> {
fn release(&self, task: &Task) -> Option<Task> {
use std::ptr::NonNull;
- CURRENT.with(|maybe_cx| {
- let cx = maybe_cx.expect("scheduler context missing");
+ enum Immediate {
+ Removed(Option<Task>),
+ Core(bool),
+ }
- if self.eq(&cx.worker) {
- let mut maybe_core = cx.core.borrow_mut();
+ let immediate = CURRENT.with(|maybe_cx| {
+ let cx = match maybe_cx {
+ Some(cx) => cx,
+ None => return Immediate::Core(false),
+ };
- if let Some(core) = &mut *maybe_core {
- // Directly remove the task
- //
- // safety: the task is inserted in the list in `bind`.
- unsafe {
- let ptr = NonNull::from(task.header());
- return core.tasks.remove(ptr);
- }
+ if !self.eq(&cx.worker) {
+ return Immediate::Core(cx.core.borrow().is_some());
+ }
+
+ let mut maybe_core = cx.core.borrow_mut();
+
+ if let Some(core) = &mut *maybe_core {
+ // Directly remove the task
+ //
+ // safety: the task is inserted in the list in `bind`.
+ unsafe {
+ let ptr = NonNull::from(task.header());
+ return Immediate::Removed(core.tasks.remove(ptr));
}
}
- // Track the task to be released by the worker that owns it
- //
- // Safety: We get a new handle without incrementing the ref-count.
- // A ref-count is held by the "owned" linked list and it is only
- // ever removed from that list as part of the release process: this
- // method or popping the task from `pending_drop`. Thus, we can rely
- // on the ref-count held by the linked-list to keep the memory
- // alive.
- //
- // When the task is removed from the stack, it is forgotten instead
- // of dropped.
- let task = unsafe { Task::from_raw(task.header().into()) };
+ Immediate::Core(false)
+ });
- self.remote().pending_drop.push(task);
+ // Checks if we were called from within a worker, allowing for immediate
+ // removal of a scheduled task. Else we have to go through the slower
+ // process below where we remotely mark a task as dropped.
+ let worker_has_core = match immediate {
+ Immediate::Removed(task) => return task,
+ Immediate::Core(worker_has_core) => worker_has_core,
+ };
- if cx.core.borrow().is_some() {
- return None;
- }
+ // Track the task to be released by the worker that owns it
+ //
+ // Safety: We get a new handle without incrementing the ref-count.
+ // A ref-count is held by the "owned" linked list and it is only
+ // ever removed from that list as part of the release process: this
+ // method or popping the task from `pending_drop`. Thus, we can rely
+ // on the ref-count held by the linked-list to keep the memory
+ // alive.
+ //
+ // When the task is removed from the stack, it is forgotten instead
+ // of dropped.
+ let task = unsafe { Task::from_raw(task.header().into()) };
- // The worker core has been handed off to another thread. In the
- // event that the scheduler is currently shutting down, the thread
- // that owns the task may be waiting on the release to complete
- // shutdown.
- if self.inject().is_closed() {
- self.remote().unpark.unpark();
- }
+ self.remote().pending_drop.push(task);
- None
- })
+ if worker_has_core {
+ return None;
+ }
+
+ // The worker core has been handed off to another thread. In the
+ // event that the scheduler is currently shutting down, the thread
+ // that owns the task may be waiting on the release to complete
+ // shutdown.
+ if self.inject().is_closed() {
+ self.remote().unpark.unpark();
+ }
+
+ None
}
fn schedule(&self, task: Notified) {
diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs
new file mode 100644
index 00000000..e84f19c3
--- /dev/null
+++ b/tokio/tests/task_abort.rs
@@ -0,0 +1,26 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+/// Checks that a suspended task can be aborted without panicking as reported in
+/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
+#[test]
+fn test_abort_without_panic_3157() {
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .enable_time()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ rt.block_on(async move {
+ let handle = tokio::spawn(async move {
+ println!("task started");
+ tokio::time::sleep(std::time::Duration::new(100, 0)).await
+ });
+
+ // wait for task to sleep.
+ tokio::time::sleep(std::time::Duration::new(1, 0)).await;
+
+ handle.abort();
+ let _ = handle.await;
+ });
+}