diff options
-rw-r--r-- | tokio/src/macros/cfg.rs | 9 | ||||
-rw-r--r-- | tokio/src/task/blocking.rs | 1 | ||||
-rw-r--r-- | tokio/src/task/error.rs | 8 | ||||
-rw-r--r-- | tokio/src/task/join.rs | 144 | ||||
-rw-r--r-- | tokio/src/task/local.rs | 1 | ||||
-rw-r--r-- | tokio/src/task/mod.rs | 256 | ||||
-rw-r--r-- | tokio/src/task/spawn.rs | 96 | ||||
-rw-r--r-- | tokio/src/task/yield_now.rs | 36 |
8 files changed, 285 insertions, 266 deletions
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 4aee3b7f..cc93cc8a 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -213,6 +213,15 @@ macro_rules! cfg_rt_core { ($($item:item)*) => { $( #[cfg(feature = "rt-core")] + $item + )* + } +} + +macro_rules! doc_rt_core { + ($($item:item)*) => { + $( + #[cfg(feature = "rt-core")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))] $item )* diff --git a/tokio/src/task/blocking.rs b/tokio/src/task/blocking.rs index 71fbbe72..a155197f 100644 --- a/tokio/src/task/blocking.rs +++ b/tokio/src/task/blocking.rs @@ -21,6 +21,7 @@ cfg_rt_threaded! { /// }); /// # } /// ``` + #[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] pub fn block_in_place<F, R>(f: F) -> R where F: FnOnce() -> R, diff --git a/tokio/src/task/error.rs b/tokio/src/task/error.rs index e5eea465..7f7f53be 100644 --- a/tokio/src/task/error.rs +++ b/tokio/src/task/error.rs @@ -2,9 +2,11 @@ use std::any::Any; use std::fmt; use std::io; -/// Task failed to execute to completion. -pub struct JoinError { - repr: Repr, +doc_rt_core! { + /// Task failed to execute to completion. + pub struct JoinError { + repr: Repr, + } } enum Repr { diff --git a/tokio/src/task/join.rs b/tokio/src/task/join.rs index 5ae93ea8..8a8f2571 100644 --- a/tokio/src/task/join.rs +++ b/tokio/src/task/join.rs @@ -7,77 +7,79 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -/// An owned permission to join on a task (await its termination). -/// -/// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for -/// a task rather than a thread. -/// -/// A `JoinHandle` *detaches* the associated task when it is dropped, which -/// means that there is no longer any handle to the task, and no way to `join` -/// on it. -/// -/// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`] -/// functions. -/// -/// # Examples -/// -/// Creation from [`task::spawn`]: -/// -/// ``` -/// use tokio::task; -/// -/// # async fn doc() { -/// let join_handle: task::JoinHandle<_> = task::spawn(async { -/// // some work here -/// }); -/// # } -/// ``` -/// -/// Creation from [`task::spawn_blocking`]: -/// -/// ``` -/// use tokio::task; -/// -/// # async fn doc() { -/// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| { -/// // some blocking work here -/// }); -/// # } -/// ``` -/// -/// Child being detached and outliving its parent: -/// -/// ```no_run -/// use tokio::task; -/// use tokio::time; -/// use std::time::Duration; -/// -/// # #[tokio::main] async fn main() { -/// let original_task = task::spawn(async { -/// let _detached_task = task::spawn(async { -/// // Here we sleep to make sure that the first task returns before. -/// time::delay_for(Duration::from_millis(10)).await; -/// // This will be called, even though the JoinHandle is dropped. -/// println!("♫ Still alive ♫"); -/// }); -/// }); -/// -/// original_task.await.expect("The task being joined has panicked"); -/// println!("Original task is joined."); -/// -/// // We make sure that the new task has time to run, before the main -/// // task returns. -/// -/// time::delay_for(Duration::from_millis(1000)).await; -/// # } -/// ``` -/// -/// [`task::spawn`]: crate::task::spawn() -/// [`task::spawn_blocking`]: crate::task::spawn_blocking -/// [`std::thread::JoinHandle`]: std::thread::JoinHandle -pub struct JoinHandle<T> { - raw: Option<RawTask>, - _p: PhantomData<T>, +doc_rt_core! { + /// An owned permission to join on a task (await its termination). + /// + /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for + /// a task rather than a thread. + /// + /// A `JoinHandle` *detaches* the associated task when it is dropped, which + /// means that there is no longer any handle to the task, and no way to `join` + /// on it. + /// + /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`] + /// functions. + /// + /// # Examples + /// + /// Creation from [`task::spawn`]: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<_> = task::spawn(async { + /// // some work here + /// }); + /// # } + /// ``` + /// + /// Creation from [`task::spawn_blocking`]: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| { + /// // some blocking work here + /// }); + /// # } + /// ``` + /// + /// Child being detached and outliving its parent: + /// + /// ```no_run + /// use tokio::task; + /// use tokio::time; + /// use std::time::Duration; + /// + /// # #[tokio::main] async fn main() { + /// let original_task = task::spawn(async { + /// let _detached_task = task::spawn(async { + /// // Here we sleep to make sure that the first task returns before. + /// time::delay_for(Duration::from_millis(10)).await; + /// // This will be called, even though the JoinHandle is dropped. + /// println!("♫ Still alive ♫"); + /// }); + /// }); + /// + /// original_task.await.expect("The task being joined has panicked"); + /// println!("Original task is joined."); + /// + /// // We make sure that the new task has time to run, before the main + /// // task returns. + /// + /// time::delay_for(Duration::from_millis(1000)).await; + /// # } + /// ``` + /// + /// [`task::spawn`]: crate::task::spawn() + /// [`task::spawn_blocking`]: crate::task::spawn_blocking + /// [`std::thread::JoinHandle`]: std::thread::JoinHandle + pub struct JoinHandle<T> { + raw: Option<RawTask>, + _p: PhantomData<T>, + } } unsafe impl<T: Send> Send for JoinHandle<T> {} diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 6d0adf31..7f29e3d1 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -13,6 +13,7 @@ use std::sync::Mutex; use std::task::{Context, Poll}; use pin_project_lite::pin_project; + cfg_rt_util! { /// A set of tasks which are executed on the same thread. /// diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index f84659e0..8b84f210 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -216,119 +216,99 @@ cfg_blocking! { } } -mod core; -use self::core::Cell; -pub(crate) use self::core::Header; +cfg_rt_core! { + mod core; + use self::core::Cell; + pub(crate) use self::core::Header; -mod error; -pub use self::error::JoinError; + mod error; + pub use self::error::JoinError; -mod harness; -use self::harness::Harness; + mod harness; + use self::harness::Harness; -cfg_rt_core! { mod join; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::join::JoinHandle; -} - -cfg_rt_util! { - mod local; - pub use local::{spawn_local, LocalSet}; -} -mod list; -pub(crate) use self::list::OwnedList; + mod list; + pub(crate) use self::list::OwnedList; -mod raw; -use self::raw::RawTask; + mod raw; + use self::raw::RawTask; -cfg_rt_core! { mod spawn; pub use spawn::spawn; -} - -mod stack; -pub(crate) use self::stack::TransferStack; - -mod state; -use self::state::{Snapshot, State}; -mod waker; + mod stack; + pub(crate) use self::stack::TransferStack; -mod yield_now; -pub use yield_now::yield_now; + mod state; + use self::state::{Snapshot, State}; -/// Unit tests -#[cfg(test)] -mod tests; + mod waker; -use std::future::Future; -use std::marker::PhantomData; -use std::ptr::NonNull; -use std::{fmt, mem}; + mod yield_now; + pub use yield_now::yield_now; +} -/// An owned handle to the task, tracked by ref count -pub(crate) struct Task<S: 'static> { - raw: RawTask, - _p: PhantomData<S>, +cfg_rt_util! { + mod local; + pub use local::{spawn_local, LocalSet}; } -unsafe impl<S: ScheduleSendOnly + 'static> Send for Task<S> {} +cfg_rt_core! { + /// Unit tests + #[cfg(test)] + mod tests; + + use std::future::Future; + use std::marker::PhantomData; + use std::ptr::NonNull; + use std::{fmt, mem}; + + /// An owned handle to the task, tracked by ref count + pub(crate) struct Task<S: 'static> { + raw: RawTask, + _p: PhantomData<S>, + } -/// Task result sent back -pub(crate) type Result<T> = std::result::Result<T, JoinError>; + unsafe impl<S: ScheduleSendOnly + 'static> Send for Task<S> {} -pub(crate) trait Schedule: Sized + 'static { - /// Bind a task to the executor. - /// - /// Guaranteed to be called from the thread that called `poll` on the task. - fn bind(&self, task: &Task<Self>); + /// Task result sent back + pub(crate) type Result<T> = std::result::Result<T, JoinError>; - /// The task has completed work and is ready to be released. The scheduler - /// is free to drop it whenever. - fn release(&self, task: Task<Self>); + pub(crate) trait Schedule: Sized + 'static { + /// Bind a task to the executor. + /// + /// Guaranteed to be called from the thread that called `poll` on the task. + fn bind(&self, task: &Task<Self>); - /// The has been completed by the executor it was bound to. - fn release_local(&self, task: &Task<Self>); + /// The task has completed work and is ready to be released. The scheduler + /// is free to drop it whenever. + fn release(&self, task: Task<Self>); - /// Schedule the task - fn schedule(&self, task: Task<Self>); -} + /// The has been completed by the executor it was bound to. + fn release_local(&self, task: &Task<Self>); -/// Marker trait indicating that a scheduler can only schedule tasks which -/// implement `Send`. -/// -/// Schedulers that implement this trait may not schedule `!Send` futures. If -/// trait is implemented, the corresponding `Task` type will implement `Send`. -pub(crate) trait ScheduleSendOnly: Schedule + Send + Sync {} - -/// Create a new task with an associated join handle -pub(crate) fn joinable<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>) -where - T: Future + Send + 'static, - S: ScheduleSendOnly, -{ - let raw = RawTask::new_joinable::<_, S>(task); - - let task = Task { - raw, - _p: PhantomData, - }; - - let join = JoinHandle::new(raw); - - (task, join) -} + /// Schedule the task + fn schedule(&self, task: Task<Self>); + } -cfg_rt_util! { - /// Create a new `!Send` task with an associated join handle - pub(crate) fn joinable_local<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>) + /// Marker trait indicating that a scheduler can only schedule tasks which + /// implement `Send`. + /// + /// Schedulers that implement this trait may not schedule `!Send` futures. If + /// trait is implemented, the corresponding `Task` type will implement `Send`. + pub(crate) trait ScheduleSendOnly: Schedule + Send + Sync {} + + /// Create a new task with an associated join handle + pub(crate) fn joinable<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>) where - T: Future + 'static, - S: Schedule, + T: Future + Send + 'static, + S: ScheduleSendOnly, { - let raw = RawTask::new_joinable_local::<_, S>(task); + let raw = RawTask::new_joinable::<_, S>(task); let task = Task { raw, @@ -339,61 +319,81 @@ cfg_rt_util! { (task, join) } -} -impl<S: 'static> Task<S> { - pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> { - Task { - raw: RawTask::from_raw(ptr), - _p: PhantomData, + cfg_rt_util! { + /// Create a new `!Send` task with an associated join handle + pub(crate) fn joinable_local<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>) + where + T: Future + 'static, + S: Schedule, + { + let raw = RawTask::new_joinable_local::<_, S>(task); + + let task = Task { + raw, + _p: PhantomData, + }; + + let join = JoinHandle::new(raw); + + (task, join) } } - pub(crate) fn header(&self) -> &Header { - self.raw.header() - } + impl<S: 'static> Task<S> { + pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> { + Task { + raw: RawTask::from_raw(ptr), + _p: PhantomData, + } + } - pub(crate) fn into_raw(self) -> NonNull<Header> { - let raw = self.raw.into_raw(); - mem::forget(self); - raw - } -} + pub(crate) fn header(&self) -> &Header { + self.raw.header() + } -impl<S: Schedule> Task<S> { - /// Returns `self` when the task needs to be immediately re-scheduled - pub(crate) fn run<F>(self, mut executor: F) -> Option<Self> - where - F: FnMut() -> Option<NonNull<S>>, - { - if unsafe { - self.raw - .poll(&mut || executor().map(|ptr| ptr.cast::<()>())) - } { - Some(self) - } else { - // Cleaning up the `Task` instance is done from within the poll - // function. + pub(crate) fn into_raw(self) -> NonNull<Header> { + let raw = self.raw.into_raw(); mem::forget(self); - None + raw } } - /// Pre-emptively cancel the task as part of the shutdown process. - pub(crate) fn shutdown(self) { - self.raw.cancel_from_queue(); - mem::forget(self); + impl<S: Schedule> Task<S> { + /// Returns `self` when the task needs to be immediately re-scheduled + pub(crate) fn run<F>(self, mut executor: F) -> Option<Self> + where + F: FnMut() -> Option<NonNull<S>>, + { + if unsafe { + self.raw + .poll(&mut || executor().map(|ptr| ptr.cast::<()>())) + } { + Some(self) + } else { + // Cleaning up the `Task` instance is done from within the poll + // function. + mem::forget(self); + None + } + } + + /// Pre-emptively cancel the task as part of the shutdown process. + pub(crate) fn shutdown(self) { + self.raw.cancel_from_queue(); + mem::forget(self); + } } -} -impl<S: 'static> Drop for Task<S> { - fn drop(&mut self) { - self.raw.drop_task(); + impl<S: 'static> Drop for Task<S> { + fn drop(&mut self) { + self.raw.drop_task(); + } } -} -impl<S> fmt::Debug for Task<S> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Task").finish() + impl<S> fmt::Debug for Task<S> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Task").finish() + } } } diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index e0d19fd5..ab1356c8 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -3,51 +3,53 @@ use crate::task::JoinHandle; use std::future::Future; -/// Spawns a new asynchronous task, returning a -/// [`JoinHandle`](super::JoinHandle) for it. -/// -/// Spawning a task enables the task to execute concurrently to other tasks. The -/// spawned task may execute on the current thread, or it may be sent to a -/// different thread to be executed. The specifics depend on the current -/// [`Runtime`](crate::runtime::Runtime) configuration. -/// -/// # Examples -/// -/// In this example, a server is started and `spawn` is used to start a new task -/// that processes each received connection. -/// -/// ```no_run -/// use tokio::net::{TcpListener, TcpStream}; -/// -/// use std::io; -/// -/// async fn process(socket: TcpStream) { -/// // ... -/// # drop(socket); -/// } -/// -/// #[tokio::main] -/// async fn main() -> io::Result<()> { -/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; -/// -/// loop { -/// let (socket, _) = listener.accept().await?; -/// -/// tokio::spawn(async move { -/// // Process each socket concurrently. -/// process(socket).await -/// }); -/// } -/// } -/// ``` -/// -/// # Panics -/// -/// Panics if called from **outside** of the Tokio runtime. -pub fn spawn<T>(task: T) -> JoinHandle<T::Output> -where - T: Future + Send + 'static, - T::Output: Send + 'static, -{ - runtime::spawn(task) +doc_rt_core! { + /// Spawns a new asynchronous task, returning a + /// [`JoinHandle`](super::JoinHandle) for it. + /// + /// Spawning a task enables the task to execute concurrently to other tasks. The + /// spawned task may execute on the current thread, or it may be sent to a + /// different thread to be executed. The specifics depend on the current + /// [`Runtime`](crate::runtime::Runtime) configuration. + /// + /// # Examples + /// + /// In this example, a server is started and `spawn` is used to start a new task + /// that processes each received connection. + /// + /// ```no_run + /// use tokio::net::{TcpListener, TcpStream}; + /// + /// use std::io; + /// + /// async fn process(socket: TcpStream) { + /// // ... + /// # drop(socket); + /// } + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// loop { + /// let (socket, _) = listener.accept().await?; + /// + /// tokio::spawn(async move { + /// // Process each socket concurrently. + /// process(socket).await + /// }); + /// } + /// } + /// ``` + /// + /// # Panics + /// + /// Panics if called from **outside** of the Tokio runtime. + pub fn spawn<T>(task: T) -> JoinHandle<T::Output> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + runtime::spawn(task) + } } diff --git a/tokio/src/task/yield_now.rs b/tokio/src/task/yield_now.rs index aabd6b39..d6d94665 100644 --- a/tokio/src/task/yield_now.rs +++ b/tokio/src/task/yield_now.rs @@ -2,26 +2,28 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -/// Yield execution back to the Tokio runtime. -pub async fn yield_now() { - /// Yield implementation - struct YieldNow { - yielded: bool, - } +doc_rt_core! { + /// Yield execution back to the Tokio runtime. + pub async fn yield_now() { + /// Yield implementation + struct YieldNow { + yielded: bool, + } - impl Future for YieldNow { - type Output = (); + impl Future for YieldNow { + type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if self.yielded { - return Poll::Ready(()); - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.yielded { + return Poll::Ready(()); + } - self.yielded = true; - cx.waker().wake_by_ref(); - Poll::Pending + self.yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } } - } - YieldNow { yielded: false }.await + YieldNow { yielded: false }.await + } } |