summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/oneshot.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-29 15:11:31 -0700
committerGitHub <noreply@github.com>2019-10-29 15:11:31 -0700
commit2b909d6805990abf0bc2a5dea9e7267ff87df704 (patch)
treede255969c720c294af754b3840efabff3e6d69a0 /tokio/src/sync/oneshot.rs
parentc62ef2d232dea1535a8e22484fa2ca083f03e903 (diff)
sync: move into `tokio` crate (#1705)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The sync implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src/sync/oneshot.rs')
-rw-r--r--tokio/src/sync/oneshot.rs576
1 files changed, 576 insertions, 0 deletions
diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs
new file mode 100644
index 00000000..7cf4eb8f
--- /dev/null
+++ b/tokio/src/sync/oneshot.rs
@@ -0,0 +1,576 @@
+//! A channel for sending a single message between asynchronous tasks.
+
+use crate::sync::loom::sync::{atomic::AtomicUsize, Arc, CausalCell};
+
+use futures_core::ready;
+use std::fmt;
+use std::future::Future;
+use std::mem::MaybeUninit;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll, Waker};
+
+/// Sends a value to the associated `Receiver`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+#[derive(Debug)]
+pub struct Sender<T> {
+ inner: Option<Arc<Inner<T>>>,
+}
+
+/// Receive a value from the associated `Sender`.
+///
+/// Instances are created by the [`channel`](fn.channel.html) function.
+#[derive(Debug)]
+pub struct Receiver<T> {
+ inner: Option<Arc<Inner<T>>>,
+}
+
+pub mod error {
+ //! Oneshot error types
+
+ use std::fmt;
+
+ /// Error returned by the `Future` implementation for `Receiver`.
+ #[derive(Debug)]
+ pub struct RecvError(pub(super) ());
+
+ /// Error returned by the `try_recv` function on `Receiver`.
+ #[derive(Debug)]
+ pub struct TryRecvError(pub(super) ());
+
+ // ===== impl RecvError =====
+
+ impl fmt::Display for RecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+ }
+
+ impl ::std::error::Error for RecvError {}
+
+ // ===== impl TryRecvError =====
+
+ impl fmt::Display for TryRecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+ }
+
+ impl ::std::error::Error for TryRecvError {}
+}
+
+use self::error::*;
+
+struct Inner<T> {
+ /// Manages the state of the inner cell
+ state: AtomicUsize,
+
+ /// The value. This is set by `Sender` and read by `Receiver`. The state of
+ /// the cell is tracked by `state`.
+ value: CausalCell<Option<T>>,
+
+ /// The task to notify when the receiver drops without consuming the value.
+ tx_task: CausalCell<MaybeUninit<Waker>>,
+
+ /// The task to notify when the value is sent.
+ rx_task: CausalCell<MaybeUninit<Waker>>,
+}
+
+#[derive(Clone, Copy)]
+struct State(usize);
+
+/// Create a new one-shot channel for sending single values across asynchronous
+/// tasks.
+///
+/// The function returns separate "send" and "receive" handles. The `Sender`
+/// handle is used by the producer to send the value. The `Receiver` handle is
+/// used by the consumer to receive the value.
+///
+/// Each handle can be used on separate tasks.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel();
+///
+/// tokio::spawn(async move {
+/// if let Err(_) = tx.send(3) {
+/// println!("the receiver dropped");
+/// }
+/// });
+///
+/// match rx.await {
+/// Ok(v) => println!("got = {:?}", v),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
+/// ```
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ #[allow(deprecated)]
+ let inner = Arc::new(Inner {
+ state: AtomicUsize::new(State::new().as_usize()),
+ value: CausalCell::new(None),
+ tx_task: CausalCell::new(MaybeUninit::uninit()),
+ rx_task: CausalCell::new(MaybeUninit::uninit()),
+ });
+
+ let tx = Sender {
+ inner: Some(inner.clone()),
+ };
+ let rx = Receiver { inner: Some(inner) };
+
+ (tx, rx)
+}
+
+impl<T> Sender<T> {
+ /// Completes this oneshot with a successful result.
+ ///
+ /// The function consumes `self` and notifies the `Receiver` handle that a
+ /// value is ready to be received.
+ ///
+ /// If the value is successfully enqueued for the remote end to receive,
+ /// then `Ok(())` is returned. If the receiving end was dropped before this
+ /// function was called, however, then `Err` is returned with the value
+ /// provided.
+ pub fn send(mut self, t: T) -> Result<(), T> {
+ let inner = self.inner.take().unwrap();
+
+ inner.value.with_mut(|ptr| unsafe {
+ *ptr = Some(t);
+ });
+
+ if !inner.complete() {
+ return Err(inner
+ .value
+ .with_mut(|ptr| unsafe { (*ptr).take() }.unwrap()));
+ }
+
+ Ok(())
+ }
+
+ #[doc(hidden)] // TODO: remove
+ pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ let inner = self.inner.as_ref().unwrap();
+
+ let mut state = State::load(&inner.state, Acquire);
+
+ if state.is_closed() {
+ return Poll::Ready(());
+ }
+
+ if state.is_tx_task_set() {
+ let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
+
+ if !will_notify {
+ state = State::unset_tx_task(&inner.state);
+
+ if state.is_closed() {
+ // Set the flag again so that the waker is released in drop
+ State::set_tx_task(&inner.state);
+ return Ready(());
+ } else {
+ unsafe { inner.drop_tx_task() };
+ }
+ }
+ }
+
+ if !state.is_tx_task_set() {
+ // Attempt to set the task
+ unsafe {
+ inner.set_tx_task(cx);
+ }
+
+ // Update the state
+ state = State::set_tx_task(&inner.state);
+
+ if state.is_closed() {
+ return Ready(());
+ }
+ }
+
+ Pending
+ }
+
+ /// Wait for the associated [`Receiver`] handle to drop.
+ ///
+ /// # Return
+ ///
+ /// Returns a `Future` which must be awaited on.
+ ///
+ /// [`Receiver`]: struct.Receiver.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, rx) = oneshot::channel::<()>();
+ ///
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// tx.closed().await;
+ /// println!("the receiver dropped");
+ /// }
+ /// ```
+ pub async fn closed(&mut self) {
+ use futures_util::future::poll_fn;
+
+ poll_fn(|cx| self.poll_closed(cx)).await
+ }
+
+ /// Check if the associated [`Receiver`] handle has been dropped.
+ ///
+ /// Unlike [`poll_closed`], this function does not register a task for
+ /// wakeup upon close.
+ ///
+ /// [`Receiver`]: struct.Receiver.html
+ /// [`poll_closed`]: struct.Sender.html#method.poll_closed
+ pub fn is_closed(&self) -> bool {
+ let inner = self.inner.as_ref().unwrap();
+
+ let state = State::load(&inner.state, Acquire);
+ state.is_closed()
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.as_ref() {
+ inner.complete();
+ }
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Prevent the associated [`Sender`] handle from sending a value.
+ ///
+ /// Any `send` operation which happens after calling `close` is guaranteed
+ /// to fail. After calling `close`, `Receiver::poll`] should be called to
+ /// receive a value if one was sent **before** the call to `close`
+ /// completed.
+ ///
+ /// [`Sender`]: struct.Sender.html
+ pub fn close(&mut self) {
+ let inner = self.inner.as_ref().unwrap();
+ inner.close();
+ }
+
+ /// Attempts to receive a value outside of the context of a task.
+ ///
+ /// Does not register a task if no value has been sent.
+ ///
+ /// A return value of `None` must be considered immediately stale (out of
+ /// date) unless [`close`] has been called first.
+ ///
+ /// Returns an error if the sender was dropped.
+ ///
+ /// [`close`]: #method.close
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ let result = if let Some(inner) = self.inner.as_ref() {
+ let state = State::load(&inner.state, Acquire);
+
+ if state.is_complete() {
+ match unsafe { inner.consume_value() } {
+ Some(value) => Ok(value),
+ None => Err(TryRecvError(())),
+ }
+ } else if state.is_closed() {
+ Err(TryRecvError(()))
+ } else {
+ // Not ready, this does not clear `inner`
+ return Err(TryRecvError(()));
+ }
+ } else {
+ panic!("called after complete");
+ };
+
+ self.inner = None;
+ result
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.as_ref() {
+ inner.close();
+ }
+ }
+}
+
+impl<T> Future for Receiver<T> {
+ type Output = Result<T, RecvError>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // If `inner` is `None`, then `poll()` has already completed.
+ let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
+ ready!(inner.poll_recv(cx))?
+ } else {
+ panic!("called after complete");
+ };
+
+ self.inner = None;
+ Ready(Ok(ret))
+ }
+}
+
+impl<T> Inner<T> {
+ fn complete(&self) -> bool {
+ let prev = State::set_complete(&self.state);
+
+ if prev.is_closed() {
+ return false;
+ }
+
+ if prev.is_rx_task_set() {
+ // TODO: Consume waker?
+ unsafe {
+ self.with_rx_task(Waker::wake_by_ref);
+ }
+ }
+
+ true
+ }
+
+ fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
+ // Load the state
+ let mut state = State::load(&self.state, Acquire);
+
+ if state.is_complete() {
+ match unsafe { self.consume_value() } {
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
+ }
+ } else if state.is_closed() {
+ Ready(Err(RecvError(())))
+ } else {
+ if state.is_rx_task_set() {
+ let will_notify = unsafe { self.with_rx_task(|w| w.will_wake(cx.waker())) };
+
+ // Check if the task is still the same
+ if !will_notify {
+ // Unset the task
+ state = State::unset_rx_task(&self.state);
+ if state.is_complete() {
+ // Set the flag again so that the waker is released in drop
+ State::set_rx_task(&self.state);
+
+ return match unsafe { self.consume_value() } {
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
+ };
+ } else {
+ unsafe { self.drop_rx_task() };
+ }
+ }
+ }
+
+ if !state.is_rx_task_set() {
+ // Attempt to set the task
+ unsafe {
+ self.set_rx_task(cx);
+ }
+
+ // Update the state
+ state = State::set_rx_task(&self.state);
+
+ if state.is_complete() {
+ match unsafe { self.consume_value() } {
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
+ }
+ } else {
+ Pending
+ }
+ } else {
+ Pending
+ }
+ }
+ }
+
+ /// Called by `Receiver` to indicate that the value will never be received.
+ fn close(&self) {
+ let prev = State::set_closed(&self.state);
+
+ if prev.is_tx_task_set() && !prev.is_complete() {
+ unsafe {
+ self.with_tx_task(Waker::wake_by_ref);
+ }
+ }
+ }
+
+ /// Consume the value. This function does not check `state`.
+ unsafe fn consume_value(&self) -> Option<T> {
+ self.value.with_mut(|ptr| (*ptr).take())
+ }
+
+ unsafe fn with_rx_task<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&Waker) -> R,
+ {
+ self.rx_task.with(|ptr| {
+ let waker: *const Waker = (&*ptr).as_ptr();
+ f(&*waker)
+ })
+ }
+
+ unsafe fn with_tx_task<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&Waker) -> R,
+ {
+ self.tx_task.with(|ptr| {
+ let waker: *const Waker = (&*ptr).as_ptr();
+ f(&*waker)
+ })
+ }
+
+ unsafe fn drop_rx_task(&self) {
+ self.rx_task.with_mut(|ptr| {
+ let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
+ ptr.drop_in_place();
+ });
+ }
+
+ unsafe fn drop_tx_task(&self) {
+ self.tx_task.with_mut(|ptr| {
+ let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
+ ptr.drop_in_place();
+ });
+ }
+
+ unsafe fn set_rx_task(&self, cx: &mut Context<'_>) {
+ self.rx_task.with_mut(|ptr| {
+ let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
+ ptr.write(cx.waker().clone());
+ });
+ }
+
+ unsafe fn set_tx_task(&self, cx: &mut Context<'_>) {
+ self.tx_task.with_mut(|ptr| {
+ let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
+ ptr.write(cx.waker().clone());
+ });
+ }
+}
+
+unsafe impl<T: Send> Send for Inner<T> {}
+unsafe impl<T: Send> Sync for Inner<T> {}
+
+impl<T> Drop for Inner<T> {
+ fn drop(&mut self) {
+ let state = State(*self.state.get_mut());
+
+ if state.is_rx_task_set() {
+ unsafe {
+ self.drop_rx_task();
+ }
+ }
+
+ if state.is_tx_task_set() {
+ unsafe {
+ self.drop_tx_task();
+ }
+ }
+ }
+}
+
+impl<T: fmt::Debug> fmt::Debug for Inner<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use std::sync::atomic::Ordering::Relaxed;
+
+ fmt.debug_struct("Inner")
+ .field("state", &State::load(&self.state, Relaxed))
+ .finish()
+ }
+}
+
+const RX_TASK_SET: usize = 0b00001;
+const VALUE_SENT: usize = 0b00010;
+const CLOSED: usize = 0b00100;
+const TX_TASK_SET: usize = 0b01000;
+
+impl State {
+ fn new() -> State {
+ State(0)
+ }
+
+ fn is_complete(self) -> bool {
+ self.0 & VALUE_SENT == VALUE_SENT
+ }
+
+ fn set_complete(cell: &AtomicUsize) -> State {
+ // TODO: This could be `Release`, followed by an `Acquire` fence *if*
+ // the `RX_TASK_SET` flag is set. However, `loom` does not support
+ // fences yet.
+ let val = cell.fetch_or(VALUE_SENT, AcqRel);
+ State(val)
+ }
+
+ fn is_rx_task_set(self) -> bool {
+ self.0 & RX_TASK_SET == RX_TASK_SET
+ }
+
+ fn set_rx_task(cell: &AtomicUsize) -> State {
+ let val = cell.fetch_or(RX_TASK_SET, AcqRel);
+ State(val | RX_TASK_SET)
+ }
+
+ fn unset_rx_task(cell: &AtomicUsize) -> State {
+ let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
+ State(val & !RX_TASK_SET)
+ }
+
+ fn is_closed(self) -> bool {
+ self.0 & CLOSED == CLOSED
+ }
+
+ fn set_closed(cell: &AtomicUsize) -> State {
+ // Acquire because we want all later writes (attempting to poll) to be
+ // ordered after this.
+ let val = cell.fetch_or(CLOSED, Acquire);
+ State(val)
+ }
+
+ fn set_tx_task(cell: &AtomicUsize) -> State {
+ let val = cell.fetch_or(TX_TASK_SET, AcqRel);
+ State(val | TX_TASK_SET)
+ }
+
+ fn unset_tx_task(cell: &AtomicUsize) -> State {
+ let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
+ State(val & !TX_TASK_SET)
+ }
+
+ fn is_tx_task_set(self) -> bool {
+ self.0 & TX_TASK_SET == TX_TASK_SET
+ }
+
+ fn as_usize(self) -> usize {
+ self.0
+ }
+
+ fn load(cell: &AtomicUsize, order: Ordering) -> State {
+ let val = cell.load(order);
+ State(val)
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("State")
+ .field("is_complete", &self.is_complete())
+ .field("is_closed", &self.is_closed())
+ .field("is_rx_task_set", &self.is_rx_task_set())
+ .field("is_tx_task_set", &self.is_tx_task_set())
+ .finish()
+ }
+}