summaryrefslogtreecommitdiffstats
path: root/tokio-sync/src/mpsc/chan.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/src/mpsc/chan.rs')
-rw-r--r--tokio-sync/src/mpsc/chan.rs46
1 files changed, 23 insertions, 23 deletions
diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs
index fae3159a..93e62824 100644
--- a/tokio-sync/src/mpsc/chan.rs
+++ b/tokio-sync/src/mpsc/chan.rs
@@ -1,13 +1,14 @@
use super::list;
use crate::loom::{
- futures::AtomicTask,
+ futures::AtomicWaker,
sync::atomic::AtomicUsize,
sync::{Arc, CausalCell},
};
-use futures::Poll;
use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
/// Channel sender
pub(crate) struct Tx<T, S: Semaphore> {
@@ -61,7 +62,8 @@ pub(crate) trait Semaphore {
fn add_permit(&self);
- fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>;
+ fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit)
+ -> Poll<Result<(), ()>>;
fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
@@ -81,8 +83,8 @@ struct Chan<T, S> {
/// Coordinates access to channel's capacity.
semaphore: S,
- /// Receiver task. Notified when a value is pushed into the channel.
- rx_task: AtomicTask,
+ /// Receiver waker. Notified when a value is pushed into the channel.
+ rx_waker: AtomicWaker,
/// Tracks the number of outstanding sender handles.
///
@@ -101,7 +103,7 @@ where
fmt.debug_struct("Chan")
.field("tx", &self.tx)
.field("semaphore", &self.semaphore)
- .field("rx_task", &self.rx_task)
+ .field("rx_waker", &self.rx_waker)
.field("tx_count", &self.tx_count)
.field("rx_fields", &"...")
.finish()
@@ -138,7 +140,7 @@ where
let chan = Arc::new(Chan {
tx,
semaphore,
- rx_task: AtomicTask::new(),
+ rx_waker: AtomicWaker::new(),
tx_count: AtomicUsize::new(1),
rx_fields: CausalCell::new(RxFields {
list: rx,
@@ -163,8 +165,8 @@ where
}
/// TODO: Docs
- pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> {
- self.inner.semaphore.poll_acquire(&mut self.permit)
+ pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
+ self.inner.semaphore.poll_acquire(cx, &mut self.permit)
}
/// Send a message and notify the receiver.
@@ -177,7 +179,7 @@ where
self.inner.tx.push(value);
// Notify the rx task
- self.inner.rx_task.notify();
+ self.inner.rx_waker.wake();
// Release the permit
self.inner.semaphore.forget(&mut self.permit);
@@ -217,7 +219,7 @@ where
self.inner.tx.close();
// Notify the receiver
- self.inner.rx_task.notify();
+ self.inner.rx_waker.wake();
}
}
@@ -246,9 +248,8 @@ where
}
/// Receive the next value
- pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> {
+ pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read::*;
- use futures::Async::*;
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
@@ -258,7 +259,7 @@ where
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
- return Ok(Ready(Some(value)));
+ return Ready(Some(value));
}
Some(Closed) => {
// TODO: This check may not be required as it most
@@ -268,7 +269,7 @@ where
// which ensures that if dropping the tx handle is
// visible, then all messages sent are also visible.
assert!(self.inner.semaphore.is_idle());
- return Ok(Ready(None));
+ return Ready(None);
}
None => {} // fall through
}
@@ -277,7 +278,7 @@ where
try_recv!();
- self.inner.rx_task.register();
+ self.inner.rx_waker.register_by_ref(cx.waker());
// It is possible that a value was pushed between attempting to read
// and registering the task, so we have to check the channel a
@@ -291,9 +292,9 @@ where
);
if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
- Ok(Ready(None))
+ Ready(None)
} else {
- Ok(NotReady)
+ Pending
}
})
}
@@ -372,8 +373,8 @@ impl Semaphore for (crate::semaphore::Semaphore, usize) {
self.0.available_permits() == self.1
}
- fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> {
- permit.poll_acquire(&self.0).map_err(|_| ())
+ fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> {
+ permit.poll_acquire(cx, &self.0).map_err(|_| ())
}
fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
@@ -415,9 +416,8 @@ impl Semaphore for AtomicUsize {
self.load(Acquire) >> 1 == 0
}
- fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> {
- use futures::Async::Ready;
- self.try_acquire(permit).map(Ready).map_err(|_| ())
+ fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> {
+ Ready(self.try_acquire(permit).map_err(|_| ()))
}
fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {