diff options
Diffstat (limited to 'tokio/src/runtime/task')
-rw-r--r-- | tokio/src/runtime/task/core.rs | 280 | ||||
-rw-r--r-- | tokio/src/runtime/task/error.rs | 163 | ||||
-rw-r--r-- | tokio/src/runtime/task/harness.rs | 369 | ||||
-rw-r--r-- | tokio/src/runtime/task/join.rs | 149 | ||||
-rw-r--r-- | tokio/src/runtime/task/mod.rs | 219 | ||||
-rw-r--r-- | tokio/src/runtime/task/raw.rs | 131 | ||||
-rw-r--r-- | tokio/src/runtime/task/stack.rs | 81 | ||||
-rw-r--r-- | tokio/src/runtime/task/state.rs | 447 | ||||
-rw-r--r-- | tokio/src/runtime/task/waker.rs | 101 |
9 files changed, 1940 insertions, 0 deletions
diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs new file mode 100644 index 00000000..43e6b471 --- /dev/null +++ b/tokio/src/runtime/task/core.rs @@ -0,0 +1,280 @@ +use crate::loom::cell::CausalCell; +use crate::runtime::task::raw::{self, Vtable}; +use crate::runtime::task::state::State; +use crate::runtime::task::waker::waker_ref; +use crate::runtime::task::{Notified, Schedule, Task}; +use crate::util::linked_list; + +use std::cell::UnsafeCell; +use std::future::Future; +use std::pin::Pin; +use std::ptr::NonNull; +use std::task::{Context, Poll, Waker}; + +/// The task cell. Contains the components of the task. +/// +/// It is critical for `Header` to be the first field as the task structure will +/// be referenced by both *mut Cell and *mut Header. +#[repr(C)] +pub(super) struct Cell<T: Future, S> { + /// Hot task state data + pub(super) header: Header, + + /// Either the future or output, depending on the execution stage. + pub(super) core: Core<T, S>, + + /// Cold data + pub(super) trailer: Trailer, +} + +/// The core of the task. +/// +/// Holds the future or output, depending on the stage of execution. +pub(super) struct Core<T: Future, S> { + /// Scheduler used to drive this future + pub(super) scheduler: CausalCell<Option<S>>, + + /// Either the future or the output + pub(super) stage: CausalCell<Stage<T>>, +} + +/// Crate public as this is also needed by the pool. +#[repr(C)] +pub(crate) struct Header { + /// Task state + pub(super) state: State, + + pub(crate) owned: UnsafeCell<linked_list::Pointers<Header>>, + + /// Pointer to next task, used with the injection queue + pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>, + + /// Pointer to the next task in the transfer stack + pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>, + + /// Table of function pointers for executing actions on the task. + pub(super) vtable: &'static Vtable, +} + +unsafe impl Send for Header {} +unsafe impl Sync for Header {} + +/// Cold data is stored after the future. +pub(super) struct Trailer { + /// Consumer task waiting on completion of this task. + pub(super) waker: CausalCell<Option<Waker>>, +} + +/// Either the future or the output. +pub(super) enum Stage<T: Future> { + Running(T), + Finished(super::Result<T::Output>), + Consumed, +} + +impl<T: Future, S: Schedule> Cell<T, S> { + /// Allocates a new task cell, containing the header, trailer, and core + /// structures. + pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> { + Box::new(Cell { + header: Header { + state, + owned: UnsafeCell::new(linked_list::Pointers::new()), + queue_next: UnsafeCell::new(None), + stack_next: UnsafeCell::new(None), + vtable: raw::vtable::<T, S>(), + }, + core: Core { + scheduler: CausalCell::new(None), + stage: CausalCell::new(Stage::Running(future)), + }, + trailer: Trailer { + waker: CausalCell::new(None), + }, + }) + } +} + +impl<T: Future, S: Schedule> Core<T, S> { + /// If needed, bind a scheduler to the task. + /// + /// This only happens on the first poll. + pub(super) fn bind_scheduler(&self, task: Task<S>) { + use std::mem::ManuallyDrop; + + // TODO: it would be nice to not have to wrap with a ManuallyDrop + let task = ManuallyDrop::new(task); + + // This function may be called concurrently, but the __first__ time it + // is called, the caller has unique access to this field. All subsequent + // concurrent calls will be via the `Waker`, which will "happens after" + // the first poll. + // + // In other words, it is always safe to read the field and it is safe to + // write to the field when it is `None`. + if self.is_bound() { + return; + } + + // Bind the task to the scheduler + let scheduler = S::bind(ManuallyDrop::into_inner(task)); + + // Safety: As `scheduler` is not set, this is the first poll + self.scheduler.with_mut(|ptr| unsafe { + *ptr = Some(scheduler); + }); + } + + /// Returns true if the task is bound to a scheduler. + pub(super) fn is_bound(&self) -> bool { + // Safety: never called concurrently w/ a mutation. + self.scheduler.with(|ptr| unsafe { (*ptr).is_some() }) + } + + /// Poll the future + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `state` field. This + /// requires ensuring mutal exclusion between any concurrent thread that + /// might modify the future or output field. + /// + /// The mutual exclusion is implemented by `Harness` and the `Lifecycle` + /// component of the task state. + /// + /// `self` must also be pinned. This is handled by storing the task on the + /// heap. + pub(super) fn poll(&self, header: &Header) -> Poll<T::Output> { + let res = { + self.stage.with_mut(|ptr| { + // Safety: The caller ensures mutual exclusion to the field. + let future = match unsafe { &mut *ptr } { + Stage::Running(future) => future, + _ => unreachable!("unexpected stage"), + }; + + // Safety: The caller ensures the future is pinned. + let future = unsafe { Pin::new_unchecked(future) }; + + // The waker passed into the `poll` function does not require a ref + // count increment. + let waker_ref = waker_ref::<T, S>(header); + let mut cx = Context::from_waker(&*waker_ref); + + future.poll(&mut cx) + }) + }; + + if res.is_ready() { + self.drop_future_or_output(); + } + + res + } + + /// Drop the future + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn drop_future_or_output(&self) { + self.stage.with_mut(|ptr| { + // Safety: The caller ensures mutal exclusion to the field. + unsafe { *ptr = Stage::Consumed }; + }); + } + + /// Store the task output + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn store_output(&self, output: super::Result<T::Output>) { + self.stage.with_mut(|ptr| { + // Safety: the caller ensures mutual exclusion to the field. + unsafe { *ptr = Stage::Finished(output) }; + }); + } + + /// Take the task output + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn take_output(&self) -> super::Result<T::Output> { + use std::mem; + + self.stage.with_mut(|ptr| { + // Safety:: the caller ensures mutal exclusion to the field. + match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) { + Stage::Finished(output) => output, + _ => panic!("unexpected task state"), + } + }) + } + + /// Schedule the future for execution + pub(super) fn schedule(&self, task: Notified<S>) { + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.schedule(task), + None => panic!("no scheduler set"), + } + }); + } + + /// Schedule the future for execution in the near future, yielding the + /// thread to other tasks. + pub(super) fn yield_now(&self, task: Notified<S>) { + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.yield_now(task), + None => panic!("no scheduler set"), + } + }); + } + + /// Release the task + /// + /// If the `Scheduler` implementation is able to, it returns the `Task` + /// handle immediately. The caller of this function will batch a ref-dec + /// with a state change. + pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> { + use std::mem::ManuallyDrop; + + let task = ManuallyDrop::new(task); + + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.release(&*task), + // Task was never polled + None => None, + } + }) + } +} + +cfg_rt_threaded! { + impl Header { + pub(crate) fn shutdown(&self) { + use crate::runtime::task::RawTask; + + let task = unsafe { RawTask::from_raw(self.into()) }; + task.shutdown(); + } + } +} + +#[test] +#[cfg(not(loom))] +fn header_lte_cache_line() { + use std::mem::size_of; + + assert!(size_of::<Header>() <= 8 * size_of::<*const ()>()); +} diff --git a/tokio/src/runtime/task/error.rs b/tokio/src/runtime/task/error.rs new file mode 100644 index 00000000..d5f65a49 --- /dev/null +++ b/tokio/src/runtime/task/error.rs @@ -0,0 +1,163 @@ +use std::any::Any; +use std::fmt; +use std::io; +use std::sync::Mutex; + +doc_rt_core! { + /// Task failed to execute to completion. + pub struct JoinError { + repr: Repr, + } +} + +enum Repr { + Cancelled, + Panic(Mutex<Box<dyn Any + Send + 'static>>), +} + +impl JoinError { + #[doc(hidden)] + #[deprecated] + pub fn cancelled() -> JoinError { + Self::cancelled2() + } + + pub(crate) fn cancelled2() -> JoinError { + JoinError { + repr: Repr::Cancelled, + } + } + + #[doc(hidden)] + #[deprecated] + pub fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError { + Self::panic2(err) + } + + pub(crate) fn panic2(err: Box<dyn Any + Send + 'static>) -> JoinError { + JoinError { + repr: Repr::Panic(Mutex::new(err)), + } + } + + /// Returns true if the error was caused by the task being cancelled + pub fn is_cancelled(&self) -> bool { + match &self.repr { + Repr::Cancelled => true, + _ => false, + } + } + + /// Returns true if the error was caused by the task panicking + /// + /// # Examples + /// + /// ``` + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// assert!(err.is_panic()); + /// } + /// ``` + pub fn is_panic(&self) -> bool { + match &self.repr { + Repr::Panic(_) => true, + _ => false, + } + } + + /// Consumes the join error, returning the object with which the task panicked. + /// + /// # Panics + /// + /// `into_panic()` panics if the `Error` does not represent the underlying + /// task terminating with a panic. Use `is_panic` to check the error reason + /// or `try_into_panic` for a variant that does not panic. + /// + /// # Examples + /// + /// ```should_panic + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// if err.is_panic() { + /// // Resume the panic on the main task + /// panic::resume_unwind(err.into_panic()); + /// } + /// } + /// ``` + pub fn into_panic(self) -> Box<dyn Any + Send + 'static> { + self.try_into_panic() + .expect("`JoinError` reason is not a panic.") + } + + /// Consumes the join error, returning the object with which the task + /// panicked if the task terminated due to a panic. Otherwise, `self` is + /// returned. + /// + /// # Examples + /// + /// ```should_panic + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// if let Ok(reason) = err.try_into_panic() { + /// // Resume the panic on the main task + /// panic::resume_unwind(reason); + /// } + /// } + /// ``` + pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError> { + match self.repr { + Repr::Panic(p) => Ok(p.into_inner().expect("Extracting panic from mutex")), + _ => Err(self), + } + } +} + +impl fmt::Display for JoinError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.repr { + Repr::Cancelled => write!(fmt, "cancelled"), + Repr::Panic(_) => write!(fmt, "panic"), + } + } +} + +impl fmt::Debug for JoinError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.repr { + Repr::Cancelled => write!(fmt, "JoinError::Cancelled"), + Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"), + } + } +} + +impl std::error::Error for JoinError {} + +impl From<JoinError> for io::Error { + fn from(src: JoinError) -> io::Error { + io::Error::new( + io::ErrorKind::Other, + match src.repr { + Repr::Cancelled => "task was cancelled", + Repr::Panic(_) => "task panicked", + }, + ) + } +} diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs new file mode 100644 index 00000000..f9cf5e75 --- /dev/null +++ b/tokio/src/runtime/task/harness.rs @@ -0,0 +1,369 @@ +use crate::runtime::task::core::{Cell, Core, Header, Trailer}; +use crate::runtime::task::state::Snapshot; +use crate::runtime::task::{JoinError, Notified, Schedule, Task}; + +use std::future::Future; +use std::mem; +use std::panic; +use std::ptr::NonNull; +use std::task::{Poll, Waker}; + +/// Typed raw task handle +pub(super) struct Harness<T: Future, S: 'static> { + cell: NonNull<Cell<T, S>>, +} + +impl<T, S> Harness<T, S> +where + T: Future, + S: 'static, +{ + pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> { + Harness { + cell: ptr.cast::<Cell<T, S>>(), + } + } + + fn header(&self) -> &Header { + unsafe { &self.cell.as_ref().header } + } + + fn trailer(&self) -> &Trailer { + unsafe { &self.cell.as_ref().trailer } + } + + fn core(&self) -> &Core<T, S> { + unsafe { &self.cell.as_ref().core } + } +} + +impl<T, S> Harness<T, S> +where + T: Future, + S: Schedule, +{ + /// Polls the inner future. + /// + /// All necessary state checks and transitions are performed. + /// + /// Panics raised while polling the future are handled. + pub(super) fn poll(self) { + // If this is the first time the task is polled, the task will be bound + // to the scheduler, in which case the task ref count must be + // incremented. + let ref_inc = !self.core().is_bound(); + + // Transition the task to the running state. + // + // A failure to transition here indicates the task has been cancelled + // while in the run queue pending execution. + let snapshot = match self.header().state.transition_to_running(ref_inc) { + Ok(snapshot) => snapshot, + Err(_) => { + // The task was shutdown while in the run queue. At this point, + // we just hold a ref counted reference. Drop it here. + self.drop_reference(); + return; + } + }; + + // Ensure the task is bound to a scheduler instance. If this is the + // first time polling the task, a scheduler instance is pulled from the + // local context and assigned to the task. + // + // The scheduler maintains ownership of the task and responds to `wake` + // calls. + // + // The task reference count has been incremented. + self.core().bind_scheduler(self.to_task()); + + // The transition to `Running` done above ensures that a lock on the + // future has been obtained. This also ensures the `*mut T` pointer + // contains the future (as opposed to the output) and is initialized. + + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { + struct Guard<'a, T: Future, S: Schedule> { + core: &'a Core<T, S>, + polled: bool, + } + + impl<T: Future, S: Schedule> Drop for Guard<'_, T, S> { + fn drop(&mut self) { + if !self.polled { + self.core.drop_future_or_output(); + } + } + } + + let mut guard = Guard { + core: self.core(), + polled: false, + }; + + // If the task is cancelled, avoid polling it, instead signalling it + // is complete. + if snapshot.is_cancelled() { + Poll::Ready(Err(JoinError::cancelled2())) + } else { + let res = guard.core.poll(self.header()); + + // prevent the guard from dropping the future + guard.polled = true; + + res.map(Ok) + } + })); + + match res { + Ok(Poll::Ready(out)) => { + self.complete(out, snapshot.is_join_interested()); + } + Ok(Poll::Pending) => { + match self.header().state.transition_to_idle() { + Ok(snapshot) => { + if snapshot.is_notified() { + // Signal yield + self.core().yield_now(Notified(self.to_task())); + } + } + Err(_) => self.cancel_task(), + } + } + Err(err) => { + self.complete(Err(JoinError::panic2(err)), snapshot.is_join_interested()); + } + } + } + + pub(super) fn dealloc(self) { + // Release the join waker, if there is one. + self.trailer().waker.with_mut(|_| ()); + + // Check causality + self.core().stage.with_mut(|_| {}); + self.core().scheduler.with_mut(|_| {}); + + unsafe { + drop(Box::from_raw(self.cell.as_ptr())); + } + } + + // ===== join handle ===== + + /// Read the task output into `dst`. + pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) { + // Load a snapshot of the current task state + let snapshot = self.header().state.load(); + + debug_assert!(snapshot.is_join_interested()); + + if !snapshot.is_complete() { + // The waker must be stored in the task struct. + let res = if snapshot.has_join_waker() { + // There already is a waker stored in the struct. If it matches + // the provided waker, then there is no further work to do. + // Otherwise, the waker must be swapped. + let will_wake = unsafe { + // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE` + // may mutate the `waker` field. + self.trailer() + .waker + .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker)) + }; + + if will_wake { + // The task is not complete **and** the waker is up to date, + // there is nothing further that needs to be done. + return; + } + + // Unset the `JOIN_WAKER` to gain mutable access to the `waker` + // field then update the field with the new join worker. + // + // This requires two atomic operations, unsetting the bit and + // then resetting it. If the task transitions to complete + // concurrently to either one of those operations, then setting + // the join waker fails and we proceed to reading the task + // output. + self.header() + .state + .unset_waker() + .and_then(|snapshot| self.set_join_waker(waker.clone(), snapshot)) + } else { + self.set_join_waker(waker.clone(), snapshot) + }; + + match res { + Ok(_) => return, + Err(snapshot) => { + assert!(snapshot.is_complete()); + } + } + } + + *dst = Poll::Ready(self.core().take_output()); + } + + fn set_join_waker(&self, waker: Waker, snapshot: Snapshot) -> Result<Snapshot, Snapshot> { + assert!(snapshot.is_join_interested()); + assert!(!snapshot.has_join_waker()); + + // Safety: Only the `JoinHandle` may set the `waker` field. When + // `JOIN_INTEREST` is **not** set, nothing else will touch the field. + unsafe { + self.trailer().waker.with_mut(|ptr| { + *ptr = Some(waker); + }); + } + + // Update the `JoinWaker` state accordingly + let res = self.header().state.set_join_waker(); + + // If the state could not be updated, then clear the join waker + if res.is_err() { + unsafe { + self.trailer().waker.with_mut(|ptr| { + *ptr = None; + }); + } + } + + res + } + + pub(super) fn drop_join_handle_slow(self) { + // Try to unset `JOIN_INTEREST`. This must be done as a first step in + // case the task concurrently completed. + if self.header().state.unset_join_interested().is_err() { + // It is our responsibility to drop the output. This is critical as + // the task output may not be `Send` and as such must remain with + // the scheduler or `JoinHandle`. i.e. if the output remains in the + // task structure until the task is deallocated, it may be dropped + // by a Waker on any arbitrary thread. + self.core().drop_future_or_output(); + } + + // Drop the `JoinHandle` reference, possibly deallocating the task + self.drop_reference(); + } + + // ===== waker behavior ===== + + pub(super) fn wake_by_val(self) { + self.wake_by_ref(); + self.drop_reference(); + } + + pub(super) fn wake_by_ref(&self) { + if self.header().state.transition_to_notified() { + self.core().schedule(Notified(self.to_task())); + } + } + + pub(super) fn drop_reference(self) { + if self.header().state.ref_dec() { + self.dealloc(); + } + } + + /// Forcibly shutdown the task + /// + /// Attempt to transition to `Running` in order to forcibly shutdown the + /// task. If the task is currently running or in a state of completion, then + /// there is nothing further to do. When the task completes running, it will + /// notice the `CANCELLED` bit and finalize the task. + pub(super) fn shutdown(self) { + if !self.header().state.transition_to_shutdown() { + // The task is concurrently running. No further work needed. + return; + } + + // By transitioning the lifcycle to `Running`, we have permission to + // drop the future. + self.cancel_task(); + } + + // ====== internal ====== + + fn cancel_task(self) { + // Drop the future from a panic guard. + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { + self.core().drop_future_or_output(); + })); + + if let Err(err) = res { + // Dropping the future panicked, complete the join + // handle with the panic to avoid dropping the panic + // on the ground. + self.complete(Err(JoinError::panic2(err)), true); + } else { + self.complete(Err(JoinError::cancelled2()), true); + } + } + + fn complete(mut self, output: super::Result<T::Output>, is_join_interested: bool) { + if is_join_interested { + // Store the output. The future has already been dropped + // + // Safety: Mutual exclusion is obtained by having transitioned the task + // state -> Running + self.core().store_output(output); + + // Transition to `Complete`, notifying the `JoinHandle` if necessary. + self.transition_to_complete(); + } + + // The task has completed execution and will no longer be scheduled. + // + // Attempts to batch a ref-dec with the state transition below. + let ref_dec = if self.core().is_bound() { + if let Some(task) = self.core().release(self.to_task()) { + mem::forget(task); + true + } else { + false + } + } else { + false + }; + + // This might deallocate + let snapshot = self + .header() + .state + .transition_to_terminal(!is_join_interested, ref_dec); + + if snapshot.ref_count() == 0 { + self.dealloc() + } + } + + /// Transitions the task's lifecycle to `Complete`. Notifies the + /// `JoinHandle` if it still has interest in the completion. + fn transition_to_complete(&mut self) { + // Transition the task's lifecycle to `Complete` and get a snapshot of + // the task's sate. + let snapshot = self.header().state.transition_to_complete(); + + if !snapshot.is_join_interested() { + // The `JoinHandle` is not interested in the output of this task. It + // is our responsibility to drop the output. + self.core().drop_future_or_output(); + } else if snapshot.has_join_waker() { + // Notify the join handle. The previous transition obtains the + // lock on the waker cell. + self.wake_join(); + } + } + + fn wake_join(&self) { + self.trailer().waker.with(|ptr| match unsafe { &*ptr } { + Some(waker) => waker.wake_by_ref(), + None => panic!("waker missing"), + }); + } + + fn to_task(&self) -> Task<S> { + unsafe { Task::from_raw(self.header().into()) } + } +} diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs new file mode 100644 index 00000000..ed893a35 --- /dev/null +++ b/tokio/src/runtime/task/join.rs @@ -0,0 +1,149 @@ +use crate::runtime::task::RawTask; + +use std::fmt; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +doc_rt_core! { + /// An owned permission to join on a task (await its termination). + /// + /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for + /// a task rather than a thread. + /// + /// A `JoinHandle` *detaches* the associated task when it is dropped, which + /// means that there is no longer any handle to the task, and no way to `join` + /// on it. + /// + /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`] + /// functions. + /// + /// # Examples + /// + /// Creation from [`task::spawn`]: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<_> = task::spawn(async { + /// // some work here + /// }); + /// # } + /// ``` + /// + /// Creation from [`task::spawn_blocking`]: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| { + /// // some blocking work here + /// }); + /// # } + /// ``` + /// + /// Child being detached and outliving its parent: + /// + /// ```no_run + /// use tokio::task; + /// use tokio::time; + /// use std::time::Duration; + /// + /// # #[tokio::main] async fn main() { + /// let original_task = task::spawn(async { + /// let _detached_task = task::spawn(async { + /// // Here we sleep to make sure that the first task returns before. + /// time::delay_for(Duration::from_millis(10)).await; + /// // This will be called, even though the JoinHandle is dropped. + /// println!("♫ Still alive ♫"); + /// }); + /// }); + /// + /// original_task.await.expect("The task being joined has panicked"); + /// println!("Original task is joined."); + /// + /// // We make sure that the new task has time to run, before the main + /// // task returns. + /// + /// time::delay_for(Duration::from_millis(1000)).await; + /// # } + /// ``` + /// + /// [`task::spawn`]: crate::task::spawn() + /// [`task::spawn_blocking`]: crate::task::spawn_blocking + /// [`std::thread::JoinHandle`]: std::thread::JoinHandle + pub struct JoinHandle<T> { + raw: Option<RawTask>, + _p: PhantomData<T>, + } +} + +unsafe impl<T: Send> Send for JoinHandle<T> {} +unsafe impl<T: Send> Sync for JoinHandle<T> {} + +impl<T> JoinHandle<T> { + pub(super) fn new(raw: RawTask) -> JoinHandle<T> { + JoinHandle { + raw: Some(raw), + _p: PhantomData, + } + } +} + +impl<T> Unpin for JoinHandle<T> {} + +impl<T> Future for JoinHandle<T> { + type Output = super::Result<T>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut ret = Poll::Pending; + + // Raw should always be set. If it is not, this is due to polling after + // completion + let raw = self + .raw + .as_ref() + .expect("polling after `JoinHandle` already completed"); + + // Try to read the task output. If the task is not yet complete, the + // waker is stored and is notified once the task does complete. + // + // The function must go via the vtable, which requires erasing generic + // types. To do this, the function "return" is placed on the stack + // **before** calling the function and is passed into the function using + // `*mut ()`. + // + // Safety: + // + // The type of `T` must match the task's output type. + unsafe { + raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker()); + } + + ret + } +} + +impl<T> Drop for JoinHandle<T> { + fn drop(&mut self) { + if let Some(raw) = self.raw.take() { + if raw.header().state.drop_join_handle_fast().is_ok() { + return; + } + + raw.drop_join_handle_slow(); + } + } +} + +impl<T> fmt::Debug for JoinHandle<T> +where + T: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("JoinHandle").finish() + } +} diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs new file mode 100644 index 00000000..1ea60a9b --- /dev/null +++ b/tokio/src/runtime/task/mod.rs @@ -0,0 +1,219 @@ +mod core; +use self::core::Cell; +pub(crate) use self::core::Header; + +mod error; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::error::JoinError; + +mod harness; +use self::harness::Harness; + +mod join; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::join::JoinHandle; + +mod raw; +use self::raw::RawTask; + +mod state; +use self::state::State; + +mod waker; + +cfg_rt_threaded! { + mod stack; + pub(crate) use self::stack::TransferStack; +} + +use crate::util::linked_list; + +use std::future::Future; +use std::marker::PhantomData; +use std::ptr::NonNull; +use std::{fmt, mem}; + +/// An owned handle to the task, tracked by ref count +#[repr(transparent)] +pub(crate) struct Task<S: 'static> { + raw: RawTask, + _p: PhantomData<S>, +} + +unsafe impl<S> Send for Task<S> {} |