diff options
Diffstat (limited to 'tokio-sync/src/mpsc/unbounded.rs')
-rw-r--r-- | tokio-sync/src/mpsc/unbounded.rs | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index 58967c91..960bee41 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -1,7 +1,11 @@ use super::chan; use crate::loom::sync::atomic::AtomicUsize; -use futures::{Poll, Sink, StartSend, Stream}; + use std::fmt; +use std::task::{Context, Poll}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Send values to the associated `UnboundedReceiver`. /// @@ -83,6 +87,11 @@ impl<T> UnboundedReceiver<T> { UnboundedReceiver { chan } } + /// TODO: dox + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while @@ -92,12 +101,12 @@ impl<T> UnboundedReceiver<T> { } } -impl<T> Stream for UnboundedReceiver<T> { +#[cfg(feature = "async-traits")] +impl<T> futures_core::Stream for UnboundedReceiver<T> { type Item = T; - type Error = UnboundedRecvError; - fn poll(&mut self) -> Poll<Option<T>, Self::Error> { - self.chan.recv().map_err(|_| UnboundedRecvError(())) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.chan.recv(cx) } } @@ -113,25 +122,24 @@ impl<T> UnboundedSender<T> { } } -impl<T> Sink for UnboundedSender<T> { - type SinkItem = T; - type SinkError = UnboundedSendError; +#[cfg(feature = "async-traits")] +impl<T> async_sink::Sink<T> for UnboundedSender<T> { + type Error = UnboundedSendError; - fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> { - use futures::AsyncSink; + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } - self.try_send(msg).map_err(|_| UnboundedSendError(()))?; - Ok(AsyncSink::Ready) + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.try_send(msg).map_err(|_| UnboundedSendError(())) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } } |