diff options
Diffstat (limited to 'tokio/src/executor/task/mod.rs')
-rw-r--r-- | tokio/src/executor/task/mod.rs | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/tokio/src/executor/task/mod.rs b/tokio/src/executor/task/mod.rs new file mode 100644 index 00000000..dfc6628e --- /dev/null +++ b/tokio/src/executor/task/mod.rs @@ -0,0 +1,130 @@ +mod core; +pub(crate) use self::core::Header; + +mod error; +pub use self::error::Error; + +mod harness; + +mod join; +pub(crate) use self::join::JoinHandle; + +mod list; +pub(crate) use self::list::OwnedList; + +mod raw; + +mod stack; +pub(crate) use self::stack::TransferStack; + +mod state; +mod waker; + +/// Unit tests +#[cfg(test)] +mod tests; + +use self::raw::RawTask; + +use std::future::Future; +use std::ptr::NonNull; +use std::{fmt, mem}; + +/// An owned handle to the task, tracked by ref count +pub(crate) struct Task<S: 'static> { + raw: RawTask<S>, +} + +unsafe impl<S: Send + Sync + 'static> Send for Task<S> {} + +/// Task result sent back +pub(crate) type Result<T> = std::result::Result<T, Error>; + +pub(crate) trait Schedule: Send + Sync + Sized + 'static { + /// Bind a task to the executor. + /// + /// Guaranteed to be called from the thread that called `poll` on the task. + fn bind(&self, task: &Task<Self>); + + /// The task has completed work and is ready to be released. The scheduler + /// is free to drop it whenever. + fn release(&self, task: Task<Self>); + + /// The has been completed by the executor it was bound to. + fn release_local(&self, task: &Task<Self>); + + /// Schedule the task + fn schedule(&self, task: Task<Self>); +} + +/// Create a new task without an associated join handle +pub(crate) fn background<T, S>(task: T) -> Task<S> +where + T: Future + Send + 'static, + S: Schedule, +{ + let raw = RawTask::new_background(task); + Task { raw } +} + +/// Create a new task with an associated join handle +pub(crate) fn joinable<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output, S>) +where + T: Future + Send + 'static, + S: Schedule, +{ + let raw = RawTask::new_joinable(task); + let task = Task { raw }; + let join = JoinHandle::new(raw); + + (task, join) +} + +impl<S: 'static> Task<S> { + pub(crate) unsafe fn from_raw(ptr: NonNull<Header<S>>) -> Task<S> { + let raw = RawTask::from_raw(ptr); + Task { raw } + } + + pub(crate) fn header(&self) -> &Header<S> { + self.raw.header() + } + + pub(crate) fn into_raw(self) -> NonNull<Header<S>> { + let raw = self.raw.into_raw(); + mem::forget(self); + raw + } +} + +impl<S: Schedule> Task<S> { + /// Returns `self` when the task needs to be immediately re-scheduled + pub(crate) fn run(self, executor: NonNull<S>) -> Option<Self> { + if unsafe { self.raw.poll(executor) } { + Some(self) + } else { + // Cleaning up the `Task` instance is done from within the poll + // function. + mem::forget(self); + None + } + } + + /// Pre-emptively cancel the task as part of the shutdown process. + pub(crate) fn shutdown(self) { + self.raw.cancel_from_queue(); + mem::forget(self); + } +} + +impl<S: 'static> Drop for Task<S> { + fn drop(&mut self) { + self.raw.drop_task(); + } +} + +impl<S> fmt::Debug for Task<S> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Task").finish() + } +} |