summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-06-26 14:42:19 -0700
committerGitHub <noreply@github.com>2019-06-26 14:42:19 -0700
commit11f6b2862fc458204aabbed9a6f919c65215aeb5 (patch)
tree544c19e0b13a21f3eeb66e2eff910f37511c2b71 /tokio
parent8404f796ac99504ac9fbbce898e78bb02f05804a (diff)
tokio: move I/O helpers to ext traits (#1204)
Refs: #1203
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/async_read_ext.rs64
-rw-r--r--tokio/src/io/async_write_ext.rs24
-rw-r--r--tokio/src/io/copy.rs38
-rw-r--r--tokio/src/io/mod.rs21
-rw-r--r--tokio/src/io/read.rs2
-rw-r--r--tokio/src/io/read_exact.rs12
-rw-r--r--tokio/src/io/write.rs2
-rw-r--r--tokio/src/lib.rs9
-rw-r--r--tokio/tests/io.rs158
-rw-r--r--tokio/tests/io_copy.rs71
-rw-r--r--tokio/tests/io_read.rs41
-rw-r--r--tokio/tests/io_read_exact.rs46
-rw-r--r--tokio/tests/io_write.rs48
13 files changed, 322 insertions, 214 deletions
diff --git a/tokio/src/io/async_read_ext.rs b/tokio/src/io/async_read_ext.rs
new file mode 100644
index 00000000..cca0906a
--- /dev/null
+++ b/tokio/src/io/async_read_ext.rs
@@ -0,0 +1,64 @@
+use crate::io::copy::{copy, Copy};
+use crate::io::read::{read, Read};
+use crate::io::read_exact::{read_exact, ReadExact};
+
+use tokio_io::{AsyncRead, AsyncWrite};
+
+/// An extension trait which adds utility methods to `AsyncRead` types.
+pub trait AsyncReadExt: AsyncRead {
+
+ /// Copy all data from `self` into the provided `AsyncWrite`.
+ ///
+ /// The returned future will copy all the bytes read from `reader` into the
+ /// `writer` specified. This future will only complete once the `reader`
+ /// has hit EOF and all bytes have been written to and flushed from the
+ /// `writer` provided.
+ ///
+ /// On success the number of bytes is returned and the `reader` and `writer`
+ /// are consumed. On error the error is returned and the I/O objects are
+ /// consumed as well.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// unimplemented!();
+ /// ```
+ fn copy<'a, W>(&'a mut self, dst: &'a mut W) -> Copy<'a, Self, W>
+ where
+ Self: Unpin,
+ W: AsyncWrite + Unpin + ?Sized,
+ {
+ copy(self, dst)
+ }
+
+ /// Read data into the provided buffer.
+ ///
+ /// The returned future will resolve to the number of bytes read once the
+ /// read operation is completed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// unimplemented!();
+ /// ```
+ fn read<'a>(&'a mut self, dst: &'a mut [u8]) -> Read<'a, Self>
+ where Self: Unpin,
+ {
+ read(self, dst)
+ }
+
+ /// Read exactly the amount of data needed to fill the provided buffer.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// unimplemented!();
+ /// ```
+ fn read_exact<'a>(&'a mut self, dst: &'a mut [u8]) -> ReadExact<'a, Self>
+ where Self: Unpin,
+ {
+ read_exact(self, dst)
+ }
+}
+
+impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
diff --git a/tokio/src/io/async_write_ext.rs b/tokio/src/io/async_write_ext.rs
new file mode 100644
index 00000000..759dac99
--- /dev/null
+++ b/tokio/src/io/async_write_ext.rs
@@ -0,0 +1,24 @@
+use crate::io::write::{write, Write};
+
+use tokio_io::AsyncWrite;
+
+/// An extension trait which adds utility methods to `AsyncWrite` types.
+pub trait AsyncWriteExt: AsyncWrite {
+ /// Write the provided data into `self`.
+ ///
+ /// The returned future will resolve to the number of bytes written once the
+ /// write operation is completed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// unimplemented!();
+ /// ````
+ fn write<'a>(&'a mut self, src: &'a [u8]) -> Write<'a, Self>
+ where Self: Unpin,
+ {
+ write(self, src)
+ }
+}
+
+impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
diff --git a/tokio/src/io/copy.rs b/tokio/src/io/copy.rs
index b407c193..cc8e71dd 100644
--- a/tokio/src/io/copy.rs
+++ b/tokio/src/io/copy.rs
@@ -4,23 +4,8 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
-macro_rules! ready {
- ($e:expr) => {
- match $e {
- ::std::task::Poll::Ready(t) => t,
- ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
- }
- };
-}
-
-/// A future which will copy all data from a reader into a writer.
-///
-/// Created by the [`copy`] function, this future will resolve to the number of
-/// bytes copied or an error if one happens.
-///
-/// [`copy`]: fn.copy.html
#[derive(Debug)]
-pub struct Copy<'a, R, W> {
+pub struct Copy<'a, R: ?Sized, W: ?Sized> {
reader: &'a mut R,
read_done: bool,
writer: &'a mut W,
@@ -30,21 +15,10 @@ pub struct Copy<'a, R, W> {
buf: Box<[u8]>,
}
-/// Creates a future which represents copying all the bytes from one object to
-/// another.
-///
-/// The returned future will copy all the bytes read from `reader` into the
-/// `writer` specified. This future will only complete once the `reader` has hit
-/// EOF and all bytes have been written to and flushed from the `writer`
-/// provided.
-///
-/// On success the number of bytes is returned and the `reader` and `writer` are
-/// consumed. On error the error is returned and the I/O objects are consumed as
-/// well.
-pub fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Copy<'a, R, W>
+pub(crate) fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Copy<'a, R, W>
where
- R: AsyncRead + Unpin,
- W: AsyncWrite + Unpin,
+ R: AsyncRead + Unpin + ?Sized,
+ W: AsyncWrite + Unpin + ?Sized,
{
Copy {
reader,
@@ -59,8 +33,8 @@ where
impl<'a, R, W> Future for Copy<'a, R, W>
where
- R: AsyncRead + Unpin,
- W: AsyncWrite + Unpin,
+ R: AsyncRead + Unpin + ?Sized,
+ W: AsyncWrite + Unpin + ?Sized,
{
type Output = io::Result<u64>;
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 9b3d854f..20730811 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -36,22 +36,21 @@
//! [`ErrorKind`]: enum.ErrorKind.html
//! [`Result`]: type.Result.html
-pub use tokio_io::{AsyncRead, AsyncWrite};
+mod async_read_ext;
+mod async_write_ext;
+mod copy;
+mod read;
+mod write;
+mod read_exact;
+
+pub use self::async_read_ext::AsyncReadExt;
+pub use self::async_write_ext::AsyncWriteExt;
// standard input, output, and error
#[cfg(feature = "fs")]
pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout};
+pub use tokio_io::{AsyncRead, AsyncWrite};
// Re-export io::Error so that users don't have to deal
// with conflicts when `use`ing `futures::io` and `std::io`.
pub use std::io::{Error, ErrorKind, Result};
-
-mod copy;
-mod read;
-mod write;
-mod read_exact;
-
-pub use self::copy::{copy, Copy};
-pub use self::read::{read, Read};
-pub use self::write::{write, Write};
-pub use self::read_exact::{read_exact, ReadExact};
diff --git a/tokio/src/io/read.rs b/tokio/src/io/read.rs
index 57137571..320c5edd 100644
--- a/tokio/src/io/read.rs
+++ b/tokio/src/io/read.rs
@@ -10,7 +10,7 @@ use tokio_io::AsyncRead;
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
-pub fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> Read<'a, R>
+pub(crate) fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> Read<'a, R>
where
R: AsyncRead + Unpin + ?Sized,
{
diff --git a/tokio/src/io/read_exact.rs b/tokio/src/io/read_exact.rs
index e8a66a47..c8b2f884 100644
--- a/tokio/src/io/read_exact.rs
+++ b/tokio/src/io/read_exact.rs
@@ -5,23 +5,13 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::AsyncRead;
-macro_rules! ready {
- ($e:expr) => {
- match $e {
- ::std::task::Poll::Ready(t) => t,
- ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
- }
- };
-}
-
-
/// A future which can be used to easily read exactly enough bytes to fill
/// a buffer.
///
/// Created by the [`read_exact`] function.
///
/// [`read_exact`]: fn.read_exact.html
-pub fn read_exact<'a, A>(reader: &'a mut A, buf: &'a mut[u8]) -> ReadExact<'a, A>
+pub(crate) fn read_exact<'a, A>(reader: &'a mut A, buf: &'a mut[u8]) -> ReadExact<'a, A>
where
A: AsyncRead + Unpin + ?Sized
{
diff --git a/tokio/src/io/write.rs b/tokio/src/io/write.rs
index 424f478d..2ccc61b5 100644
--- a/tokio/src/io/write.rs
+++ b/tokio/src/io/write.rs
@@ -14,7 +14,7 @@ pub struct Write<'a, W: ?Sized> {
/// Tries to write some bytes from the given `buf` to the writer in an
/// asynchronous manner, returning a future.
-pub fn write<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> Write<'a, W>
+pub(crate) fn write<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> Write<'a, W>
where
W: AsyncWrite + Unpin + ?Sized,
{
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 89f05133..d554ab00 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -74,6 +74,15 @@ macro_rules! if_runtime {
)*)
}
+macro_rules! ready {
+ ($e:expr) => {
+ match $e {
+ ::std::task::Poll::Ready(t) => t,
+ ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
+ }
+ };
+}
+
#[cfg(feature = "timer")]
pub mod clock;
#[cfg(feature = "codec")]
diff --git a/tokio/tests/io.rs b/tokio/tests/io.rs
index 5bf52d60..8224c598 100644
--- a/tokio/tests/io.rs
+++ b/tokio/tests/io.rs
@@ -8,162 +8,4 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_test::assert_ready_ok;
use tokio_test::task::MockTask;
-#[test]
-fn write() {
- struct Wr(BytesMut);
- impl AsyncWrite for Wr {
- fn poll_write(
- mut self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- self.0.extend(buf);
- Ok(buf.len()).into()
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Ok(()).into()
- }
-
- fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Ok(()).into()
- }
- }
-
- let mut task = MockTask::new();
-
- task.enter(|cx| {
- let mut wr = Wr(BytesMut::with_capacity(64));
-
- let write = tokio::io::write(&mut wr, "hello world".as_bytes());
- pin_mut!(write);
-
- let n = assert_ready_ok!(write.poll(cx));
- assert_eq!(n, 11);
- });
-}
-
-#[test]
-fn read() {
- struct Rd;
-
- impl AsyncRead for Rd {
- fn poll_read(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- buf[0..11].copy_from_slice(b"hello world");
- Poll::Ready(Ok(11))
- }
- }
-
- let mut buf = Box::new([0; 11]);
- let mut task = MockTask::new();
-
- task.enter(|cx| {
- let mut rd = Rd;
-
- let read = tokio::io::read(&mut rd, &mut buf[..]);
- pin_mut!(read);
-
- let n = assert_ready_ok!(read.poll(cx));
- assert_eq!(n, 11);
- assert_eq!(buf[..], b"hello world"[..]);
- });
-}
-
-#[test]
-fn copy() {
- struct Rd(bool);
-
- impl AsyncRead for Rd {
- fn poll_read(
- mut self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- if self.0 {
- buf[0..11].copy_from_slice(b"hello world");
- self.0 = false;
- Poll::Ready(Ok(11))
- } else {
- Poll::Ready(Ok(0))
- }
- }
- }
-
- struct Wr(BytesMut);
-
- impl Unpin for Wr {}
- impl AsyncWrite for Wr {
- fn poll_write(
- mut self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- self.0.extend(buf);
- Ok(buf.len()).into()
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Ok(()).into()
- }
-
- fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Ok(()).into()
- }
- }
-
- let buf = BytesMut::with_capacity(64);
- let mut task = MockTask::new();
-
- task.enter(|cx| {
- let mut rd = Rd(true);
- let mut wr = Wr(buf);
-
- let copy = tokio::io::copy(&mut rd, &mut wr);
- pin_mut!(copy);
-
- let n = assert_ready_ok!(copy.poll(cx));
-
- assert_eq!(n, 11);
- assert_eq!(wr.0[..], b"hello world"[..]);
- });
-}
-
-#[test]
-fn read_exact() {
- struct Rd {
- val: &'static [u8; 11],
- }
-
- impl AsyncRead for Rd {
- fn poll_read(
- mut self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &mut [u8]
- ) -> Poll<io::Result<usize>> {
- let me = &mut *self;
- let len = buf.len();
-
- buf[..].copy_from_slice(&me.val[..len]);
- Poll::Ready(Ok(buf.len()))
- }
- }
-
- let mut buf = Box::new([0; 8]);
- let mut task = MockTask::new();
-
- task.enter(|cx| {
- let mut rd = Rd { val: b"hello world" };
-
- let read = tokio::io::read_exact(&mut rd, &mut buf[..]);
- pin_mut!(read);
-
- let n = assert_ready_ok!(read.poll(cx));
- assert_eq!(n, 8);
- assert_eq!(buf[..], b"hello wo"[..]);
- });
-}
diff --git a/tokio/tests/io_copy.rs b/tokio/tests/io_copy.rs
new file mode 100644
index 00000000..ee972681
--- /dev/null
+++ b/tokio/tests/io_copy.rs
@@ -0,0 +1,71 @@
+#![deny(warnings, rust_2018_idioms)]
+
+use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt};
+use tokio_test::assert_ready_ok;
+use tokio_test::task::MockTask;
+
+use bytes::BytesMut;
+use pin_utils::pin_mut;
+use std::future::Future;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[test]
+fn copy() {
+ struct Rd(bool);
+
+ impl AsyncRead for Rd {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.0 {
+ buf[0..11].copy_from_slice(b"hello world");
+ self.0 = false;
+ Poll::Ready(Ok(11))
+ } else {
+ Poll::Ready(Ok(0))
+ }
+ }
+ }
+
+ struct Wr(BytesMut);
+
+ impl Unpin for Wr {}
+ impl AsyncWrite for Wr {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.0.extend(buf);
+ Ok(buf.len()).into()
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Ok(()).into()
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Ok(()).into()
+ }
+ }
+
+ let buf = BytesMut::with_capacity(64);
+ let mut task = MockTask::new();
+
+ task.enter(|cx| {
+ let mut rd = Rd(true);
+ let mut wr = Wr(buf);
+
+ let copy = rd.copy(&mut wr);
+ pin_mut!(copy);
+
+ let n = assert_ready_ok!(copy.poll(cx));
+
+ assert_eq!(n, 11);
+ assert_eq!(wr.0[..], b"hello world"[..]);
+ });
+}
diff --git a/tokio/tests/io_read.rs b/tokio/tests/io_read.rs
new file mode 100644
index 00000000..8544a6c6
--- /dev/null
+++ b/tokio/tests/io_read.rs
@@ -0,0 +1,41 @@
+#![deny(warnings, rust_2018_idioms)]
+
+use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio_test::assert_ready_ok;
+use tokio_test::task::MockTask;
+
+use pin_utils::pin_mut;
+use std::future::Future;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[test]
+fn read() {
+ struct Rd;
+
+ impl AsyncRead for Rd {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ buf[0..11].copy_from_slice(b"hello world");
+ Poll::Ready(Ok(11))
+ }
+ }
+
+ let mut buf = Box::new([0; 11]);
+ let mut task = MockTask::new();
+
+ task.enter(|cx| {
+ let mut rd = Rd;
+
+ let read = rd.read(&mut buf[..]);
+ pin_mut!(read);
+
+ let n = assert_ready_ok!(read.poll(cx));
+ assert_eq!(n, 11);
+ assert_eq!(buf[..], b"hello world"[..]);
+ });
+}
diff --git a/tokio/tests/io_read_exact.rs b/tokio/tests/io_read_exact.rs
new file mode 100644
index 00000000..94e35514
--- /dev/null
+++ b/tokio/tests/io_read_exact.rs
@@ -0,0 +1,46 @@
+#![deny(warnings, rust_2018_idioms)]
+
+use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio_test::assert_ready_ok;
+use tokio_test::task::MockTask;
+
+use pin_utils::pin_mut;
+use std::future::Future;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[test]
+fn read_exact() {
+ struct Rd {
+ val: &'static [u8; 11],
+ }
+
+ impl AsyncRead for Rd {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8]
+ ) -> Poll<io::Result<usize>> {
+ let me = &mut *self;
+ let len = buf.len();
+
+ buf[..].copy_from_slice(&me.val[..len]);
+ Poll::Ready(Ok(buf.len()))
+ }
+ }
+
+ let mut buf = Box::new([0; 8]);
+ let mut task = MockTask::new();
+
+ task.enter(|cx| {
+ let mut rd = Rd { val: b"hello world" };
+
+ let read = rd.read_exact(&mut buf[..]);
+ pin_mut!(read);
+
+ let n = assert_ready_ok!(read.poll(cx));
+ assert_eq!(n, 8);
+ assert_eq!(buf[..], b"hello wo"[..]);
+ });
+}
diff --git a/tokio/tests/io_write.rs b/tokio/tests/io_write.rs
new file mode 100644
index 00000000..7d80ca55
--- /dev/null
+++ b/tokio/tests/io_write.rs
@@ -0,0 +1,48 @@
+#![deny(warnings, rust_2018_idioms)]
+
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio_test::assert_ready_ok;
+use tokio_test::task::MockTask;
+
+use bytes::BytesMut;
+use pin_utils::pin_mut;
+use std::future::Future;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[test]
+fn write() {
+ struct Wr(BytesMut);
+
+ impl AsyncWrite for Wr {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.0.extend(buf);
+ Ok(buf.len()).into()
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Ok(()).into()
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Ok(()).into()
+ }
+ }
+
+ let mut task = MockTask::new();
+
+ task.enter(|cx| {
+ let mut wr = Wr(BytesMut::with_capacity(64));
+
+ let write = wr.write(b"hello world");
+ pin_mut!(write);
+
+ let n = assert_ready_ok!(write.poll(cx));
+ assert_eq!(n, 11);
+ });
+}