diff options
Diffstat (limited to 'tokio-sync/src/task/atomic_task.rs')
-rw-r--r-- | tokio-sync/src/task/atomic_task.rs | 336 |
1 files changed, 0 insertions, 336 deletions
diff --git a/tokio-sync/src/task/atomic_task.rs b/tokio-sync/src/task/atomic_task.rs deleted file mode 100644 index 73110da2..00000000 --- a/tokio-sync/src/task/atomic_task.rs +++ /dev/null @@ -1,336 +0,0 @@ -use crate::loom::{ - futures::task::{self, Task}, - sync::atomic::AtomicUsize, - sync::CausalCell, -}; -use std::fmt; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; - -/// A synchronization primitive for task notification. -/// -/// `AtomicTask` will coordinate concurrent notifications with the consumer -/// potentially "updating" the underlying task to notify. This is useful in -/// scenarios where a computation completes in another thread and wants to -/// notify the consumer, but the consumer is in the process of being migrated to -/// a new logical task. -/// -/// Consumers should call `register` before checking the result of a computation -/// and producers should call `notify` after producing the computation (this -/// differs from the usual `thread::park` pattern). It is also permitted for -/// `notify` to be called **before** `register`. This results in a no-op. -/// -/// A single `AtomicTask` may be reused for any number of calls to `register` or -/// `notify`. -/// -/// `AtomicTask` does not provide any memory ordering guarantees, as such the -/// user should use caution and use other synchronization primitives to guard -/// the result of the underlying computation. -pub struct AtomicTask { - state: AtomicUsize, - task: CausalCell<Option<Task>>, -} - -// `AtomicTask` is a multi-consumer, single-producer transfer cell. The cell -// stores a `Task` value produced by calls to `register` and many threads can -// race to take the task (to notify it) by calling `notify. -// -// If a new `Task` instance is produced by calling `register` before an existing -// one is consumed, then the existing one is overwritten. -// -// While `AtomicTask` is single-producer, the implementation ensures memory -// safety. In the event of concurrent calls to `register`, there will be a -// single winner whose task will get stored in the cell. The losers will not -// have their tasks notified. As such, callers should ensure to add -// synchronization to calls to `register`. -// -// The implementation uses a single `AtomicUsize` value to coordinate access to -// the `Task` cell. There are two bits that are operated on independently. These -// are represented by `REGISTERING` and `NOTIFYING`. -// -// The `REGISTERING` bit is set when a producer enters the critical section. The -// `NOTIFYING` bit is set when a consumer enters the critical section. Neither -// bit being set is represented by `WAITING`. -// -// A thread obtains an exclusive lock on the task cell by transitioning the -// state from `WAITING` to `REGISTERING` or `NOTIFYING`, depending on the -// operation the thread wishes to perform. When this transition is made, it is -// guaranteed that no other thread will access the task cell. -// -// # Registering -// -// On a call to `register`, an attempt to transition the state from WAITING to -// REGISTERING is made. On success, the caller obtains a lock on the task cell. -// -// If the lock is obtained, then the thread sets the task cell to the task -// provided as an argument. Then it attempts to transition the state back from -// `REGISTERING` -> `WAITING`. -// -// If this transition is successful, then the registering process is complete -// and the next call to `notify` will observe the task. -// -// If the transition fails, then there was a concurrent call to `notify` that -// was unable to access the task cell (due to the registering thread holding the -// lock). To handle this, the registering thread removes the task it just set -// from the cell and calls `notify` on it. This call to notify represents the -// attempt to notify by the other thread (that set the `NOTIFYING` bit). The -// state is then transitioned from `REGISTERING | NOTIFYING` back to `WAITING`. -// This transition must succeed because, at this point, the state cannot be -// transitioned by another thread. -// -// # Notifying -// -// On a call to `notify`, an attempt to transition the state from `WAITING` to -// `NOTIFYING` is made. On success, the caller obtains a lock on the task cell. -// -// If the lock is obtained, then the thread takes ownership of the current value -// in teh task cell, and calls `notify` on it. The state is then transitioned -// back to `WAITING`. This transition must succeed as, at this point, the state -// cannot be transitioned by another thread. -// -// If the thread is unable to obtain the lock, the `NOTIFYING` bit is still. -// This is because it has either been set by the current thread but the previous -// value included the `REGISTERING` bit **or** a concurrent thread is in the -// `NOTIFYING` critical section. Either way, no action must be taken. -// -// If the current thread is the only concurrent call to `notify` and another -// thread is in the `register` critical section, when the other thread **exits** -// the `register` critical section, it will observe the `NOTIFYING` bit and -// handle the notify itself. -// -// If another thread is in the `notify` critical section, then it will handle -// notifying the task. -// -// # A potential race (is safely handled). -// -// Imagine the following situation: -// -// * Thread A obtains the `notify` lock and notifies a task. -// -// * Before thread A releases the `notify` lock, the notified task is scheduled. -// -// * Thread B attempts to notify the task. In theory this should result in the -// task being notified, but it cannot because thread A still holds the notify -// lock. -// -// This case is handled by requiring users of `AtomicTask` to call `register` -// **before** attempting to observe the application state change that resulted -// in the task being notified. The notifiers also change the application state -// before calling notify. -// -// Because of this, the task will do one of two things. -// -// 1) Observe the application state change that Thread B is notifying on. In -// this case, it is OK for Thread B's notification to be lost. -// -// 2) Call register before attempting to observe the application state. Since -// Thread A still holds the `notify` lock, the call to `register` will result -// in the task notifying itself and get scheduled again. - -/// Idle state -const WAITING: usize = 0; - -/// A new task value is being registered with the `AtomicTask` cell. -const REGISTERING: usize = 0b01; - -/// The task currently registered with the `AtomicTask` cell is being notified. -const NOTIFYING: usize = 0b10; - -impl AtomicTask { - /// Create an `AtomicTask` initialized with the given `Task` - pub fn new() -> AtomicTask { - AtomicTask { - state: AtomicUsize::new(WAITING), - task: CausalCell::new(None), - } - } - - /// Registers the current task to be notified on calls to `notify`. - /// - /// This is the same as calling `register_task` with `task::current()`. - pub fn register(&self) { - self.do_register(CurrentTask); - } - - /// Registers the provided task to be notified on calls to `notify`. - /// - /// The new task will take place of any previous tasks that were registered - /// by previous calls to `register`. Any calls to `notify` that happen after - /// a call to `register` (as defined by the memory ordering rules), will - /// notify the `register` caller's task. - /// - /// It is safe to call `register` with multiple other threads concurrently - /// calling `notify`. This will result in the `register` caller's current - /// task being notified once. - /// - /// This function is safe to call concurrently, but this is generally a bad - /// idea. Concurrent calls to `register` will attempt to register different - /// tasks to be notified. One of the callers will win and have its task set, - /// but there is no guarantee as to which caller will succeed. - pub fn register_task(&self, task: Task) { - self.do_register(ExactTask(task)); - } - - fn do_register<R>(&self, reg: R) - where - R: Register, - { - debug!(" + register_task"); - match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { - WAITING => { - unsafe { - // Locked acquired, update the waker cell - self.task.with_mut(|t| reg.register(&mut *t)); - - // Release the lock. If the state transitioned to include - // the `NOTIFYING` bit, this means that a notify has been - // called concurrently, so we have to remove the task and - // notify it.` - // - // Start by assuming that the state is `REGISTERING` as this - // is what we jut set it to. - let res = self - .state - .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire); - - match res { - Ok(_) => {} - Err(actual) => { - // This branch can only be reached if a - // concurrent thread called `notify`. In this - // case, `actual` **must** be `REGISTERING | - // `NOTIFYING`. - debug_assert_eq!(actual, REGISTERING | NOTIFYING); - - // Take the task to notify once the atomic operation has - // completed. - let notify = self.task.with_mut(|t| (*t).take()).unwrap(); - - // Just swap, because no one could change state - // while state == `Registering | `Waking` - self.state.swap(WAITING, AcqRel); - - // The atomic swap was complete, now - // notify the task and return. - notify.notify(); - } - } - } - } - NOTIFYING => { - // Currently in the process of notifying the task, i.e., - // `notify` is currently being called on the old task handle. - // So, we call notify on the new task handle - reg.notify(); - } - state => { - // In this case, a concurrent thread is holding the - // "registering" lock. This probably indicates a bug in the - // caller's code as racing to call `register` doesn't make much - // sense. - // - // We just want to maintain memory safety. It is ok to drop the - // call to `register`. - debug_assert!(state == REGISTERING || state == REGISTERING | NOTIFYING); - } - } - } - - /// Notifies the task that last called `register`. - /// - /// If `register` has not been called yet, then this does nothing. - pub fn notify(&self) { - debug!(" + notify"); - if let Some(task) = self.take_task() { - task.notify(); - } - } - - /// Attempts to take the `Task` value out of the `AtomicTask` with the - /// intention that the caller will notify the task later. - pub fn take_task(&self) -> Option<Task> { - debug!(" + take_task"); - // AcqRel ordering is used in order to acquire the value of the `task` - // cell as well as to establish a `release` ordering with whatever - // memory the `AtomicTask` is associated with. - match self.state.fetch_or(NOTIFYING, AcqRel) { - WAITING => { - debug!(" + WAITING"); - // The notifying lock has been acquired. - let task = unsafe { self.task.with_mut(|t| (*t).take()) }; - - // Release the lock - self.state.fetch_and(!NOTIFYING, Release); - debug!(" + Done taking"); - - task - } - state => { - debug!(" + state = {:?}", state); - // There is a concurrent thread currently updating the - // associated task. - // - // Nothing more to do as the `NOTIFYING` bit has been set. It - // doesn't matter if there are concurrent registering threads or - // not. - // - debug_assert!( - state == REGISTERING || state == REGISTERING | NOTIFYING || state == NOTIFYING - ); - None - } - } - } -} - -impl Default for AtomicTask { - fn default() -> Self { - AtomicTask::new() - } -} - -impl fmt::Debug for AtomicTask { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "AtomicTask") - } -} - -unsafe impl Send for AtomicTask {} -unsafe impl Sync for AtomicTask {} - -trait Register { - fn register(self, slot: &mut Option<Task>); - fn notify(self); -} - -struct CurrentTask; - -impl Register for CurrentTask { - fn register(self, slot: &mut Option<Task>) { - let should_update = (&*slot) - .as_ref() - .map(|prev| !prev.will_notify_current()) - .unwrap_or(true); - if should_update { - *slot = Some(task::current()); - } - } - - fn notify(self) { - task::current().notify(); - } -} - -struct ExactTask(Task); - -impl Register for ExactTask { - fn register(self, slot: &mut Option<Task>) { - // When calling register_task with an exact task, it doesn't matter - // if the previous task would have notified current. We *always* want - // to save that exact task. - *slot = Some(self.0); - } - - fn notify(self) { - self.0.notify(); - } -} |