summaryrefslogtreecommitdiffstats
path: root/src/io
diff options
context:
space:
mode:
Diffstat (limited to 'src/io')
-rw-r--r--src/io/copy.rs82
-rw-r--r--src/io/flush.rs39
-rw-r--r--src/io/mod.rs47
-rw-r--r--src/io/read_exact.rs77
-rw-r--r--src/io/read_to_end.rs62
-rw-r--r--src/io/task.rs102
-rw-r--r--src/io/window.rs116
-rw-r--r--src/io/write_all.rs80
8 files changed, 605 insertions, 0 deletions
diff --git a/src/io/copy.rs b/src/io/copy.rs
new file mode 100644
index 00000000..042e819b
--- /dev/null
+++ b/src/io/copy.rs
@@ -0,0 +1,82 @@
+use std::io::{self, Read, Write};
+
+use futures::{Future, Poll};
+
+/// A future which will copy all data from a reader into a writer.
+///
+/// Created by the `copy` function, this future will resolve to the number of
+/// bytes copied or an error if one happens.
+pub struct Copy<R, W> {
+ reader: R,
+ read_done: bool,
+ writer: W,
+ pos: usize,
+ cap: usize,
+ amt: u64,
+ buf: Box<[u8]>,
+}
+
+/// Creates a future which represents copying all the bytes from one object to
+/// another.
+///
+/// The returned future will copy all the bytes read from `reader` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned and the `reader` and `writer` are
+/// consumed. On error the error is returned and the I/O objects are consumed as
+/// well.
+pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
+ where R: Read,
+ W: Write,
+{
+ Copy {
+ reader: reader,
+ read_done: false,
+ writer: writer,
+ amt: 0,
+ pos: 0,
+ cap: 0,
+ buf: Box::new([0; 2048]),
+ }
+}
+
+impl<R, W> Future for Copy<R, W>
+ where R: Read,
+ W: Write,
+{
+ type Item = u64;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<u64, io::Error> {
+ loop {
+ // If our buffer is empty, then we need to read some data to
+ // continue.
+ if self.pos == self.cap && !self.read_done {
+ let n = try_nb!(self.reader.read(&mut self.buf));
+ if n == 0 {
+ self.read_done = true;
+ } else {
+ self.pos = 0;
+ self.cap = n;
+ }
+ }
+
+ // If our buffer has some data, let's write it out!
+ while self.pos < self.cap {
+ let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap]));
+ self.pos += i;
+ self.amt += i as u64;
+ }
+
+ // If we've written al the data and we've seen EOF, flush out the
+ // data and finish the transfer.
+ // done with the entire transfer.
+ if self.pos == self.cap && self.read_done {
+ try_nb!(self.writer.flush());
+ return Poll::Ok(self.amt)
+ }
+ }
+ }
+}
diff --git a/src/io/flush.rs b/src/io/flush.rs
new file mode 100644
index 00000000..159a178b
--- /dev/null
+++ b/src/io/flush.rs
@@ -0,0 +1,39 @@
+use std::io::{self, Write};
+
+use futures::{Poll, Future};
+
+/// A future used to fully flush an I/O object.
+///
+/// Resolves to the underlying I/O object once the flush operation is complete.
+///
+/// Created by the `flush` function.
+pub struct Flush<A> {
+ a: Option<A>,
+}
+
+/// Creates a future which will entirely flush an I/O object and then yield the
+/// object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
+/// a retry if `WouldBlock` is seen along the way.
+pub fn flush<A>(a: A) -> Flush<A>
+ where A: Write,
+{
+ Flush {
+ a: Some(a),
+ }
+}
+
+impl<A> Future for Flush<A>
+ where A: Write,
+{
+ type Item = A;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<A, io::Error> {
+ try_nb!(self.a.as_mut().unwrap().flush());
+ Poll::Ok(self.a.take().unwrap())
+ }
+}
+
diff --git a/src/io/mod.rs b/src/io/mod.rs
new file mode 100644
index 00000000..41a73978
--- /dev/null
+++ b/src/io/mod.rs
@@ -0,0 +1,47 @@
+//! I/O conveniences when working with primitives in `tokio-core`
+//!
+//! Contains various combinators to work with I/O objects and type definitions
+//! as well.
+
+use std::io;
+
+use futures::BoxFuture;
+use futures::stream::BoxStream;
+
+/// A convenience typedef around a `Future` whose error component is `io::Error`
+pub type IoFuture<T> = BoxFuture<T, io::Error>;
+
+/// A convenience typedef around a `Stream` whose error component is `io::Error`
+pub type IoStream<T> = BoxStream<T, io::Error>;
+
+/// A convenience macro for working with `io::Result<T>` from the `Read` and
+/// `Write` traits.
+///
+/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
+/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if
+/// it indicates `WouldBlock` or otherwise `Err` is returned.
+#[macro_export]
+macro_rules! try_nb {
+ ($e:expr) => (match $e {
+ Ok(t) => t,
+ Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
+ return ::futures::Poll::NotReady
+ }
+ Err(e) => return ::futures::Poll::Err(e.into()),
+ })
+}
+
+mod copy;
+mod flush;
+mod read_exact;
+mod read_to_end;
+mod task;
+mod window;
+mod write_all;
+pub use self::copy::{copy, Copy};
+pub use self::flush::{flush, Flush};
+pub use self::read_exact::{read_exact, ReadExact};
+pub use self::read_to_end::{read_to_end, ReadToEnd};
+pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite};
+pub use self::window::Window;
+pub use self::write_all::{write_all, WriteAll};
diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs
new file mode 100644
index 00000000..b251bfea
--- /dev/null
+++ b/src/io/read_exact.rs
@@ -0,0 +1,77 @@
+use std::io::{self, Read};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the `read_exact` function.
+pub struct ReadExact<A, T> {
+ state: State<A, T>,
+}
+
+enum State<A, T> {
+ Reading {
+ a: A,
+ buf: T,
+ pos: usize,
+ },
+ Empty,
+}
+
+/// Creates a future which will read exactly enough bytes to fill `buf`,
+/// returning an error if EOF is hit sooner.
+///
+/// The returned future will resolve to both the I/O stream as well as the
+/// buffer once the read operation is completed.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
+ where A: Read,
+ T: AsMut<[u8]>,
+{
+ ReadExact {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn eof() -> io::Error {
+ io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
+}
+
+impl<A, T> Future for ReadExact<A, T>
+ where A: Read,
+ T: AsMut<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Reading { ref mut a, ref mut buf, ref mut pos } => {
+ let buf = buf.as_mut();
+ while *pos < buf.len() {
+ let n = try_nb!(a.read(&mut buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Poll::Err(eof())
+ }
+ }
+ }
+ State::Empty => panic!("poll a WriteAll after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf, .. } => Poll::Ok((a, buf)),
+ State::Empty => panic!(),
+ }
+ }
+}
diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs
new file mode 100644
index 00000000..80e7cd6c
--- /dev/null
+++ b/src/io/read_to_end.rs
@@ -0,0 +1,62 @@
+use std::io::{self, Read};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the `read_to_end` function.
+pub struct ReadToEnd<A> {
+ state: State<A>,
+}
+
+enum State<A> {
+ Reading {
+ a: A,
+ buf: Vec<u8>,
+ },
+ Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
+ where A: Read,
+{
+ ReadToEnd {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ }
+ }
+}
+
+impl<A> Future for ReadToEnd<A>
+ where A: Read,
+{
+ type Item = (A, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+ match self.state {
+ State::Reading { ref mut a, ref mut buf } => {
+ // If we get `Ok`, then we know the stream hit EOF and we're done. If we
+ // hit "would block" then all the read data so far is in our buffer, and
+ // otherwise we propagate errors
+ try_nb!(a.read_to_end(buf));
+ },
+ State::Empty => panic!("poll ReadToEnd after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf } => Poll::Ok((a, buf)),
+ State::Empty => unreachable!(),
+ }
+ }
+}
diff --git a/src/io/task.rs b/src/io/task.rs
new file mode 100644
index 00000000..3c87a32e
--- /dev/null
+++ b/src/io/task.rs
@@ -0,0 +1,102 @@
+use std::cell::RefCell;
+use std::io::{self, Read, Write};
+
+use futures::task::TaskData;
+
+/// Abstraction that allows inserting an I/O object into task-local storage,
+/// returning a handle that can be split.
+///
+/// A `TaskIo<T>` handle implements the `ReadTask` and `WriteTask` and will only
+/// work with the same task that the associated object was inserted into. The
+/// handle may then be optionally `split` into the read/write halves so they can
+/// be worked with independently.
+///
+/// Note that it is important that the future returned from `TaskIo::new`, when
+/// polled, will pin the yielded `TaskIo<T>` object to that specific task. Any
+/// attempt to read or write the object on other tasks will result in a panic.
+pub struct TaskIo<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+/// The readable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
+///
+/// This handle implements the `ReadTask` trait and can be used to split up an
+/// I/O object into two distinct halves.
+pub struct TaskIoRead<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+/// The writable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
+///
+/// This handle implements the `WriteTask` trait and can be used to split up an
+/// I/O object into two distinct halves.
+pub struct TaskIoWrite<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+impl<T> TaskIo<T> {
+ /// Returns a new future which represents the insertion of the I/O object
+ /// `T` into task local storage, returning a `TaskIo<T>` handle to it.
+ ///
+ /// The returned future will never resolve to an error.
+ pub fn new(t: T) -> TaskIo<T> {
+ TaskIo {
+ handle: TaskData::new(RefCell::new(t)),
+ }
+ }
+}
+
+impl<T> TaskIo<T>
+ where T: Read + Write,
+{
+ /// For an I/O object which is both readable and writable, this method can
+ /// be used to split the handle into two independently owned halves.
+ ///
+ /// The returned pair implements the `ReadTask` and `WriteTask` traits,
+ /// respectively, and can be used to pass around the object to different
+ /// combinators if necessary.
+ pub fn split(self) -> (TaskIoRead<T>, TaskIoWrite<T>) {
+ (TaskIoRead { handle: self.handle.clone() },
+ TaskIoWrite { handle: self.handle })
+ }
+}
+
+impl<T> Read for TaskIo<T>
+ where T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().read(buf))
+ }
+}
+
+impl<T> Write for TaskIo<T>
+ where T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.handle.with(|t| t.borrow_mut().flush())
+ }
+}
+
+impl<T> Read for TaskIoRead<T>
+ where T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().read(buf))
+ }
+}
+
+impl<T> Write for TaskIoWrite<T>
+ where T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.handle.with(|t| t.borrow_mut().flush())
+ }
+}
diff --git a/src/io/window.rs b/src/io/window.rs
new file mode 100644
index 00000000..c6136099
--- /dev/null
+++ b/src/io/window.rs
@@ -0,0 +1,116 @@
+use std::ops;
+
+/// A owned window around an underlying buffer.
+///
+/// Normally slices work great for considering sub-portions of a buffer, but
+/// unfortunately a slice is a *borrowed* type in Rust which has an associated
+/// lifetime. When working with future and async I/O these lifetimes are not
+/// always appropriate, and are sometimes difficult to store in tasks. This
+/// type strives to fill this gap by providing an "owned slice" around an
+/// underlying buffer of bytes.
+///
+/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
+/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
+/// that this type carries.
+///
+/// This type can be particularly useful when working with the `write_all`
+/// combinator in this crate. Data can be sliced via `Window`, consumed by
+/// `write_all`, and then earned back once the write operation finishes through
+/// the `into_inner` method on this type.
+pub struct Window<T> {
+ inner: T,
+ range: ops::Range<usize>,
+}
+
+impl<T: AsRef<[u8]>> Window<T> {
+ /// Creates a new window around the buffer `t` defaulting to the entire
+ /// slice.
+ ///
+ /// Further methods can be called on the returned `Window<T>` to alter the
+ /// window into the data provided.
+ pub fn new(t: T) -> Window<T> {
+ Window {
+ range: 0..t.as_ref().len(),
+ inner: t,
+ }
+ }
+
+ /// Gets a shared reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes this `Window`, returning the underlying buffer.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ /// Returns the starting index of this window into the underlying buffer
+ /// `T`.
+ pub fn start(&self) -> usize {
+ self.range.start
+ }
+
+ /// Returns the end index of this window into the underlying buffer
+ /// `T`.
+ pub fn end(&self) -> usize {
+ self.range.end
+ }
+
+ /// Changes the starting index of this window to the index specified.
+ ///
+ /// Returns the windows back to chain multiple calls to this method.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `start` is out of bounds for the underlying
+ /// slice or if it comes after the `end` configured in this window.
+ pub fn set_start(&mut self, start: usize) -> &mut Window<T> {
+ assert!(start < self.inner.as_ref().len());
+ assert!(start <= self.range.end);
+ self.range.start = start;
+ self
+ }
+
+ /// Changes the end index of this window to the index specified.
+ ///
+ /// Returns the windows back to chain multiple calls to this method.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `end` is out of bounds for the underlying
+ /// slice or if it comes after the `end` configured in this window.
+ pub fn set_end(&mut self, end: usize) -> &mut Window<T> {
+ assert!(end < self.inner.as_ref().len());
+ assert!(self.range.start <= end);
+ self.range.end = end;
+ self
+ }
+
+ // TODO: how about a generic set() method along the lines of:
+ //
+ // buffer.set(..3)
+ // .set(0..2)
+ // .set(4..)
+ //
+ // etc.
+}
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
+ fn as_ref(&self) -> &[u8] {
+ &self.inner.as_ref()[self.range.start..self.range.end]
+ }
+}
+
+impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
+ fn as_mut(&mut self) -> &mut [u8] {
+ &mut self.inner.as_mut()[self.range.start..self.range.end]
+ }
+}
diff --git a/src/io/write_all.rs b/src/io/write_all.rs
new file mode 100644
index 00000000..df4751df
--- /dev/null
+++ b/src/io/write_all.rs
@@ -0,0 +1,80 @@
+use std::io::{self, Write};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future used to write the entire contents of some data to a stream.
+///
+/// This is created by the `write_all` top-level method.
+pub struct WriteAll<A, T> {
+ state: State<A, T>,
+}
+
+enum State<A, T> {
+ Writing {
+ a: A,
+ buf: T,
+ pos: usize,
+ },
+ Empty,
+}
+
+/// Creates a future that will write the entire contents of the buffer `buf` to
+/// the stream `a` provided.
+///
+/// The returned future will not return until all the data has been written, and
+/// the future will resolve to the stream as well as the buffer (for reuse if
+/// needed).
+///
+/// Any error which happens during writing will cause both the stream and the
+/// buffer to get destroyed.
+///
+/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
+/// be broadly applicable to accepting data which can be converted to a slice.
+/// The `Window` struct is also available in this crate to provide a different
+/// window into a slice if necessary.
+pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
+ where A: Write,
+ T: AsRef<[u8]>,
+{
+ WriteAll {
+ state: State::Writing {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn zero_write() -> io::Error {
+ io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
+}
+
+impl<A, T> Future for WriteAll<A, T>
+ where A: Write,
+ T: AsRef<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Writing { ref mut a, ref buf, ref mut pos } => {
+ let buf = buf.as_ref();
+ while *pos < buf.len() {
+ let n = try_nb!(a.write(&buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Poll::Err(zero_write())
+ }
+ }
+ }
+ State::Empty => panic!("poll a WriteAll after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Writing { a, buf, .. } => Poll::Ok((a, buf)),
+ State::Empty => panic!(),
+ }
+ }
+}