diff options
Diffstat (limited to 'src/io')
-rw-r--r-- | src/io/copy.rs | 82 | ||||
-rw-r--r-- | src/io/flush.rs | 39 | ||||
-rw-r--r-- | src/io/mod.rs | 47 | ||||
-rw-r--r-- | src/io/read_exact.rs | 77 | ||||
-rw-r--r-- | src/io/read_to_end.rs | 62 | ||||
-rw-r--r-- | src/io/task.rs | 102 | ||||
-rw-r--r-- | src/io/window.rs | 116 | ||||
-rw-r--r-- | src/io/write_all.rs | 80 |
8 files changed, 605 insertions, 0 deletions
diff --git a/src/io/copy.rs b/src/io/copy.rs new file mode 100644 index 00000000..042e819b --- /dev/null +++ b/src/io/copy.rs @@ -0,0 +1,82 @@ +use std::io::{self, Read, Write}; + +use futures::{Future, Poll}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the `copy` function, this future will resolve to the number of +/// bytes copied or an error if one happens. +pub struct Copy<R, W> { + reader: R, + read_done: bool, + writer: W, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W> + where R: Read, + W: Write, +{ + Copy { + reader: reader, + read_done: false, + writer: writer, + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl<R, W> Future for Copy<R, W> + where R: Read, + W: Write, +{ + type Item = u64; + type Error = io::Error; + + fn poll(&mut self) -> Poll<u64, io::Error> { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let n = try_nb!(self.reader.read(&mut self.buf)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap])); + self.pos += i; + self.amt += i as u64; + } + + // If we've written al the data and we've seen EOF, flush out the + // data and finish the transfer. + // done with the entire transfer. + if self.pos == self.cap && self.read_done { + try_nb!(self.writer.flush()); + return Poll::Ok(self.amt) + } + } + } +} diff --git a/src/io/flush.rs b/src/io/flush.rs new file mode 100644 index 00000000..159a178b --- /dev/null +++ b/src/io/flush.rs @@ -0,0 +1,39 @@ +use std::io::{self, Write}; + +use futures::{Poll, Future}; + +/// A future used to fully flush an I/O object. +/// +/// Resolves to the underlying I/O object once the flush operation is complete. +/// +/// Created by the `flush` function. +pub struct Flush<A> { + a: Option<A>, +} + +/// Creates a future which will entirely flush an I/O object and then yield the +/// object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling +/// a retry if `WouldBlock` is seen along the way. +pub fn flush<A>(a: A) -> Flush<A> + where A: Write, +{ + Flush { + a: Some(a), + } +} + +impl<A> Future for Flush<A> + where A: Write, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_nb!(self.a.as_mut().unwrap().flush()); + Poll::Ok(self.a.take().unwrap()) + } +} + diff --git a/src/io/mod.rs b/src/io/mod.rs new file mode 100644 index 00000000..41a73978 --- /dev/null +++ b/src/io/mod.rs @@ -0,0 +1,47 @@ +//! I/O conveniences when working with primitives in `tokio-core` +//! +//! Contains various combinators to work with I/O objects and type definitions +//! as well. + +use std::io; + +use futures::BoxFuture; +use futures::stream::BoxStream; + +/// A convenience typedef around a `Future` whose error component is `io::Error` +pub type IoFuture<T> = BoxFuture<T, io::Error>; + +/// A convenience typedef around a `Stream` whose error component is `io::Error` +pub type IoStream<T> = BoxStream<T, io::Error>; + +/// A convenience macro for working with `io::Result<T>` from the `Read` and +/// `Write` traits. +/// +/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If +/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if +/// it indicates `WouldBlock` or otherwise `Err` is returned. +#[macro_export] +macro_rules! try_nb { + ($e:expr) => (match $e { + Ok(t) => t, + Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { + return ::futures::Poll::NotReady + } + Err(e) => return ::futures::Poll::Err(e.into()), + }) +} + +mod copy; +mod flush; +mod read_exact; +mod read_to_end; +mod task; +mod window; +mod write_all; +pub use self::copy::{copy, Copy}; +pub use self::flush::{flush, Flush}; +pub use self::read_exact::{read_exact, ReadExact}; +pub use self::read_to_end::{read_to_end, ReadToEnd}; +pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite}; +pub use self::window::Window; +pub use self::write_all::{write_all, WriteAll}; diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs new file mode 100644 index 00000000..b251bfea --- /dev/null +++ b/src/io/read_exact.rs @@ -0,0 +1,77 @@ +use std::io::{self, Read}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the `read_exact` function. +pub struct ReadExact<A, T> { + state: State<A, T>, +} + +enum State<A, T> { + Reading { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future which will read exactly enough bytes to fill `buf`, +/// returning an error if EOF is hit sooner. +/// +/// The returned future will resolve to both the I/O stream as well as the +/// buffer once the read operation is completed. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T> + where A: Read, + T: AsMut<[u8]>, +{ + ReadExact { + state: State::Reading { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + +impl<A, T> Future for ReadExact<A, T> + where A: Read, + T: AsMut<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf, ref mut pos } => { + let buf = buf.as_mut(); + while *pos < buf.len() { + let n = try_nb!(a.read(&mut buf[*pos..])); + *pos += n; + if n == 0 { + return Poll::Err(eof()) + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf, .. } => Poll::Ok((a, buf)), + State::Empty => panic!(), + } + } +} diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs new file mode 100644 index 00000000..80e7cd6c --- /dev/null +++ b/src/io/read_to_end.rs @@ -0,0 +1,62 @@ +use std::io::{self, Read}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the `read_to_end` function. +pub struct ReadToEnd<A> { + state: State<A>, +} + +enum State<A> { + Reading { + a: A, + buf: Vec<u8>, + }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A> + where A: Read, +{ + ReadToEnd { + state: State::Reading { + a: a, + buf: buf, + } + } +} + +impl<A> Future for ReadToEnd<A> + where A: Read, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf } => { + // If we get `Ok`, then we know the stream hit EOF and we're done. If we + // hit "would block" then all the read data so far is in our buffer, and + // otherwise we propagate errors + try_nb!(a.read_to_end(buf)); + }, + State::Empty => panic!("poll ReadToEnd after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf } => Poll::Ok((a, buf)), + State::Empty => unreachable!(), + } + } +} diff --git a/src/io/task.rs b/src/io/task.rs new file mode 100644 index 00000000..3c87a32e --- /dev/null +++ b/src/io/task.rs @@ -0,0 +1,102 @@ +use std::cell::RefCell; +use std::io::{self, Read, Write}; + +use futures::task::TaskData; + +/// Abstraction that allows inserting an I/O object into task-local storage, +/// returning a handle that can be split. +/// +/// A `TaskIo<T>` handle implements the `ReadTask` and `WriteTask` and will only +/// work with the same task that the associated object was inserted into. The +/// handle may then be optionally `split` into the read/write halves so they can +/// be worked with independently. +/// +/// Note that it is important that the future returned from `TaskIo::new`, when +/// polled, will pin the yielded `TaskIo<T>` object to that specific task. Any +/// attempt to read or write the object on other tasks will result in a panic. +pub struct TaskIo<T> { + handle: TaskData<RefCell<T>>, +} + +/// The readable half of a `TaskIo<T>` instance returned from `TaskIo::split`. +/// +/// This handle implements the `ReadTask` trait and can be used to split up an +/// I/O object into two distinct halves. +pub struct TaskIoRead<T> { + handle: TaskData<RefCell<T>>, +} + +/// The writable half of a `TaskIo<T>` instance returned from `TaskIo::split`. +/// +/// This handle implements the `WriteTask` trait and can be used to split up an +/// I/O object into two distinct halves. +pub struct TaskIoWrite<T> { + handle: TaskData<RefCell<T>>, +} + +impl<T> TaskIo<T> { + /// Returns a new future which represents the insertion of the I/O object + /// `T` into task local storage, returning a `TaskIo<T>` handle to it. + /// + /// The returned future will never resolve to an error. + pub fn new(t: T) -> TaskIo<T> { + TaskIo { + handle: TaskData::new(RefCell::new(t)), + } + } +} + +impl<T> TaskIo<T> + where T: Read + Write, +{ + /// For an I/O object which is both readable and writable, this method can + /// be used to split the handle into two independently owned halves. + /// + /// The returned pair implements the `ReadTask` and `WriteTask` traits, + /// respectively, and can be used to pass around the object to different + /// combinators if necessary. + pub fn split(self) -> (TaskIoRead<T>, TaskIoWrite<T>) { + (TaskIoRead { handle: self.handle.clone() }, + TaskIoWrite { handle: self.handle }) + } +} + +impl<T> Read for TaskIo<T> + where T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().read(buf)) + } +} + +impl<T> Write for TaskIo<T> + where T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.with(|t| t.borrow_mut().flush()) + } +} + +impl<T> Read for TaskIoRead<T> + where T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().read(buf)) + } +} + +impl<T> Write for TaskIoWrite<T> + where T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.with(|t| t.borrow_mut().flush()) + } +} diff --git a/src/io/window.rs b/src/io/window.rs new file mode 100644 index 00000000..c6136099 --- /dev/null +++ b/src/io/window.rs @@ -0,0 +1,116 @@ +use std::ops; + +/// A owned window around an underlying buffer. +/// +/// Normally slices work great for considering sub-portions of a buffer, but +/// unfortunately a slice is a *borrowed* type in Rust which has an associated +/// lifetime. When working with future and async I/O these lifetimes are not +/// always appropriate, and are sometimes difficult to store in tasks. This +/// type strives to fill this gap by providing an "owned slice" around an +/// underlying buffer of bytes. +/// +/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable +/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation +/// that this type carries. +/// +/// This type can be particularly useful when working with the `write_all` +/// combinator in this crate. Data can be sliced via `Window`, consumed by +/// `write_all`, and then earned back once the write operation finishes through +/// the `into_inner` method on this type. +pub struct Window<T> { + inner: T, + range: ops::Range<usize>, +} + +impl<T: AsRef<[u8]>> Window<T> { + /// Creates a new window around the buffer `t` defaulting to the entire + /// slice. + /// + /// Further methods can be called on the returned `Window<T>` to alter the + /// window into the data provided. + pub fn new(t: T) -> Window<T> { + Window { + range: 0..t.as_ref().len(), + inner: t, + } + } + + /// Gets a shared reference to the underlying buffer inside of this + /// `Window`. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying buffer inside of this + /// `Window`. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this `Window`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.inner + } + + /// Returns the starting index of this window into the underlying buffer + /// `T`. + pub fn start(&self) -> usize { + self.range.start + } + + /// Returns the end index of this window into the underlying buffer + /// `T`. + pub fn end(&self) -> usize { + self.range.end + } + + /// Changes the starting index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `start` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_start(&mut self, start: usize) -> &mut Window<T> { + assert!(start < self.inner.as_ref().len()); + assert!(start <= self.range.end); + self.range.start = start; + self + } + + /// Changes the end index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `end` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_end(&mut self, end: usize) -> &mut Window<T> { + assert!(end < self.inner.as_ref().len()); + assert!(self.range.start <= end); + self.range.end = end; + self + } + + // TODO: how about a generic set() method along the lines of: + // + // buffer.set(..3) + // .set(0..2) + // .set(4..) + // + // etc. +} + +impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> { + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[self.range.start..self.range.end] + } +} + +impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[self.range.start..self.range.end] + } +} diff --git a/src/io/write_all.rs b/src/io/write_all.rs new file mode 100644 index 00000000..df4751df --- /dev/null +++ b/src/io/write_all.rs @@ -0,0 +1,80 @@ +use std::io::{self, Write}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the `write_all` top-level method. +pub struct WriteAll<A, T> { + state: State<A, T>, +} + +enum State<A, T> { + Writing { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future that will write the entire contents of the buffer `buf` to +/// the stream `a` provided. +/// +/// The returned future will not return until all the data has been written, and +/// the future will resolve to the stream as well as the buffer (for reuse if +/// needed). +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +/// +/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should +/// be broadly applicable to accepting data which can be converted to a slice. +/// The `Window` struct is also available in this crate to provide a different +/// window into a slice if necessary. +pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T> + where A: Write, + T: AsRef<[u8]>, +{ + WriteAll { + state: State::Writing { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn zero_write() -> io::Error { + io::Error::new(io::ErrorKind::WriteZero, "zero-length write") +} + +impl<A, T> Future for WriteAll<A, T> + where A: Write, + T: AsRef<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Writing { ref mut a, ref buf, ref mut pos } => { + let buf = buf.as_ref(); + while *pos < buf.len() { + let n = try_nb!(a.write(&buf[*pos..])); + *pos += n; + if n == 0 { + return Poll::Err(zero_write()) + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Writing { a, buf, .. } => Poll::Ok((a, buf)), + State::Empty => panic!(), + } + } +} |