diff options
author | John-John Tedro <udoprog@tedro.se> | 2020-12-04 06:29:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-03 21:29:59 -0800 |
commit | a125ebd745f31098aa170cb1009ff0fe34508d37 (patch) | |
tree | 8dab5d17383a5f63f7554ec009cf6e1408c46d96 | |
parent | 00500d1b35f00c68117d8f4e7320303e967e92e3 (diff) |
rt: fix panic in task abort when off rt (#3159)
A call to `JoinHandle::abort` releases a task. When called from outside of the runtime,
this panics due to the current implementation checking for a thread-local worker context.
This change makes accessing the thread-local context optional under release, by falling
back to remotely marking a task remotely as dropped. Behaving the same as if the core
was stolen by another worker.
Fixes #3157
-rw-r--r-- | tokio/src/runtime/thread_pool/worker.rs | 94 | ||||
-rw-r--r-- | tokio/tests/task_abort.rs | 26 |
2 files changed, 83 insertions, 37 deletions
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index bc544c9b..400ddee3 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -629,52 +629,72 @@ impl task::Schedule for Arc<Worker> { fn release(&self, task: &Task) -> Option<Task> { use std::ptr::NonNull; - CURRENT.with(|maybe_cx| { - let cx = maybe_cx.expect("scheduler context missing"); + enum Immediate { + Removed(Option<Task>), + Core(bool), + } - if self.eq(&cx.worker) { - let mut maybe_core = cx.core.borrow_mut(); + let immediate = CURRENT.with(|maybe_cx| { + let cx = match maybe_cx { + Some(cx) => cx, + None => return Immediate::Core(false), + }; - if let Some(core) = &mut *maybe_core { - // Directly remove the task - // - // safety: the task is inserted in the list in `bind`. - unsafe { - let ptr = NonNull::from(task.header()); - return core.tasks.remove(ptr); - } + if !self.eq(&cx.worker) { + return Immediate::Core(cx.core.borrow().is_some()); + } + + let mut maybe_core = cx.core.borrow_mut(); + + if let Some(core) = &mut *maybe_core { + // Directly remove the task + // + // safety: the task is inserted in the list in `bind`. + unsafe { + let ptr = NonNull::from(task.header()); + return Immediate::Removed(core.tasks.remove(ptr)); } } - // Track the task to be released by the worker that owns it - // - // Safety: We get a new handle without incrementing the ref-count. - // A ref-count is held by the "owned" linked list and it is only - // ever removed from that list as part of the release process: this - // method or popping the task from `pending_drop`. Thus, we can rely - // on the ref-count held by the linked-list to keep the memory - // alive. - // - // When the task is removed from the stack, it is forgotten instead - // of dropped. - let task = unsafe { Task::from_raw(task.header().into()) }; + Immediate::Core(false) + }); - self.remote().pending_drop.push(task); + // Checks if we were called from within a worker, allowing for immediate + // removal of a scheduled task. Else we have to go through the slower + // process below where we remotely mark a task as dropped. + let worker_has_core = match immediate { + Immediate::Removed(task) => return task, + Immediate::Core(worker_has_core) => worker_has_core, + }; - if cx.core.borrow().is_some() { - return None; - } + // Track the task to be released by the worker that owns it + // + // Safety: We get a new handle without incrementing the ref-count. + // A ref-count is held by the "owned" linked list and it is only + // ever removed from that list as part of the release process: this + // method or popping the task from `pending_drop`. Thus, we can rely + // on the ref-count held by the linked-list to keep the memory + // alive. + // + // When the task is removed from the stack, it is forgotten instead + // of dropped. + let task = unsafe { Task::from_raw(task.header().into()) }; - // The worker core has been handed off to another thread. In the - // event that the scheduler is currently shutting down, the thread - // that owns the task may be waiting on the release to complete - // shutdown. - if self.inject().is_closed() { - self.remote().unpark.unpark(); - } + self.remote().pending_drop.push(task); - None - }) + if worker_has_core { + return None; + } + + // The worker core has been handed off to another thread. In the + // event that the scheduler is currently shutting down, the thread + // that owns the task may be waiting on the release to complete + // shutdown. + if self.inject().is_closed() { + self.remote().unpark.unpark(); + } + + None } fn schedule(&self, task: Notified) { diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs new file mode 100644 index 00000000..e84f19c3 --- /dev/null +++ b/tokio/tests/task_abort.rs @@ -0,0 +1,26 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +/// Checks that a suspended task can be aborted without panicking as reported in +/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>. +#[test] +fn test_abort_without_panic_3157() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .worker_threads(1) + .build() + .unwrap(); + + rt.block_on(async move { + let handle = tokio::spawn(async move { + println!("task started"); + tokio::time::sleep(std::time::Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(std::time::Duration::new(1, 0)).await; + + handle.abort(); + let _ = handle.await; + }); +} |