summaryrefslogtreecommitdiffstats
path: root/tokio-sync/src/mpsc/bounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/src/mpsc/bounded.rs')
-rw-r--r--tokio-sync/src/mpsc/bounded.rs65
1 files changed, 36 insertions, 29 deletions
diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs
index b2fce168..7f8cb905 100644
--- a/tokio-sync/src/mpsc/bounded.rs
+++ b/tokio-sync/src/mpsc/bounded.rs
@@ -1,6 +1,10 @@
use super::chan;
-use futures::{Poll, Sink, StartSend, Stream};
+
use std::fmt;
+use std::task::{Context, Poll};
+
+#[cfg(feature = "async-traits")]
+use std::pin::Pin;
/// Send values to the associated `Receiver`.
///
@@ -127,6 +131,11 @@ impl<T> Receiver<T> {
Receiver { chan }
}
+ /// TODO: Dox
+ pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.chan.recv(cx)
+ }
+
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
@@ -136,12 +145,12 @@ impl<T> Receiver<T> {
}
}
-impl<T> Stream for Receiver<T> {
+#[cfg(feature = "async-traits")]
+impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
- type Error = RecvError;
- fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
- self.chan.recv().map_err(|_| RecvError(()))
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ Receiver::poll_next(self.get_mut(), cx)
}
}
@@ -165,13 +174,13 @@ impl<T> Sender<T> {
///
/// This method returns:
///
- /// - `Ok(Async::Ready(_))` if capacity is reserved for a single message.
- /// - `Ok(Async::NotReady)` if the channel may not have capacity, in which
+ /// - `Poll::Ready(Ok(_))` if capacity is reserved for a single message.
+ /// - `Poll::Pending` if the channel may not have capacity, in which
/// case the current task is queued to be notified once
/// capacity is available;
- /// - `Err(SendError)` if the receiver has been dropped.
- pub fn poll_ready(&mut self) -> Poll<(), SendError> {
- self.chan.poll_ready().map_err(|_| SendError(()))
+ /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
+ self.chan.poll_ready(cx).map_err(|_| SendError(()))
}
/// Attempts to send a message on this `Sender`, returning the message
@@ -182,31 +191,29 @@ impl<T> Sender<T> {
}
}
-impl<T> Sink for Sender<T> {
- type SinkItem = T;
- type SinkError = SendError;
+#[cfg(feature = "async-traits")]
+impl<T> async_sink::Sink<T> for Sender<T> {
+ type Error = SendError;
- fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
- use futures::Async::*;
- use futures::AsyncSink;
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Sender::poll_ready(self.get_mut(), cx)
+ }
- match self.poll_ready()? {
- Ready(_) => {
- self.try_send(msg).map_err(|_| SendError(()))?;
- Ok(AsyncSink::Ready)
- }
- NotReady => Ok(AsyncSink::NotReady(msg)),
- }
+ fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+ self.as_mut()
+ .try_send(msg)
+ .map_err(|err| {
+ assert!(err.is_full(), "call `poll_ready` before sending");
+ SendError(())
+ })
}
- fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
- use futures::Async::Ready;
- Ok(Ready(()))
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
- fn close(&mut self) -> Poll<(), Self::SinkError> {
- use futures::Async::Ready;
- Ok(Ready(()))
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
}