diff options
Diffstat (limited to 'tokio/src/io/io/copy.rs')
-rw-r--r-- | tokio/src/io/io/copy.rs | 134 |
1 files changed, 134 insertions, 0 deletions
diff --git a/tokio/src/io/io/copy.rs b/tokio/src/io/io/copy.rs new file mode 100644 index 00000000..88c87630 --- /dev/null +++ b/tokio/src/io/io/copy.rs @@ -0,0 +1,134 @@ +use crate::io::{AsyncRead, AsyncWrite}; + +use futures_core::ready; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that asynchronously copies the entire contents of a reader into a +/// writer. +/// +/// This struct is generally created by calling [`copy`][copy]. Please +/// see the documentation of `copy()` for more details. +/// +/// [copy]: fn.copy.html +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Copy<'a, R: ?Sized, W: ?Sized> { + reader: &'a mut R, + read_done: bool, + writer: &'a mut W, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Asynchronously copies the entire contents of a reader into a writer. +/// +/// This function returns a future that will continuously read data from +/// `reader` and then write it into `writer` in a streaming fashion until +/// `reader` returns EOF. +/// +/// On success, the total number of bytes that were copied from +/// `reader` to `writer` is returned. +/// +/// This is an asynchronous version of [`std::io::copy`][std]. +/// +/// # Errors +/// +/// The returned future will finish with an error will return an error +/// immediately if any call to `poll_read` or `poll_write` returns an error. +/// +/// # Examples +/// +/// ``` +/// use tokio::io; +/// +/// # async fn dox() -> std::io::Result<()> { +/// let mut reader: &[u8] = b"hello"; +/// let mut writer: Vec<u8> = vec![]; +/// +/// io::copy(&mut reader, &mut writer).await?; +/// +/// assert_eq!(&b"hello"[..], &writer[..]); +/// # Ok(()) +/// # } +/// ``` +/// +/// [std]: https://doc.rust-lang.org/std/io/fn.copy.html +pub fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Copy<'a, R, W> +where + R: AsyncRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + Copy { + reader, + read_done: false, + writer, + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl<R, W> Future for Copy<'_, R, W> +where + R: AsyncRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = io::Result<u64>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let me = &mut *self; + let n = ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?; + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let me = &mut *self; + let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, &me.buf[me.pos..me.cap]))?; + if i == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "write zero byte into writer", + ))); + } else { + self.pos += i; + self.amt += i as u64; + } + } + + // If we've written all the data and we've seen EOF, flush out the + // data and finish the transfer. + if self.pos == self.cap && self.read_done { + let me = &mut *self; + ready!(Pin::new(&mut *me.writer).poll_flush(cx))?; + return Poll::Ready(Ok(self.amt)); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assert_unpin() { + use std::marker::PhantomPinned; + crate::is_unpin::<Copy<'_, PhantomPinned, PhantomPinned>>(); + } +} |