diff options
Diffstat (limited to 'tokio/src/executor/thread_pool/queue/global.rs')
-rw-r--r-- | tokio/src/executor/thread_pool/queue/global.rs | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/tokio/src/executor/thread_pool/queue/global.rs b/tokio/src/executor/thread_pool/queue/global.rs new file mode 100644 index 00000000..d5e6575f --- /dev/null +++ b/tokio/src/executor/thread_pool/queue/global.rs @@ -0,0 +1,195 @@ +use crate::executor::loom::sync::atomic::AtomicUsize; +use crate::executor::loom::sync::Mutex; +use crate::executor::task::{Header, Task}; + +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering::{Acquire, Release}; +use std::usize; + +pub(super) struct Queue<T: 'static> { + /// Pointers to the head and tail of the queue + pointers: Mutex<Pointers<T>>, + + /// Number of pending tasks in the queue. This helps prevent unnecessary + /// locking in the hot path. + /// + /// The LSB is a flag tracking whether or not the queue is open or not. + len: AtomicUsize, +} + +struct Pointers<T: 'static> { + head: *const Header<T>, + tail: *const Header<T>, +} + +const CLOSED: usize = 1; +const MAX_LEN: usize = usize::MAX >> 1; + +impl<T: 'static> Queue<T> { + pub(super) fn new() -> Queue<T> { + Queue { + pointers: Mutex::new(Pointers { + head: ptr::null(), + tail: ptr::null(), + }), + len: AtomicUsize::new(0), + } + } + + pub(super) fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub(super) fn is_closed(&self) -> bool { + self.len.load(Acquire) & CLOSED == CLOSED + } + + /// Close the worker queue + pub(super) fn close(&self) -> bool { + // Acquire the lock + let _p = self.pointers.lock().unwrap(); + + let len = unsafe { + // Set the queue as closed. Because all mutations are synchronized by + // the mutex, a read followed by a write is acceptable. + self.len.unsync_load() + }; + + let ret = len & CLOSED == 0; + + self.len.store(len | CLOSED, Release); + + ret + } + + fn len(&self) -> usize { + self.len.load(Acquire) >> 1 + } + + pub(super) fn wait_for_unlocked(&self) { + // Acquire and release the lock immediately. This synchronizes the + // caller **after** all external waiters are done w/ the scheduler + // struct. + drop(self.pointers.lock().unwrap()); + } + + /// Push a value into the queue and call the closure **while still holding + /// the push lock** + pub(super) fn push<F>(&self, task: Task<T>, f: F) + where + F: FnOnce(Result<(), Task<T>>), + { + unsafe { + // Acquire queue lock + let mut p = self.pointers.lock().unwrap(); + + // Check if the queue is closed. This must happen in the lock. + let len = self.len.unsync_load(); + if len & CLOSED == CLOSED { + f(Err(task)); + return; + } + + let task = task.into_raw(); + + // The next pointer should already be null + debug_assert!(get_next(task).is_null()); + + if let Some(tail) = NonNull::new(p.tail as *mut _) { + set_next(tail, task.as_ptr()); + } else { + p.head = task.as_ptr(); + } + + p.tail = task.as_ptr(); + + // Increment the count. + // + // All updates to the len atomic are guarded by the mutex. As such, + // a non-atomic load followed by a store is safe. + // + // We increment by 2 to avoid touching the shutdown flag + if (len >> 1) == MAX_LEN { + eprintln!("[ERROR] overflowed task counter. This is a bug and should be reported."); + std::process::abort(); + } + + self.len.store(len + 2, Release); + f(Ok(())); + } + } + + pub(super) fn push_batch(&self, batch_head: Task<T>, batch_tail: Task<T>, num: usize) { + unsafe { + let batch_head = batch_head.into_raw().as_ptr(); + let batch_tail = batch_tail.into_raw(); + + debug_assert!(get_next(batch_tail).is_null()); + + let mut p = self.pointers.lock().unwrap(); + + if let Some(tail) = NonNull::new(p.tail as *mut _) { + set_next(tail, batch_head); + } else { + p.head = batch_head; + } + + p.tail = batch_tail.as_ptr(); + + // Increment the count. + // + // All updates to the len atomic are guarded by the mutex. As such, + // a non-atomic load followed by a store is safe. + // + // Left shift by 1 to avoid touching the shutdown flag. + let len = self.len.unsync_load(); + + if (len >> 1) >= (MAX_LEN - num) { + std::process::abort(); + } + + self.len.store(len + (num << 1), Release); + } + } + + pub(super) fn pop(&self) -> Option<Task<T>> { + // Fast path, if len == 0, then there are no values + if self.is_empty() { + return None; + } + + unsafe { + let mut p = self.pointers.lock().unwrap(); + + // It is possible to hit null here if another thread poped the last + // task between us checking `len` and acquiring the lock. + let task = NonNull::new(p.head as *mut _)?; + + p.head = get_next(task); + + if p.head.is_null() { + p.tail = ptr::null(); + } + + set_next(task, ptr::null()); + + // Decrement the count. + // + // All updates to the len atomic are guarded by the mutex. As such, + // a non-atomic load followed by a store is safe. + // + // Decrement by 2 to avoid touching the shutdown flag + self.len.store(self.len.unsync_load() - 2, Release); + + Some(Task::from_raw(task)) + } + } +} + +unsafe fn get_next<T>(meta: NonNull<Header<T>>) -> *const Header<T> { + *meta.as_ref().queue_next.get() +} + +unsafe fn set_next<T>(meta: NonNull<Header<T>>, val: *const Header<T>) { + *meta.as_ref().queue_next.get() = val; +} |