summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/park/thread.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/runtime/park/thread.rs')
-rw-r--r--tokio/src/runtime/park/thread.rs279
1 files changed, 279 insertions, 0 deletions
diff --git a/tokio/src/runtime/park/thread.rs b/tokio/src/runtime/park/thread.rs
new file mode 100644
index 00000000..beebe2a4
--- /dev/null
+++ b/tokio/src/runtime/park/thread.rs
@@ -0,0 +1,279 @@
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::runtime::park::{Park, Unpark};
+
+use std::marker::PhantomData;
+use std::rc::Rc;
+use std::sync::atomic::Ordering;
+use std::time::Duration;
+
+/// Blocks the current thread using a condition variable.
+///
+/// Implements the [`Park`] functionality by using a condition variable. An
+/// atomic variable is also used to avoid using the condition variable if
+/// possible.
+///
+/// The condition variable is cached in a thread-local variable and is shared
+/// across all `ParkThread` instances created on the same thread. This also
+/// means that an instance of `ParkThread` might be unblocked by a handle
+/// associated with a different `ParkThread` instance.
+#[derive(Debug)]
+pub(crate) struct CachedParkThread {
+ _anchor: PhantomData<Rc<()>>,
+}
+
+pub(crate) struct ParkThread {
+ inner: Arc<Inner>,
+}
+
+/// Error returned by [`ParkThread`]
+///
+/// This currently is never returned, but might at some point in the future.
+///
+/// [`ParkThread`]: struct.ParkThread.html
+#[derive(Debug)]
+pub(crate) struct ParkError {
+ _p: (),
+}
+
+/// Unblocks a thread that was blocked by `ParkThread`.
+#[derive(Clone, Debug)]
+pub(crate) struct UnparkThread {
+ inner: Arc<Inner>,
+}
+
+#[derive(Debug)]
+struct Inner {
+ state: AtomicUsize,
+ mutex: Mutex<()>,
+ condvar: Condvar,
+}
+
+const IDLE: usize = 0;
+const NOTIFY: usize = 1;
+const SLEEP: usize = 2;
+
+thread_local! {
+ static CURRENT_PARKER: ParkThread = ParkThread::new();
+}
+
+// ==== impl ParkThread ====
+
+impl ParkThread {
+ pub(crate) fn new() -> Self {
+ Self {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(IDLE),
+ mutex: Mutex::new(()),
+ condvar: Condvar::new(),
+ }),
+ }
+ }
+}
+
+impl Park for ParkThread {
+ type Unpark = UnparkThread;
+ type Error = ParkError;
+
+ fn unpark(&self) -> Self::Unpark {
+ let inner = self.inner.clone();
+ UnparkThread { inner }
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park(None)
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.inner.park(Some(duration))
+ }
+}
+
+// ==== impl Inner ====
+
+impl Inner {
+ /// Park the current thread for at most `dur`.
+ fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
+ // If currently notified, then we skip sleeping. This is checked outside
+ // of the lock to avoid acquiring a mutex if not necessary.
+ match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
+ NOTIFY => return Ok(()),
+ IDLE => {}
+ _ => unreachable!(),
+ }
+
+ // The state is currently idle, so obtain the lock and then try to
+ // transition to a sleeping state.
+ let mut m = self.mutex.lock().unwrap();
+
+ // Transition to sleeping
+ match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
+ NOTIFY => {
+ // Notified before we could sleep, consume the notification and
+ // exit
+ self.state.store(IDLE, Ordering::SeqCst);
+ return Ok(());
+ }
+ IDLE => {}
+ _ => unreachable!(),
+ }
+
+ m = match timeout {
+ Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
+ None => self.condvar.wait(m).unwrap(),
+ };
+
+ // Transition back to idle. If the state has transitioned to `NOTIFY`,
+ // this will consume that notification
+ self.state.store(IDLE, Ordering::SeqCst);
+
+ // Explicitly drop the mutex guard. There is no real point in doing it
+ // except that I find it helpful to make it explicit where we want the
+ // mutex to unlock.
+ drop(m);
+
+ Ok(())
+ }
+
+ fn unpark(&self) {
+ // First, try transitioning from IDLE -> NOTIFY, this does not require a
+ // lock.
+ match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
+ IDLE | NOTIFY => return,
+ SLEEP => {}
+ _ => unreachable!(),
+ }
+
+ // The other half is sleeping, this requires a lock
+ let _m = self.mutex.lock().unwrap();
+
+ // Transition to NOTIFY
+ match self.state.swap(NOTIFY, Ordering::SeqCst) {
+ SLEEP => {}
+ NOTIFY => return,
+ IDLE => return,
+ _ => unreachable!(),
+ }
+
+ // Wakeup the sleeper
+ self.condvar.notify_one();
+ }
+}
+
+// ===== impl ParkThread =====
+
+impl CachedParkThread {
+ /// Create a new `ParkThread` handle for the current thread.
+ ///
+ /// This type cannot be moved to other threads, so it should be created on
+ /// the thread that the caller intends to park.
+ #[cfg(feature = "blocking")]
+ pub(crate) fn new() -> CachedParkThread {
+ CachedParkThread {
+ _anchor: PhantomData,
+ }
+ }
+
+ /// Get a reference to the `ParkThread` handle for this thread.
+ fn with_current<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&ParkThread) -> R,
+ {
+ CURRENT_PARKER.with(|inner| f(inner))
+ }
+}
+
+impl Park for CachedParkThread {
+ type Unpark = UnparkThread;
+ type Error = ParkError;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.with_current(|park_thread| park_thread.unpark())
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.with_current(|park_thread| park_thread.inner.park(None))?;
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.with_current(|park_thread| park_thread.inner.park(Some(duration)))?;
+ Ok(())
+ }
+}
+
+impl Default for ParkThread {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ===== impl UnparkThread =====
+
+impl Unpark for UnparkThread {
+ fn unpark(&self) {
+ self.inner.unpark();
+ }
+}
+
+#[cfg(feature = "blocking")]
+mod waker {
+ use super::{Inner, UnparkThread};
+ use crate::loom::sync::Arc;
+
+ use std::mem;
+ use std::task::{RawWaker, RawWakerVTable, Waker};
+
+ impl UnparkThread {
+ pub(crate) fn into_waker(self) -> Waker {
+ unsafe {
+ let raw = unparker_to_raw_waker(self.inner);
+ Waker::from_raw(raw)
+ }
+ }
+ }
+
+ impl Inner {
+ #[allow(clippy::wrong_self_convention)]
+ fn into_raw(this: Arc<Inner>) -> *const () {
+ Arc::into_raw(this) as *const ()
+ }
+
+ unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
+ Arc::from_raw(ptr as *const Inner)
+ }
+ }
+
+ unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
+ RawWaker::new(
+ Inner::into_raw(unparker),
+ &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
+ )
+ }
+
+ unsafe fn clone(raw: *const ()) -> RawWaker {
+ let unparker = Inner::from_raw(raw);
+
+ // Increment the ref count
+ mem::forget(unparker.clone());
+
+ unparker_to_raw_waker(unparker)
+ }
+
+ unsafe fn drop_waker(raw: *const ()) {
+ let _ = Inner::from_raw(raw);
+ }
+
+ unsafe fn wake(raw: *const ()) {
+ let unparker = Inner::from_raw(raw);
+ unparker.unpark();
+ }
+
+ unsafe fn wake_by_ref(raw: *const ()) {
+ let unparker = Inner::from_raw(raw);
+ unparker.unpark();
+
+ // We don't actually own a reference to the unparker
+ mem::forget(unparker);
+ }
+}