summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/list.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/mpsc/list.rs
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src/sync/mpsc/list.rs')
-rw-r--r--tokio/src/sync/mpsc/list.rs348
1 files changed, 348 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs
new file mode 100644
index 00000000..a1295bb0
--- /dev/null
+++ b/tokio/src/sync/mpsc/list.rs
@@ -0,0 +1,348 @@
+//! A concurrent, lock-free, FIFO list.
+
+use crate::sync::loom::{
+ sync::atomic::{AtomicPtr, AtomicUsize},
+ thread,
+};
+use crate::sync::mpsc::block::{self, Block};
+
+use std::fmt;
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+
+/// List queue transmit handle
+pub(crate) struct Tx<T> {
+ /// Tail in the `Block` mpmc list.
+ block_tail: AtomicPtr<Block<T>>,
+
+ /// Position to push the next message. This reference a block and offset
+ /// into the block.
+ tail_position: AtomicUsize,
+}
+
+/// List queue receive handle
+pub(crate) struct Rx<T> {
+ /// Pointer to the block being processed
+ head: NonNull<Block<T>>,
+
+ /// Next slot index to process
+ index: usize,
+
+ /// Pointer to the next block pending release
+ free_head: NonNull<Block<T>>,
+}
+
+pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
+ // Create the initial block shared between the tx and rx halves.
+ let initial_block = Box::new(Block::new(0));
+ let initial_block_ptr = Box::into_raw(initial_block);
+
+ let tx = Tx {
+ block_tail: AtomicPtr::new(initial_block_ptr),
+ tail_position: AtomicUsize::new(0),
+ };
+
+ let head = NonNull::new(initial_block_ptr).unwrap();
+
+ let rx = Rx {
+ head,
+ index: 0,
+ free_head: head,
+ };
+
+ (tx, rx)
+}
+
+impl<T> Tx<T> {
+ /// Push a value into the list.
+ pub(crate) fn push(&self, value: T) {
+ // First, claim a slot for the value. `Acquire` is used here to
+ // synchronize with the `fetch_add` in `reclaim_blocks`.
+ let slot_index = self.tail_position.fetch_add(1, Acquire);
+
+ // Load the current block and write the value
+ let block = self.find_block(slot_index);
+
+ unsafe {
+ // Write the value to the block
+ block.as_ref().write(slot_index, value);
+ }
+ }
+
+ /// Close the send half of the list
+ ///
+ /// Similar process as pushing a value, but instead of writing the value &
+ /// setting the ready flag, the TX_CLOSED flag is set on the block.
+ pub(crate) fn close(&self) {
+ // First, claim a slot for the value. This is the last slot that will be
+ // claimed.
+ let slot_index = self.tail_position.fetch_add(1, Acquire);
+
+ let block = self.find_block(slot_index);
+
+ unsafe { block.as_ref().tx_close() }
+ }
+
+ fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
+ // The start index of the block that contains `index`.
+ let start_index = block::start_index(slot_index);
+
+ // The index offset into the block
+ let offset = block::offset(slot_index);
+
+ // Load the current head of the block
+ let mut block_ptr = self.block_tail.load(Acquire);
+
+ let block = unsafe { &*block_ptr };
+
+ // Calculate the distance between the tail ptr and the target block
+ let distance = block.distance(start_index);
+
+ // Decide if this call to `find_block` should attempt to update the
+ // `block_tail` pointer.
+ //
+ // Updating `block_tail` is not always performed in order to reduce
+ // contention.
+ //
+ // When set, as the routine walks the linked list, it attempts to update
+ // `block_tail`. If the update cannot be performed, `try_updating_tail`
+ // is unset.
+ let mut try_updating_tail = distance > offset;
+
+ // Walk the linked list of blocks until the block with `start_index` is
+ // found.
+ loop {
+ let block = unsafe { &(*block_ptr) };
+
+ if block.is_at_index(start_index) {
+ return unsafe { NonNull::new_unchecked(block_ptr) };
+ }
+
+ let next_block = block
+ .load_next(Acquire)
+ // There is no allocated next block, grow the linked list.
+ .unwrap_or_else(|| block.grow());
+
+ // If the block is **not** final, then the tail pointer cannot be
+ // advanced any more.
+ try_updating_tail &= block.is_final();
+
+ if try_updating_tail {
+ // Advancing `block_tail` must happen when walking the linked
+ // list. `block_tail` may not advance passed any blocks that are
+ // not "final". At the point a block is finalized, it is unknown
+ // if there are any prior blocks that are unfinalized, which
+ // makes it impossible to advance `block_tail`.
+ //
+ // While walking the linked list, `block_tail` can be advanced
+ // as long as finalized blocks are traversed.
+ //
+ // Release ordering is used to ensure that any subsequent reads
+ // are able to see the memory pointed to by `block_tail`.
+ //
+ // Acquire is not needed as any "actual" value is not accessed.
+ // At this point, the linked list is walked to acquire blocks.
+ let actual =
+ self.block_tail
+ .compare_and_swap(block_ptr, next_block.as_ptr(), Release);
+
+ if actual == block_ptr {
+ // Synchronize with any senders
+ let tail_position = self.tail_position.fetch_add(0, Release);
+
+ unsafe {
+ block.tx_release(tail_position);
+ }
+ } else {
+ // A concurrent sender is also working on advancing
+ // `block_tail` and this thread is falling behind.
+ //
+ // Stop trying to advance the tail pointer
+ try_updating_tail = false;
+ }
+ }
+
+ block_ptr = next_block.as_ptr();
+
+ thread::yield_now();
+ }
+ }
+
+ pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
+ debug!("+ reclaim_block({:p})", block);
+ // The block has been removed from the linked list and ownership
+ // is reclaimed.
+ //
+ // Before dropping the block, see if it can be reused by
+ // inserting it back at the end of the linked list.
+ //
+ // First, reset the data
+ block.as_mut().reclaim();
+
+ let mut reused = false;
+
+ // Attempt to insert the block at the end
+ //
+ // Walk at most three times
+ //
+ let curr_ptr = self.block_tail.load(Acquire);
+
+ // The pointer can never be null
+ debug_assert!(!curr_ptr.is_null());
+
+ let mut curr = NonNull::new_unchecked(curr_ptr);
+
+ // TODO: Unify this logic with Block::grow
+ for _ in 0..3 {
+ match curr.as_ref().try_push(&mut block, AcqRel) {
+ Ok(_) => {
+ reused = true;
+ break;
+ }
+ Err(next) => {
+ curr = next;
+ }
+ }
+ }
+
+ if !reused {
+ debug!(" + block freed {:p}", block);
+ let _ = Box::from_raw(block.as_ptr());
+ }
+ }
+}
+
+impl<T> fmt::Debug for Tx<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Tx")
+ .field("block_tail", &self.block_tail.load(Relaxed))
+ .field("tail_position", &self.tail_position.load(Relaxed))
+ .finish()
+ }
+}
+
+impl<T> Rx<T> {
+ /// Pop the next value off the queue
+ pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
+ // Advance `head`, if needed
+ if !self.try_advancing_head() {
+ debug!("+ !self.try_advancing_head() -> false");
+ return None;
+ }
+
+ self.reclaim_blocks(tx);
+
+ unsafe {
+ let block = self.head.as_ref();
+
+ let ret = block.read(self.index);
+
+ if let Some(block::Read::Value(..)) = ret {
+ self.index = self.index.wrapping_add(1);
+ }
+
+ ret
+ }
+ }
+
+ /// Try advancing the block pointer to the block referenced by `self.index`.
+ ///
+ /// Returns `true` if successful, `false` if there is no next block to load.
+ fn try_advancing_head(&mut self) -> bool {
+ let block_index = block::start_index(self.index);
+
+ loop {
+ let next_block = {
+ let block = unsafe { self.head.as_ref() };
+
+ if block.is_at_index(block_index) {
+ return true;
+ }
+
+ block.load_next(Acquire)
+ };
+
+ let next_block = match next_block {
+ Some(next_block) => next_block,
+ None => {
+ return false;
+ }
+ };
+
+ self.head = next_block;
+
+ thread::yield_now();
+ }
+ }
+
+ fn reclaim_blocks(&mut self, tx: &Tx<T>) {
+ debug!("+ reclaim_blocks()");
+
+ while self.free_head != self.head {
+ unsafe {
+ // Get a handle to the block that will be freed and update
+ // `free_head` to point to the next block.
+ let block = self.free_head;
+
+ let observed_tail_position = block.as_ref().observed_tail_position();
+
+ let required_index = match observed_tail_position {
+ Some(i) => i,
+ None => return,
+ };
+
+ if required_index > self.index {
+ return;
+ }
+
+ // We may read the next pointer with `Relaxed` ordering as it is
+ // guaranteed that the `reclaim_blocks` routine trails the `recv`
+ // routine. Any memory accessed by `reclaim_blocks` has already
+ // been acquired by `recv`.
+ let next_block = block.as_ref().load_next(Relaxed);
+
+ // Update the free list head
+ self.free_head = next_block.unwrap();
+
+ // Push the emptied block onto the back of the queue, making it
+ // available to senders.
+ tx.reclaim_block(block);
+ }
+
+ thread::yield_now();
+ }
+ }
+
+ /// Effectively `Drop` all the blocks. Should only be called once, when
+ /// the list is dropping.
+ pub(super) unsafe fn free_blocks(&mut self) {
+ debug!("+ free_blocks()");
+ debug_assert_ne!(self.free_head, NonNull::dangling());
+
+ let mut cur = Some(self.free_head);
+
+ #[cfg(debug_assertions)]
+ {
+ // to trigger the debug assert above so as to catch that we
+ // don't call `free_blocks` more than once.
+ self.free_head = NonNull::dangling();
+ self.head = NonNull::dangling();
+ }
+
+ while let Some(block) = cur {
+ cur = block.as_ref().load_next(Relaxed);
+ debug!(" + free: block = {:p}", block);
+ drop(Box::from_raw(block.as_ptr()));
+ }
+ }
+}
+
+impl<T> fmt::Debug for Rx<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Rx")
+ .field("head", &self.head)
+ .field("index", &self.index)
+ .field("free_head", &self.free_head)
+ .finish()
+ }
+}