From 423ecc187a1c68cfd3d7288ce19f4c9a780e2060 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 19 Oct 2020 11:15:25 +0300 Subject: io: add copy_buf (#2884) --- tokio/src/io/mod.rs | 3 +- tokio/src/io/util/copy_buf.rs | 102 ++++++++++++++++++++++++++++++++++++++++++ tokio/src/io/util/mod.rs | 3 ++ 3 files changed, 106 insertions(+), 2 deletions(-) create mode 100644 tokio/src/io/util/copy_buf.rs diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 62728ac1..9191bbcd 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -232,10 +232,9 @@ cfg_io_util! { pub use split::{split, ReadHalf, WriteHalf}; pub(crate) mod seek; - pub(crate) mod util; pub use util::{ - copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, + copy, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, }; } diff --git a/tokio/src/io/util/copy_buf.rs b/tokio/src/io/util/copy_buf.rs new file mode 100644 index 00000000..6831580b --- /dev/null +++ b/tokio/src/io/util/copy_buf.rs @@ -0,0 +1,102 @@ +use crate::io::{AsyncBufRead, AsyncWrite}; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +cfg_io_util! { + /// A future that asynchronously copies the entire contents of a reader into a + /// writer. + /// + /// This struct is generally created by calling [`copy_buf`][copy_buf]. Please + /// see the documentation of `copy_buf()` for more details. + /// + /// [copy_buf]: copy_buf() + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct CopyBuf<'a, R: ?Sized, W: ?Sized> { + reader: &'a mut R, + writer: &'a mut W, + amt: u64, + } + + /// Asynchronously copies the entire contents of a reader into a writer. + /// + /// This function returns a future that will continuously read data from + /// `reader` and then write it into `writer` in a streaming fashion until + /// `reader` returns EOF. + /// + /// On success, the total number of bytes that were copied from `reader` to + /// `writer` is returned. + /// + /// + /// # Errors + /// + /// The returned future will finish with an error will return an error + /// immediately if any call to `poll_fill_buf` or `poll_write` returns an + /// error. + /// + /// # Examples + /// + /// ``` + /// use tokio::io; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut reader: &[u8] = b"hello"; + /// let mut writer: Vec = vec![]; + /// + /// io::copy_buf(&mut reader, &mut writer).await?; + /// + /// assert_eq!(b"hello", &writer[..]); + /// # Ok(()) + /// # } + /// ``` + pub async fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result + where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, + { + CopyBuf { + reader, + writer, + amt: 0, + }.await + } +} + +impl Future for CopyBuf<'_, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let me = &mut *self; + let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(Pin::new(&mut self.writer).poll_flush(cx))?; + return Poll::Ready(Ok(self.amt)); + } + + let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); + } + self.amt += i as u64; + Pin::new(&mut *self.reader).consume(i); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assert_unpin() { + use std::marker::PhantomPinned; + crate::is_unpin::>(); + } +} diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 36cadb18..c945be0d 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -27,6 +27,9 @@ cfg_io_util! { mod copy; pub use copy::copy; + mod copy_buf; + pub use copy_buf::copy_buf; + mod empty; pub use empty::{empty, Empty}; -- cgit v1.2.3