summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZahari Dichev <zaharidichev@gmail.com>2020-10-19 11:15:25 +0300
committerGitHub <noreply@github.com>2020-10-19 10:15:25 +0200
commit423ecc187a1c68cfd3d7288ce19f4c9a780e2060 (patch)
tree138e0fb9ab0279faa1f24ee7dbe9b2fefa2a39e1
parentfb28caa90c8ab4453b259424c88dc7ec8ff06bbb (diff)
io: add copy_buf (#2884)
-rw-r--r--tokio/src/io/mod.rs3
-rw-r--r--tokio/src/io/util/copy_buf.rs102
-rw-r--r--tokio/src/io/util/mod.rs3
3 files changed, 106 insertions, 2 deletions
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 62728ac1..9191bbcd 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -232,10 +232,9 @@ cfg_io_util! {
pub use split::{split, ReadHalf, WriteHalf};
pub(crate) mod seek;
-
pub(crate) mod util;
pub use util::{
- copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
+ copy, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take,
};
}
diff --git a/tokio/src/io/util/copy_buf.rs b/tokio/src/io/util/copy_buf.rs
new file mode 100644
index 00000000..6831580b
--- /dev/null
+++ b/tokio/src/io/util/copy_buf.rs
@@ -0,0 +1,102 @@
+use crate::io::{AsyncBufRead, AsyncWrite};
+use std::future::Future;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+cfg_io_util! {
+ /// A future that asynchronously copies the entire contents of a reader into a
+ /// writer.
+ ///
+ /// This struct is generally created by calling [`copy_buf`][copy_buf]. Please
+ /// see the documentation of `copy_buf()` for more details.
+ ///
+ /// [copy_buf]: copy_buf()
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ struct CopyBuf<'a, R: ?Sized, W: ?Sized> {
+ reader: &'a mut R,
+ writer: &'a mut W,
+ amt: u64,
+ }
+
+ /// Asynchronously copies the entire contents of a reader into a writer.
+ ///
+ /// This function returns a future that will continuously read data from
+ /// `reader` and then write it into `writer` in a streaming fashion until
+ /// `reader` returns EOF.
+ ///
+ /// On success, the total number of bytes that were copied from `reader` to
+ /// `writer` is returned.
+ ///
+ ///
+ /// # Errors
+ ///
+ /// The returned future will finish with an error will return an error
+ /// immediately if any call to `poll_fill_buf` or `poll_write` returns an
+ /// error.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut reader: &[u8] = b"hello";
+ /// let mut writer: Vec<u8> = vec![];
+ ///
+ /// io::copy_buf(&mut reader, &mut writer).await?;
+ ///
+ /// assert_eq!(b"hello", &writer[..]);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
+ where
+ R: AsyncBufRead + Unpin + ?Sized,
+ W: AsyncWrite + Unpin + ?Sized,
+ {
+ CopyBuf {
+ reader,
+ writer,
+ amt: 0,
+ }.await
+ }
+}
+
+impl<R, W> Future for CopyBuf<'_, R, W>
+where
+ R: AsyncBufRead + Unpin + ?Sized,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ type Output = io::Result<u64>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ loop {
+ let me = &mut *self;
+ let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?;
+ if buffer.is_empty() {
+ ready!(Pin::new(&mut self.writer).poll_flush(cx))?;
+ return Poll::Ready(Ok(self.amt));
+ }
+
+ let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?;
+ if i == 0 {
+ return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
+ }
+ self.amt += i as u64;
+ Pin::new(&mut *self.reader).consume(i);
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn assert_unpin() {
+ use std::marker::PhantomPinned;
+ crate::is_unpin::<CopyBuf<'_, PhantomPinned, PhantomPinned>>();
+ }
+}
diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs
index 36cadb18..c945be0d 100644
--- a/tokio/src/io/util/mod.rs
+++ b/tokio/src/io/util/mod.rs
@@ -27,6 +27,9 @@ cfg_io_util! {
mod copy;
pub use copy::copy;
+ mod copy_buf;
+ pub use copy_buf::copy_buf;
+
mod empty;
pub use empty::{empty, Empty};