diff options
Diffstat (limited to 'tokio/src/executor/task/stack.rs')
-rw-r--r-- | tokio/src/executor/task/stack.rs | 85 |
1 files changed, 85 insertions, 0 deletions
diff --git a/tokio/src/executor/task/stack.rs b/tokio/src/executor/task/stack.rs new file mode 100644 index 00000000..84345282 --- /dev/null +++ b/tokio/src/executor/task/stack.rs @@ -0,0 +1,85 @@ +use crate::executor::loom::sync::atomic::AtomicPtr; +use crate::executor::task::{Header, Task}; + +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + +/// Concurrent stack of tasks, used to pass ownership of a task from one worker +/// to another. +pub(crate) struct TransferStack<T: 'static> { + head: AtomicPtr<Header<T>>, +} + +impl<T: 'static> TransferStack<T> { + pub(crate) fn new() -> TransferStack<T> { + TransferStack { + head: AtomicPtr::new(ptr::null_mut()), + } + } + + pub(crate) fn push(&self, task: Task<T>) { + unsafe { + let task = task.into_raw(); + + let next = (*task.as_ref().queue_next.get()) as usize; + + // At this point, the queue_next field may also be used to track + // whether or not the task must drop the join waker. + debug_assert_eq!(0, next & 1); + + // We don't care about any memory associated w/ setting the `head` + // field, just the current value. + let mut curr = self.head.load(Relaxed); + + loop { + *task.as_ref().queue_next.get() = (next | curr as usize) as *const _; + + let res = + self.head + .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed); + + match res { + Ok(_) => return, + Err(actual) => { + curr = actual; + } + } + } + } + } + + pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> { + struct Iter<T: 'static>(*mut Header<T>); + + impl<T: 'static> Iterator for Iter<T> { + type Item = Task<T>; + + fn next(&mut self) -> Option<Task<T>> { + let task = NonNull::new(self.0)?; + + unsafe { + let next = *task.as_ref().queue_next.get() as usize; + + // remove the data bit + self.0 = (next & !1) as *mut _; + + Some(Task::from_raw(task)) + } + } + } + + impl<T: 'static> Drop for Iter<T> { + fn drop(&mut self) { + use std::process; + + if !self.0.is_null() { + // we have bugs + process::abort(); + } + } + } + + let ptr = self.head.swap(ptr::null_mut(), Acquire); + Iter(ptr) + } +} |