//! A concurrent, lock-free, FIFO list. use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize}; use crate::loom::thread; use crate::sync::mpsc::block::{self, Block}; use std::fmt; use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; /// List queue transmit handle pub(crate) struct Tx { /// Tail in the `Block` mpmc list. block_tail: AtomicPtr>, /// Position to push the next message. This reference a block and offset /// into the block. tail_position: AtomicUsize, } /// List queue receive handle pub(crate) struct Rx { /// Pointer to the block being processed head: NonNull>, /// Next slot index to process index: usize, /// Pointer to the next block pending release free_head: NonNull>, } pub(crate) fn channel() -> (Tx, Rx) { // Create the initial block shared between the tx and rx halves. let initial_block = Box::new(Block::new(0)); let initial_block_ptr = Box::into_raw(initial_block); let tx = Tx { block_tail: AtomicPtr::new(initial_block_ptr), tail_position: AtomicUsize::new(0), }; let head = NonNull::new(initial_block_ptr).unwrap(); let rx = Rx { head, index: 0, free_head: head, }; (tx, rx) } impl Tx { /// Pushes a value into the list. pub(crate) fn push(&self, value: T) { // First, claim a slot for the value. `Acquire` is used here to // synchronize with the `fetch_add` in `reclaim_blocks`. let slot_index = self.tail_position.fetch_add(1, Acquire); // Load the current block and write the value let block = self.find_block(slot_index); unsafe { // Write the value to the block block.as_ref().write(slot_index, value); } } /// Closes the send half of the list /// /// Similar process as pushing a value, but instead of writing the value & /// setting the ready flag, the TX_CLOSED flag is set on the block. pub(crate) fn close(&self) { // First, claim a slot for the value. This is the last slot that will be // claimed. let slot_index = self.tail_position.fetch_add(1, Acquire); let block = self.find_block(slot_index); unsafe { block.as_ref().tx_close() } } fn find_block(&self, slot_index: usize) -> NonNull> { // The start index of the block that contains `index`. let start_index = block::start_index(slot_index); // The index offset into the block let offset = block::offset(slot_index); // Load the current head of the block let mut block_ptr = self.block_tail.load(Acquire); let block = unsafe { &*block_ptr }; // Calculate the distance between the tail ptr and the target block let distance = block.distance(start_index); // Decide if this call to `find_block` should attempt to update the // `block_tail` pointer. // // Updating `block_tail` is not always performed in order to reduce // contention. // // When set, as the routine walks the linked list, it attempts to update // `block_tail`. If the update cannot be performed, `try_updating_tail` // is unset. let mut try_updating_tail = distance > offset; // Walk the linked list of blocks until the block with `start_index` is // found. loop { let block = unsafe { &(*block_ptr) }; if block.is_at_index(start_index) { return unsafe { NonNull::new_unchecked(block_ptr) }; } let next_block = block .load_next(Acquire) // There is no allocated next block, grow the linked list. .unwrap_or_else(|| block.grow()); // If the block is **not** final, then the tail pointer cannot be // advanced any more. try_updating_tail &= block.is_final(); if try_updating_tail { // Advancing `block_tail` must happen when walking the linked // list. `block_tail` may not advance passed any blocks that are // not "final". At the point a block is finalized, it is unknown // if there are any prior blocks that are unfinalized, which // makes it impossible to advance `block_tail`. // // While walking the linked list, `block_tail` can be advanced // as long as finalized blocks are traversed. // // Release ordering is used to ensure that any subsequent reads // are able to see the memory pointed to by `block_tail`. // // Acquire is not needed as any "actual" value is not accessed. // At this point, the linked list is walked to acquire blocks. let actual = self.block_tail .compare_and_swap(block_ptr, next_block.as_ptr(), Release); if actual == block_ptr { // Synchronize with any senders let tail_position = self.tail_position.fetch_add(0, Release); unsafe { block.tx_release(tail_position); } } else { // A concurrent sender is also working on advancing // `block_tail` and this thread is falling behind. // // Stop trying to advance the tail pointer try_updating_tail = false; } } block_ptr = next_block.as_ptr(); thread::yield_now(); } } pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull>) { // The block has been removed from the linked list and ownership // is reclaimed. // // Before dropping the block, see if it can be reused by // inserting it back at the end of the linked list. // // First, reset the data block.as_mut().reclaim(); let mut reused = false; // Attempt to insert the block at the end // // Walk at most three times // let curr_ptr = self.block_tail.load(Acquire); // The pointer can never be null debug_assert!(!curr_ptr.is_null()); let mut curr = NonNull::new_unchecked(curr_ptr); // TODO: Unify this logic with Block::grow for _ in 0..3 { match curr.as_ref().try_push(&mut block, AcqRel) { Ok(_) => { reused = true; break; } Err(next) => { curr = next; } } } if !reused { let _ = Box::from_raw(block.as_ptr()); } } } impl fmt::Debug for Tx { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Tx") .field("block_tail", &self.block_tail.load(Relaxed)) .field("tail_position", &self.tail_position.load(Relaxed)) .finish() } } impl Rx { /// Pops the next value off the queue pub(crate) fn pop(&mut self, tx: &Tx) -> Option> { // Advance `head`, if needed if !self.try_advancing_head() { return None; } self.reclaim_blocks(tx); unsafe { let block = self.head.as_ref(); let ret = block.read(self.index); if let Some(block::Read::Value(..)) = ret { self.index = self.index.wrapping_add(1); } ret } } /// Tries advancing the block pointer to the block referenced by `self.index`. /// /// Returns `true` if successful, `false` if there is no next block to load. fn try_advancing_head(&mut self) -> bool { let block_index = block::start_index(self.index); loop { let next_block = { let block = unsafe { self.head.as_ref() }; if block.is_at_index(block_index) { return true; } block.load_next(Acquire) }; let next_block = match next_block { Some(next_block) => next_block, None => { return false; } }; self.head = next_block; thread::yield_now(); } } fn reclaim_blocks(&mut self, tx: &Tx) { while self.free_head != self.head { unsafe { // Get a handle to the block that will be freed and update // `free_head` to point to the next block. let block = self.free_head; let observed_tail_position = block.as_ref().observed_tail_position(); let required_index = match observed_tail_position { Some(i) => i, None => return, }; if required_index > self.index { return; } // We may read the next pointer with `Relaxed` ordering as it is // guaranteed that the `reclaim_blocks` routine trails the `recv` // routine. Any memory accessed by `reclaim_blocks` has already // been acquired by `recv`. let next_block = block.as_ref().load_next(Relaxed); // Update the free list head self.free_head = next_block.unwrap(); // Push the emptied block onto the back of the queue, making it // available to senders. tx.reclaim_block(block); } thread::yield_now(); } } /// Effectively `Drop` all the blocks. Should only be called once, when /// the list is dropping. pub(super) unsafe fn free_blocks(&mut self) { debug_assert_ne!(self.free_head, NonNull::dangling()); let mut cur = Some(self.free_head); #[cfg(debug_assertions)] { // to trigger the debug assert above so as to catch that we // don't call `free_blocks` more than once. self.free_head = NonNull::dangling(); self.head = NonNull::dangling(); } while let Some(block) = cur { cur = block.as_ref().load_next(Relaxed); drop(Box::from_raw(block.as_ptr())); } } } impl fmt::Debug for Rx { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Rx") .field("head", &self.head) .field("index", &self.index) .field("free_head", &self.free_head) .finish() } }