summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorAlice Ryhl <alice@ryhl.io>2020-09-08 09:12:32 +0200
committerGitHub <noreply@github.com>2020-09-08 09:12:32 +0200
commit37f405bd3b06921598d298b0ba5b9296656454bf (patch)
tree3098806c15ddae632e5f02706828d060608fea6c /tokio
parent7c254eca446e56bbc41cbc309c2588f2d241f46a (diff)
io: move StreamReader and ReaderStream into tokio_util (#2788)
Co-authored-by: Mikail Bagishov <bagishov.mikail@yandex.ru> Co-authored-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/mod.rs4
-rw-r--r--tokio/src/io/util/mod.rs5
-rw-r--r--tokio/src/io/util/stream_reader.rs180
-rw-r--r--tokio/tests/stream_reader.rs35
4 files changed, 0 insertions, 224 deletions
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index c43f0e83..c4b4d7d3 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -236,10 +236,6 @@ cfg_io_util! {
copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Copy, Empty, Lines, Repeat, Sink, Split, Take,
};
-
- cfg_stream! {
- pub use util::{stream_reader, StreamReader};
- }
}
cfg_not_io_util! {
diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs
index 609ff238..1bd0a3f8 100644
--- a/tokio/src/io/util/mod.rs
+++ b/tokio/src/io/util/mod.rs
@@ -63,11 +63,6 @@ cfg_io_util! {
mod split;
pub use split::Split;
- cfg_stream! {
- mod stream_reader;
- pub use stream_reader::{stream_reader, StreamReader};
- }
-
mod take;
pub use take::Take;
diff --git a/tokio/src/io/util/stream_reader.rs b/tokio/src/io/util/stream_reader.rs
deleted file mode 100644
index 2471197a..00000000
--- a/tokio/src/io/util/stream_reader.rs
+++ /dev/null
@@ -1,180 +0,0 @@
-use crate::io::{AsyncBufRead, AsyncRead, ReadBuf};
-use crate::stream::Stream;
-use bytes::{Buf, BufMut};
-use pin_project_lite::pin_project;
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-pin_project! {
- /// Convert a stream of byte chunks into an [`AsyncRead`].
- ///
- /// This type is usually created using the [`stream_reader`] function.
- ///
- /// [`AsyncRead`]: crate::io::AsyncRead
- /// [`stream_reader`]: crate::io::stream_reader
- #[derive(Debug)]
- #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
- #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
- pub struct StreamReader<S, B> {
- #[pin]
- inner: S,
- chunk: Option<B>,
- }
-}
-
-/// Convert a stream of byte chunks into an [`AsyncRead`](crate::io::AsyncRead).
-///
-/// # Example
-///
-/// ```
-/// use bytes::Bytes;
-/// use tokio::io::{stream_reader, AsyncReadExt};
-/// # #[tokio::main]
-/// # async fn main() -> std::io::Result<()> {
-///
-/// // Create a stream from an iterator.
-/// let stream = tokio::stream::iter(vec![
-/// Ok(Bytes::from_static(&[0, 1, 2, 3])),
-/// Ok(Bytes::from_static(&[4, 5, 6, 7])),
-/// Ok(Bytes::from_static(&[8, 9, 10, 11])),
-/// ]);
-///
-/// // Convert it to an AsyncRead.
-/// let mut read = stream_reader(stream);
-///
-/// // Read five bytes from the stream.
-/// let mut buf = [0; 5];
-/// read.read_exact(&mut buf).await?;
-/// assert_eq!(buf, [0, 1, 2, 3, 4]);
-///
-/// // Read the rest of the current chunk.
-/// assert_eq!(read.read(&mut buf).await?, 3);
-/// assert_eq!(&buf[..3], [5, 6, 7]);
-///
-/// // Read the next chunk.
-/// assert_eq!(read.read(&mut buf).await?, 4);
-/// assert_eq!(&buf[..4], [8, 9, 10, 11]);
-///
-/// // We have now reached the end.
-/// assert_eq!(read.read(&mut buf).await?, 0);
-///
-/// # Ok(())
-/// # }
-/// ```
-#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
-#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
-pub fn stream_reader<S, B>(stream: S) -> StreamReader<S, B>
-where
- S: Stream<Item = Result<B, io::Error>>,
- B: Buf,
-{
- StreamReader::new(stream)
-}
-
-impl<S, B> StreamReader<S, B>
-where
- S: Stream<Item = Result<B, io::Error>>,
- B: Buf,
-{
- /// Convert the provided stream into an `AsyncRead`.
- fn new(stream: S) -> Self {
- Self {
- inner: stream,
- chunk: None,
- }
- }
- /// Do we have a chunk and is it non-empty?
- fn has_chunk(self: Pin<&mut Self>) -> bool {
- if let Some(chunk) = self.project().chunk {
- chunk.remaining() > 0
- } else {
- false
- }
- }
-}
-
-impl<S, B> AsyncRead for StreamReader<S, B>
-where
- S: Stream<Item = Result<B, io::Error>>,
- B: Buf,
-{
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- if buf.remaining() == 0 {
- return Poll::Ready(Ok(()));
- }
-
- let inner_buf = match self.as_mut().poll_fill_buf(cx) {
- Poll::Ready(Ok(buf)) => buf,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
- let len = std::cmp::min(inner_buf.len(), buf.remaining());
- buf.append(&inner_buf[..len]);
-
- self.consume(len);
- Poll::Ready(Ok(()))
- }
- fn poll_read_buf<BM: BufMut>(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut BM,
- ) -> Poll<io::Result<usize>>
- where
- Self: Sized,
- {
- if !buf.has_remaining_mut() {
- return Poll::Ready(Ok(0));
- }
-
- let inner_buf = match self.as_mut().poll_fill_buf(cx) {
- Poll::Ready(Ok(buf)) => buf,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
- let len = std::cmp::min(inner_buf.len(), buf.remaining_mut());
- buf.put_slice(&inner_buf[..len]);
-
- self.consume(len);
- Poll::Ready(Ok(len))
- }
-}
-
-impl<S, B> AsyncBufRead for StreamReader<S, B>
-where
- S: Stream<Item = Result<B, io::Error>>,
- B: Buf,
-{
- fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
- loop {
- if self.as_mut().has_chunk() {
- // This unwrap is very sad, but it can't be avoided.
- let buf = self.project().chunk.as_ref().unwrap().bytes();
- return Poll::Ready(Ok(buf));
- } else {
- match self.as_mut().project().inner.poll_next(cx) {
- Poll::Ready(Some(Ok(chunk))) => {
- // Go around the loop in case the chunk is empty.
- *self.as_mut().project().chunk = Some(chunk);
- }
- Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
- Poll::Ready(None) => return Poll::Ready(Ok(&[])),
- Poll::Pending => return Poll::Pending,
- }
- }
- }
- }
- fn consume(self: Pin<&mut Self>, amt: usize) {
- if amt > 0 {
- self.project()
- .chunk
- .as_mut()
- .expect("No chunk present")
- .advance(amt);
- }
- }
-}
diff --git a/tokio/tests/stream_reader.rs b/tokio/tests/stream_reader.rs
deleted file mode 100644
index 8370df4d..00000000
--- a/tokio/tests/stream_reader.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-#![warn(rust_2018_idioms)]
-#![cfg(feature = "full")]
-
-use bytes::Bytes;
-use tokio::io::{stream_reader, AsyncReadExt};
-use tokio::stream::iter;
-
-#[tokio::test]
-async fn test_stream_reader() -> std::io::Result<()> {
- let stream = iter(vec![
- Ok(Bytes::from_static(&[])),
- Ok(Bytes::from_static(&[0, 1, 2, 3])),
- Ok(Bytes::from_static(&[])),
- Ok(Bytes::from_static(&[4, 5, 6, 7])),
- Ok(Bytes::from_static(&[])),
- Ok(Bytes::from_static(&[8, 9, 10, 11])),
- Ok(Bytes::from_static(&[])),
- ]);
-
- let mut read = stream_reader(stream);
-
- let mut buf = [0; 5];
- read.read_exact(&mut buf).await?;
- assert_eq!(buf, [0, 1, 2, 3, 4]);
-
- assert_eq!(read.read(&mut buf).await?, 3);
- assert_eq!(&buf[..3], [5, 6, 7]);
-
- assert_eq!(read.read(&mut buf).await?, 4);
- assert_eq!(&buf[..4], [8, 9, 10, 11]);
-
- assert_eq!(read.read(&mut buf).await?, 0);
-
- Ok(())
-}