summaryrefslogtreecommitdiffstats
path: root/tokio-sync/src/oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/src/oneshot.rs')
-rw-r--r--tokio-sync/src/oneshot.rs90
1 files changed, 41 insertions, 49 deletions
diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs
index d3531bd9..c38bb0ce 100644
--- a/tokio-sync/src/oneshot.rs
+++ b/tokio-sync/src/oneshot.rs
@@ -1,15 +1,15 @@
//! A channel for sending a single message between asynchronous tasks.
-use crate::loom::{
- futures::task::{self, Task},
- sync::atomic::AtomicUsize,
- sync::CausalCell,
-};
-use futures::{Async, Future, Poll};
+use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell};
+
use std::fmt;
+use std::future::Future;
use std::mem::{self, ManuallyDrop};
+use std::pin::Pin;
use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll, Waker};
/// Sends a value to the associated `Receiver`.
///
@@ -82,10 +82,10 @@ struct Inner<T> {
value: CausalCell<Option<T>>,
/// The task to notify when the receiver drops without consuming the value.
- tx_task: CausalCell<ManuallyDrop<Task>>,
+ tx_task: CausalCell<ManuallyDrop<Waker>>,
/// The task to notify when the value is sent.
- rx_task: CausalCell<ManuallyDrop<Task>>,
+ rx_task: CausalCell<ManuallyDrop<Waker>>,
}
#[derive(Clone, Copy)]
@@ -167,33 +167,33 @@ impl<T> Sender<T> {
///
/// # Return values
///
- /// If `Ok(Ready)` is returned then the associated `Receiver` has been
+ /// If `Ready(Ok(_))` is returned then the associated `Receiver` has been
/// dropped, which means any work required for sending should be canceled.
///
- /// If `Ok(NotReady)` is returned then the associated `Receiver` is still
+ /// If `Pending` is returned then the associated `Receiver` is still
/// alive and may be able to receive a message if sent. The current task is
/// registered to receive a notification if the `Receiver` handle goes away.
///
/// [`Receiver`]: struct.Receiver.html
- pub fn poll_close(&mut self) -> Poll<(), ()> {
+ pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
let inner = self.inner.as_ref().unwrap();
let mut state = State::load(&inner.state, Acquire);
if state.is_closed() {
- return Ok(Async::Ready(()));
+ return Poll::Ready(());
}
if state.is_tx_task_set() {
let will_notify = inner
.tx_task
- .with(|ptr| unsafe { (&*ptr).will_notify_current() });
+ .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) });
if !will_notify {
state = State::unset_tx_task(&inner.state);
if state.is_closed() {
- return Ok(Async::Ready(()));
+ return Ready(());
} else {
unsafe { inner.drop_tx_task() };
}
@@ -203,18 +203,18 @@ impl<T> Sender<T> {
if !state.is_tx_task_set() {
// Attempt to set the task
unsafe {
- inner.set_tx_task();
+ inner.set_tx_task(cx);
}
// Update the state
state = State::set_tx_task(&inner.state);
if state.is_closed() {
- return Ok(Async::Ready(()));
+ return Ready(());
}
}
- Ok(Async::NotReady)
+ Pending
}
/// Check if the associated [`Receiver`] handle has been dropped.
@@ -297,25 +297,18 @@ impl<T> Drop for Receiver<T> {
}
impl<T> Future for Receiver<T> {
- type Item = T;
- type Error = RecvError;
-
- fn poll(&mut self) -> Poll<T, RecvError> {
- use futures::Async::{NotReady, Ready};
+ type Output = Result<T, RecvError>;
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// If `inner` is `None`, then `poll()` has already completed.
- let ret = if let Some(inner) = self.inner.as_ref() {
- match inner.poll_recv() {
- Ok(Ready(v)) => Ok(Ready(v)),
- Ok(NotReady) => return Ok(NotReady),
- Err(e) => Err(e),
- }
+ let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
+ ready!(inner.poll_recv(cx))?
} else {
panic!("called after complete");
};
self.inner = None;
- ret
+ Ready(Ok(ret))
}
}
@@ -328,30 +321,29 @@ impl<T> Inner<T> {
}
if prev.is_rx_task_set() {
- self.rx_task.with(|ptr| unsafe { (&*ptr).notify() });
+ // TODO: Consume waker?
+ self.rx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() });
}
true
}
- fn poll_recv(&self) -> Poll<T, RecvError> {
- use futures::Async::{NotReady, Ready};
-
+ fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
// Load the state
let mut state = State::load(&self.state, Acquire);
if state.is_complete() {
match unsafe { self.consume_value() } {
- Some(value) => Ok(Ready(value)),
- None => Err(RecvError(())),
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
}
} else if state.is_closed() {
- Err(RecvError(()))
+ Ready(Err(RecvError(())))
} else {
if state.is_rx_task_set() {
let will_notify = self
.rx_task
- .with(|ptr| unsafe { (&*ptr).will_notify_current() });
+ .with(|ptr| unsafe { (&*ptr).will_wake(cx.waker()) });
// Check if the task is still the same
if !will_notify {
@@ -359,8 +351,8 @@ impl<T> Inner<T> {
state = State::unset_rx_task(&self.state);
if state.is_complete() {
return match unsafe { self.consume_value() } {
- Some(value) => Ok(Ready(value)),
- None => Err(RecvError(())),
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
};
} else {
unsafe { self.drop_rx_task() };
@@ -371,7 +363,7 @@ impl<T> Inner<T> {
if !state.is_rx_task_set() {
// Attempt to set the task
unsafe {
- self.set_rx_task();
+ self.set_rx_task(cx);
}
// Update the state
@@ -379,14 +371,14 @@ impl<T> Inner<T> {
if state.is_complete() {
match unsafe { self.consume_value() } {
- Some(value) => Ok(Ready(value)),
- None => Err(RecvError(())),
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
}
} else {
- return Ok(NotReady);
+ return Pending;
}
} else {
- return Ok(NotReady);
+ return Pending;
}
}
}
@@ -396,7 +388,7 @@ impl<T> Inner<T> {
let prev = State::set_closed(&self.state);
if prev.is_tx_task_set() && !prev.is_complete() {
- self.tx_task.with(|ptr| unsafe { (&*ptr).notify() });
+ self.tx_task.with(|ptr| unsafe { (&*ptr).wake_by_ref() });
}
}
@@ -413,14 +405,14 @@ impl<T> Inner<T> {
self.tx_task.with_mut(|ptr| ManuallyDrop::drop(&mut *ptr))
}
- unsafe fn set_rx_task(&self) {
+ unsafe fn set_rx_task(&self, cx: &mut Context<'_>) {
self.rx_task
- .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current()));
+ .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone()));
}
- unsafe fn set_tx_task(&self) {
+ unsafe fn set_tx_task(&self, cx: &mut Context<'_>) {
self.tx_task
- .with_mut(|ptr| *ptr = ManuallyDrop::new(task::current()));
+ .with_mut(|ptr| *ptr = ManuallyDrop::new(cx.waker().clone()));
}
}