diff options
Diffstat (limited to 'tokio-sync/src/mpsc/chan.rs')
-rw-r--r-- | tokio-sync/src/mpsc/chan.rs | 46 |
1 files changed, 23 insertions, 23 deletions
diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs index fae3159a..93e62824 100644 --- a/tokio-sync/src/mpsc/chan.rs +++ b/tokio-sync/src/mpsc/chan.rs @@ -1,13 +1,14 @@ use super::list; use crate::loom::{ - futures::AtomicTask, + futures::AtomicWaker, sync::atomic::AtomicUsize, sync::{Arc, CausalCell}, }; -use futures::Poll; use std::fmt; use std::process; use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; /// Channel sender pub(crate) struct Tx<T, S: Semaphore> { @@ -61,7 +62,8 @@ pub(crate) trait Semaphore { fn add_permit(&self); - fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>; + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit) + -> Poll<Result<(), ()>>; fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; @@ -81,8 +83,8 @@ struct Chan<T, S> { /// Coordinates access to channel's capacity. semaphore: S, - /// Receiver task. Notified when a value is pushed into the channel. - rx_task: AtomicTask, + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: AtomicWaker, /// Tracks the number of outstanding sender handles. /// @@ -101,7 +103,7 @@ where fmt.debug_struct("Chan") .field("tx", &self.tx) .field("semaphore", &self.semaphore) - .field("rx_task", &self.rx_task) + .field("rx_waker", &self.rx_waker) .field("tx_count", &self.tx_count) .field("rx_fields", &"...") .finish() @@ -138,7 +140,7 @@ where let chan = Arc::new(Chan { tx, semaphore, - rx_task: AtomicTask::new(), + rx_waker: AtomicWaker::new(), tx_count: AtomicUsize::new(1), rx_fields: CausalCell::new(RxFields { list: rx, @@ -163,8 +165,8 @@ where } /// TODO: Docs - pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> { - self.inner.semaphore.poll_acquire(&mut self.permit) + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> { + self.inner.semaphore.poll_acquire(cx, &mut self.permit) } /// Send a message and notify the receiver. @@ -177,7 +179,7 @@ where self.inner.tx.push(value); // Notify the rx task - self.inner.rx_task.notify(); + self.inner.rx_waker.wake(); // Release the permit self.inner.semaphore.forget(&mut self.permit); @@ -217,7 +219,7 @@ where self.inner.tx.close(); // Notify the receiver - self.inner.rx_task.notify(); + self.inner.rx_waker.wake(); } } @@ -246,9 +248,8 @@ where } /// Receive the next value - pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> { + pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { use super::block::Read::*; - use futures::Async::*; self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -258,7 +259,7 @@ where match rx_fields.list.pop(&self.inner.tx) { Some(Value(value)) => { self.inner.semaphore.add_permit(); - return Ok(Ready(Some(value))); + return Ready(Some(value)); } Some(Closed) => { // TODO: This check may not be required as it most @@ -268,7 +269,7 @@ where // which ensures that if dropping the tx handle is // visible, then all messages sent are also visible. assert!(self.inner.semaphore.is_idle()); - return Ok(Ready(None)); + return Ready(None); } None => {} // fall through } @@ -277,7 +278,7 @@ where try_recv!(); - self.inner.rx_task.register(); + self.inner.rx_waker.register_by_ref(cx.waker()); // It is possible that a value was pushed between attempting to read // and registering the task, so we have to check the channel a @@ -291,9 +292,9 @@ where ); if rx_fields.rx_closed && self.inner.semaphore.is_idle() { - Ok(Ready(None)) + Ready(None) } else { - Ok(NotReady) + Pending } }) } @@ -372,8 +373,8 @@ impl Semaphore for (crate::semaphore::Semaphore, usize) { self.0.available_permits() == self.1 } - fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> { - permit.poll_acquire(&self.0).map_err(|_| ()) + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> { + permit.poll_acquire(cx, &self.0).map_err(|_| ()) } fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> { @@ -415,9 +416,8 @@ impl Semaphore for AtomicUsize { self.load(Acquire) >> 1 == 0 } - fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> { - use futures::Async::Ready; - self.try_acquire(permit).map(Ready).map_err(|_| ()) + fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> { + Ready(self.try_acquire(permit).map_err(|_| ())) } fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> { |