summaryrefslogtreecommitdiffstats
path: root/tokio-sync/src/mpsc/unbounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/src/mpsc/unbounded.rs')
-rw-r--r--tokio-sync/src/mpsc/unbounded.rs44
1 files changed, 26 insertions, 18 deletions
diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs
index 58967c91..960bee41 100644
--- a/tokio-sync/src/mpsc/unbounded.rs
+++ b/tokio-sync/src/mpsc/unbounded.rs
@@ -1,7 +1,11 @@
use super::chan;
use crate::loom::sync::atomic::AtomicUsize;
-use futures::{Poll, Sink, StartSend, Stream};
+
use std::fmt;
+use std::task::{Context, Poll};
+
+#[cfg(feature = "async-traits")]
+use std::pin::Pin;
/// Send values to the associated `UnboundedReceiver`.
///
@@ -83,6 +87,11 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
+ /// TODO: dox
+ pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
@@ -92,12 +101,12 @@ impl<T> UnboundedReceiver<T> {
}
}
-impl<T> Stream for UnboundedReceiver<T> {
+#[cfg(feature = "async-traits")]
+impl<T> futures_core::Stream for UnboundedReceiver<T> {
type Item = T;
- type Error = UnboundedRecvError;
- fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
- self.chan.recv().map_err(|_| UnboundedRecvError(()))
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
}
}
@@ -113,25 +122,24 @@ impl<T> UnboundedSender<T> {
}
}
-impl<T> Sink for UnboundedSender<T> {
- type SinkItem = T;
- type SinkError = UnboundedSendError;
+#[cfg(feature = "async-traits")]
+impl<T> async_sink::Sink<T> for UnboundedSender<T> {
+ type Error = UnboundedSendError;
- fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
- use futures::AsyncSink;
+ fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
- self.try_send(msg).map_err(|_| UnboundedSendError(()))?;
- Ok(AsyncSink::Ready)
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.try_send(msg).map_err(|_| UnboundedSendError(()))
}
- fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
- use futures::Async::Ready;
- Ok(Ready(()))
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
- fn close(&mut self) -> Poll<(), Self::SinkError> {
- use futures::Async::Ready;
- Ok(Ready(()))
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
}