diff options
author | Zahari Dichev <zaharidichev@gmail.com> | 2020-10-19 11:15:25 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-19 10:15:25 +0200 |
commit | 423ecc187a1c68cfd3d7288ce19f4c9a780e2060 (patch) | |
tree | 138e0fb9ab0279faa1f24ee7dbe9b2fefa2a39e1 | |
parent | fb28caa90c8ab4453b259424c88dc7ec8ff06bbb (diff) |
io: add copy_buf (#2884)
-rw-r--r-- | tokio/src/io/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/io/util/copy_buf.rs | 102 | ||||
-rw-r--r-- | tokio/src/io/util/mod.rs | 3 |
3 files changed, 106 insertions, 2 deletions
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 62728ac1..9191bbcd 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -232,10 +232,9 @@ cfg_io_util! { pub use split::{split, ReadHalf, WriteHalf}; pub(crate) mod seek; - pub(crate) mod util; pub use util::{ - copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, + copy, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, }; } diff --git a/tokio/src/io/util/copy_buf.rs b/tokio/src/io/util/copy_buf.rs new file mode 100644 index 00000000..6831580b --- /dev/null +++ b/tokio/src/io/util/copy_buf.rs @@ -0,0 +1,102 @@ +use crate::io::{AsyncBufRead, AsyncWrite}; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +cfg_io_util! { + /// A future that asynchronously copies the entire contents of a reader into a + /// writer. + /// + /// This struct is generally created by calling [`copy_buf`][copy_buf]. Please + /// see the documentation of `copy_buf()` for more details. + /// + /// [copy_buf]: copy_buf() + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct CopyBuf<'a, R: ?Sized, W: ?Sized> { + reader: &'a mut R, + writer: &'a mut W, + amt: u64, + } + + /// Asynchronously copies the entire contents of a reader into a writer. + /// + /// This function returns a future that will continuously read data from + /// `reader` and then write it into `writer` in a streaming fashion until + /// `reader` returns EOF. + /// + /// On success, the total number of bytes that were copied from `reader` to + /// `writer` is returned. + /// + /// + /// # Errors + /// + /// The returned future will finish with an error will return an error + /// immediately if any call to `poll_fill_buf` or `poll_write` returns an + /// error. + /// + /// # Examples + /// + /// ``` + /// use tokio::io; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut reader: &[u8] = b"hello"; + /// let mut writer: Vec<u8> = vec![]; + /// + /// io::copy_buf(&mut reader, &mut writer).await?; + /// + /// assert_eq!(b"hello", &writer[..]); + /// # Ok(()) + /// # } + /// ``` + pub async fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64> + where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, + { + CopyBuf { + reader, + writer, + amt: 0, + }.await + } +} + +impl<R, W> Future for CopyBuf<'_, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = io::Result<u64>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + loop { + let me = &mut *self; + let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(Pin::new(&mut self.writer).poll_flush(cx))?; + return Poll::Ready(Ok(self.amt)); + } + + let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); + } + self.amt += i as u64; + Pin::new(&mut *self.reader).consume(i); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assert_unpin() { + use std::marker::PhantomPinned; + crate::is_unpin::<CopyBuf<'_, PhantomPinned, PhantomPinned>>(); + } +} diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 36cadb18..c945be0d 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -27,6 +27,9 @@ cfg_io_util! { mod copy; pub use copy::copy; + mod copy_buf; + pub use copy_buf::copy_buf; + mod empty; pub use empty::{empty, Empty}; |