diff options
Diffstat (limited to 'tokio-sync/src/mpsc/bounded.rs')
-rw-r--r-- | tokio-sync/src/mpsc/bounded.rs | 65 |
1 files changed, 36 insertions, 29 deletions
diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index b2fce168..7f8cb905 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -1,6 +1,10 @@ use super::chan; -use futures::{Poll, Sink, StartSend, Stream}; + use std::fmt; +use std::task::{Context, Poll}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Send values to the associated `Receiver`. /// @@ -127,6 +131,11 @@ impl<T> Receiver<T> { Receiver { chan } } + /// TODO: Dox + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while @@ -136,12 +145,12 @@ impl<T> Receiver<T> { } } -impl<T> Stream for Receiver<T> { +#[cfg(feature = "async-traits")] +impl<T> futures_core::Stream for Receiver<T> { type Item = T; - type Error = RecvError; - fn poll(&mut self) -> Poll<Option<T>, Self::Error> { - self.chan.recv().map_err(|_| RecvError(())) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + Receiver::poll_next(self.get_mut(), cx) } } @@ -165,13 +174,13 @@ impl<T> Sender<T> { /// /// This method returns: /// - /// - `Ok(Async::Ready(_))` if capacity is reserved for a single message. - /// - `Ok(Async::NotReady)` if the channel may not have capacity, in which + /// - `Poll::Ready(Ok(_))` if capacity is reserved for a single message. + /// - `Poll::Pending` if the channel may not have capacity, in which /// case the current task is queued to be notified once /// capacity is available; - /// - `Err(SendError)` if the receiver has been dropped. - pub fn poll_ready(&mut self) -> Poll<(), SendError> { - self.chan.poll_ready().map_err(|_| SendError(())) + /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { + self.chan.poll_ready(cx).map_err(|_| SendError(())) } /// Attempts to send a message on this `Sender`, returning the message @@ -182,31 +191,29 @@ impl<T> Sender<T> { } } -impl<T> Sink for Sender<T> { - type SinkItem = T; - type SinkError = SendError; +#[cfg(feature = "async-traits")] +impl<T> async_sink::Sink<T> for Sender<T> { + type Error = SendError; - fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> { - use futures::Async::*; - use futures::AsyncSink; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Sender::poll_ready(self.get_mut(), cx) + } - match self.poll_ready()? { - Ready(_) => { - self.try_send(msg).map_err(|_| SendError(()))?; - Ok(AsyncSink::Ready) - } - NotReady => Ok(AsyncSink::NotReady(msg)), - } + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.as_mut() + .try_send(msg) + .map_err(|err| { + assert!(err.is_full(), "call `poll_ready` before sending"); + SendError(()) + }) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } } |