diff options
author | Alex Crichton <alex@alexcrichton.com> | 2016-08-26 14:30:46 -0700 |
---|---|---|
committer | Alex Crichton <alex@alexcrichton.com> | 2016-08-26 14:39:47 -0700 |
commit | f107c8d860137b41e509b179d605db30082cb0da (patch) | |
tree | 063e7b0c86785f70a63458e17183602e58d9c7f3 | |
parent | e71d509fee767d6b796ba18a5501f80f0fb4babc (diff) |
Rename to tokio-core, add in futures-io
Renames the futures-mio crate to tokio-core, pulls in the futures-io crate under
an `io` module, and gets everything compiling.
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | .travis.yml | 24 | ||||
-rw-r--r-- | Cargo.toml | 15 | ||||
-rw-r--r-- | README.md | 45 | ||||
-rw-r--r-- | src/bin/echo.rs | 7 | ||||
-rw-r--r-- | src/bin/sink.rs | 11 | ||||
-rw-r--r-- | src/channel.rs | 2 | ||||
-rw-r--r-- | src/io/copy.rs | 82 | ||||
-rw-r--r-- | src/io/flush.rs | 39 | ||||
-rw-r--r-- | src/io/mod.rs | 47 | ||||
-rw-r--r-- | src/io/read_exact.rs | 77 | ||||
-rw-r--r-- | src/io/read_to_end.rs | 62 | ||||
-rw-r--r-- | src/io/task.rs | 102 | ||||
-rw-r--r-- | src/io/window.rs | 116 | ||||
-rw-r--r-- | src/io/write_all.rs | 80 | ||||
-rw-r--r-- | src/lib.rs | 5 | ||||
-rw-r--r-- | src/lock.rs | 106 | ||||
-rw-r--r-- | src/slot.rs | 691 | ||||
-rw-r--r-- | src/tcp.rs | 2 | ||||
-rw-r--r-- | src/timeout.rs | 2 | ||||
-rw-r--r-- | src/udp.rs | 2 | ||||
-rw-r--r-- | tests/buffered.rs | 7 | ||||
-rw-r--r-- | tests/chain.rs | 7 | ||||
-rw-r--r-- | tests/echo.rs | 7 | ||||
-rw-r--r-- | tests/limit.rs | 7 | ||||
-rw-r--r-- | tests/stream-buffered.rs | 7 | ||||
-rw-r--r-- | tests/tcp.rs | 8 | ||||
-rw-r--r-- | tests/timeout.rs | 4 | ||||
-rw-r--r-- | tests/udp.rs | 6 |
29 files changed, 1495 insertions, 77 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..a9d37c56 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +target +Cargo.lock diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..a89a11e3 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,24 @@ +language: rust + +rust: + - stable + - beta + - nightly +sudo: false +before_script: + - pip install 'travis-cargo<0.2' --user && export PATH=$HOME/.local/bin:$PATH +script: + - cargo build + - cargo test + - cargo doc --no-deps +after_success: + - travis-cargo --only nightly doc-upload +env: + global: + - secure: LVrtwDI0IJradnHRk53dGWtTS+limhkuHym17wuto/Zaz6IJB9aq7G5wSYuZU3qabcxah7pigjXPFgzYwFD6mNHW1DAuAko1qOi4AL0rvg+rA7Fa5E9NEIxoqzCf+wBtqCvomBe/akOs7UtHdjE3CZpIEPwSHVf3jf61suB0mPVUW0AFTHvYTvHT4lyHjlruY+Ifi350yb4t0Oy9rU1bHNtX0q1T0mKuTnKkmpCT2Kj+2L7afgsAR3UgBjL3Py89LXmnF5VxSMGJWa6HL3xgEi3CXxBRQFdr+vipIDejWtjY+7DzvSRHid1rVfwCLdLfTwvA3Pf3b0I5DSJnjzRgKkfiH2j7JNFtCvLz+mM5C/4QJzAgNmdyNuDv0qOy07OABtYs/LE60f6ZZ5YMZAloMtA/9qQjJx+c2jO2nTZkx6vNJ5C421yzm2klQSL0d8pFaDmojqC5pT85MYhf3mESqSw1UjwFPa0xFtysT52oJBcyvwI/wBYbK40sArjSDZaU2Jncw9ptDWML/xUM+sWHF7ZW/mI1V15lqaCBX91xlbppfWDMgNF2c60vC90t0entbGpYLvHjQMdW6iucbsLLN5KAPzYPuufX2vJa8V1gxMxZ7CLcVLx9lmm3uEdrOZLEg4Fg7H7Xqc2JRygbNrTtOeBw1/o73znnnjEv8Vl3xqg= +notifications: + email: + on_success: never +os: + - linux + - osx @@ -1,19 +1,18 @@ [package] -name = "futures-mio" +name = "tokio-core" version = "0.1.0" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT/Apache-2.0" -repository = "https://github.com/alexcrichton/futures-rs" -homepage = "https://github.com/alexcrichton/futures-rs" -documentation = "http://alexcrichton.com/futures-rs/futures_mio/" +repository = "https://github.com/tokio-rs/tokio-core" +homepage = "https://github.com/tokio-rs/tokio-core" +documentation = "https://tokio-rs.github.io/tokio-core" description = """ -Bindings from the `futures` crate to the `mio` crate to get I/O in the form of -futures and streams. +Core I/O and event loop primitives for asynchronous I/O in Rust. Foundation for +the rest of the tokio crates. """ [dependencies] -futures = { path = "..", version = "0.1.0" } -futures-io = { path = "../futures-io", version = "0.1.0" } +futures = { git = "https://github.com/alexcrichton/futures-rs" } log = "0.3" mio = { git = "https://github.com/carllerche/mio" } scoped-tls = "0.1.0" @@ -1,12 +1,12 @@ -# futures-mio +# tokio-core -Bindings to the `mio` crate implementing the `futures-io` and `futures` -abstractions. +Core I/O and event loop abstraction for asynchronous I/O in Rust built on +`futures` and `mio`. -[![Build Status](https://travis-ci.org/alexcrichton/futures-rs.svg?branch=master)](https://travis-ci.org/alexcrichton/futures-rs) +[![Build Status](https://travis-ci.org/tokio-rs/tokio-core.svg?branch=master)](https://travis-ci.org/tokio-rs/tokio-core) [![Build status](https://ci.appveyor.com/api/projects/status/yl5w3ittk4kggfsh?svg=true)](https://ci.appveyor.com/project/alexcrichton/futures-rs) -[Documentation](http://alexcrichton.com/futures-rs/futures_mio) +[Documentation](https://tokio-rs.github.io/tokio-core) ## Usage @@ -14,13 +14,13 @@ First, add this to your `Cargo.toml`: ```toml [dependencies] -futures-mio = { git = "https://github.com/alexcrichton/futures-rs" } +tokio-core = { git = "https://github.com/tokio-rs/tokio-core" } ``` Next, add this to your crate: ```rust -extern crate futures_mio; +extern crate tokio_core; ``` ## Examples @@ -30,18 +30,17 @@ There are a few small examples showing off how to use this library: * [echo.rs] - a simple TCP echo server * [socks5.rs] - an implementation of a SOCKSv5 proxy server -[echo.rs]: https://github.com/alexcrichton/futures-rs/blob/master/futures-mio/src/bin/echo.rs -[socks5.rs]: https://github.com/alexcrichton/futures-rs/blob/master/futures-socks5/src/main.rs +[echo.rs]: https://github.com/tokio-rs/tokio-core/blob/master/src/bin/echo.rs +[socks5.rs]: https://github.com/tokio-rs/tokio-socks5/blob/master/src/main.rs -## What is futures-mio? +## What is tokio-core? This crate is a connection `futures`, a zero-cost implementation of futures in -Rust, and `mio`, a crate for zero-cost asynchronous I/O, and `futures-io`, -abstractions for I/O on top of the `futures` crate. The types and structures -implemented in `futures-mio` implement `Future` and `Stream` traits as -appropriate. For example connecting a TCP stream returns a `Future` resolving -to a TCP stream, and a TCP listener implements a stream of TCP streams -(accepted connections). +Rust, and `mio` and a crate for zero-cost asynchronous I/O. The types and +structures implemented in `tokio-core` implement `Future` and `Stream` traits +as appropriate. For example connecting a TCP stream returns a `Future` +resolving to a TCP stream, and a TCP listener implements a stream of TCP +streams (accepted connections). This crate also provides facilities such as: @@ -52,20 +51,20 @@ This crate also provides facilities such as: * Data owned and local to the event loop * An `Executor` implementation for a futures' `Task` -The intention of `futures-mio` is to provide a concrete implementation for -crates built on top of `futures-io`. For example you can easily turn a TCP -stream into a TLS/SSL stream with the [`futures-tls`] crate or use the -combinators to compose working with data on sockets. +The intention of `tokio-core` is to provide a concrete implementation for crates +built on top of asynchronous I/O. For example you can easily turn a TCP stream +into a TLS/SSL stream with the [`tokio-tls`] crate or use the combinators to +compose working with data on sockets. -[`futures-tls`]: http://alexcrichton.com/futures-rs/futures_tls +[`tokio-tls`]: https://tokio-rs.github.io/tokio-tls Check out the [documentation] for more information, and more coming here soon! -[documentation]: http://alexcrichton.com/futures-rs/futures_mio +[documentation]: https://tokio-rs.github.io/tokio-core # License -`futures-mio` is primarily distributed under the terms of both the MIT license +`tokio-core` is primarily distributed under the terms of both the MIT license and the Apache License (Version 2.0), with portions covered by various BSD-like licenses. diff --git a/src/bin/echo.rs b/src/bin/echo.rs index ccd94752..710db79d 100644 --- a/src/bin/echo.rs +++ b/src/bin/echo.rs @@ -1,22 +1,21 @@ //! An echo server that just writes back everything that's written to it. extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::env; use std::net::SocketAddr; use futures::Future; use futures::stream::Stream; -use futures_io::{copy, TaskIo}; +use tokio_core::io::{copy, TaskIo}; fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); // Create the event loop that will drive this server - let mut l = futures_mio::Loop::new().unwrap(); + let mut l = tokio_core::Loop::new().unwrap(); // Create a TCP listener which will listen for incoming connections let server = l.handle().tcp_listen(&addr); diff --git a/src/bin/sink.rs b/src/bin/sink.rs index 49feed35..baf96b8e 100644 --- a/src/bin/sink.rs +++ b/src/bin/sink.rs @@ -5,8 +5,7 @@ #[macro_use] extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::env; use std::iter; @@ -14,13 +13,13 @@ use std::net::SocketAddr; use futures::Future; use futures::stream::{self, Stream}; -use futures_io::IoFuture; +use tokio_core::io::IoFuture; fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - let mut l = futures_mio::Loop::new().unwrap(); + let mut l = tokio_core::Loop::new().unwrap(); let server = l.handle().tcp_listen(&addr).and_then(|socket| { socket.incoming().and_then(|(socket, addr)| { println!("got a socket: {}", addr); @@ -34,10 +33,10 @@ fn main() { l.run(server).unwrap(); } -fn write(socket: futures_mio::TcpStream) -> IoFuture<()> { +fn write(socket: tokio_core::TcpStream) -> IoFuture<()> { static BUF: &'static [u8] = &[0; 64 * 1024]; let iter = iter::repeat(()).map(|()| Ok(())); stream::iter(iter).fold(socket, |socket, ()| { - futures_io::write_all(socket, BUF).map(|(socket, _)| socket) + tokio_core::io::write_all(socket, BUF).map(|(socket, _)| socket) }).map(|_| ()).boxed() } diff --git a/src/channel.rs b/src/channel.rs index cd5fef46..3da44b53 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -3,10 +3,10 @@ use std::sync::mpsc::TryRecvError; use futures::{Future, Poll}; use futures::stream::Stream; -use futures_io::IoFuture; use mio::channel; use {ReadinessStream, LoopHandle}; +use io::IoFuture; /// The transmission half of a channel used for sending messages to a receiver. /// diff --git a/src/io/copy.rs b/src/io/copy.rs new file mode 100644 index 00000000..042e819b --- /dev/null +++ b/src/io/copy.rs @@ -0,0 +1,82 @@ +use std::io::{self, Read, Write}; + +use futures::{Future, Poll}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the `copy` function, this future will resolve to the number of +/// bytes copied or an error if one happens. +pub struct Copy<R, W> { + reader: R, + read_done: bool, + writer: W, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W> + where R: Read, + W: Write, +{ + Copy { + reader: reader, + read_done: false, + writer: writer, + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl<R, W> Future for Copy<R, W> + where R: Read, + W: Write, +{ + type Item = u64; + type Error = io::Error; + + fn poll(&mut self) -> Poll<u64, io::Error> { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let n = try_nb!(self.reader.read(&mut self.buf)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap])); + self.pos += i; + self.amt += i as u64; + } + + // If we've written al the data and we've seen EOF, flush out the + // data and finish the transfer. + // done with the entire transfer. + if self.pos == self.cap && self.read_done { + try_nb!(self.writer.flush()); + return Poll::Ok(self.amt) + } + } + } +} diff --git a/src/io/flush.rs b/src/io/flush.rs new file mode 100644 index 00000000..159a178b --- /dev/null +++ b/src/io/flush.rs @@ -0,0 +1,39 @@ +use std::io::{self, Write}; + +use futures::{Poll, Future}; + +/// A future used to fully flush an I/O object. +/// +/// Resolves to the underlying I/O object once the flush operation is complete. +/// +/// Created by the `flush` function. +pub struct Flush<A> { + a: Option<A>, +} + +/// Creates a future which will entirely flush an I/O object and then yield the +/// object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling +/// a retry if `WouldBlock` is seen along the way. +pub fn flush<A>(a: A) -> Flush<A> + where A: Write, +{ + Flush { + a: Some(a), + } +} + +impl<A> Future for Flush<A> + where A: Write, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_nb!(self.a.as_mut().unwrap().flush()); + Poll::Ok(self.a.take().unwrap()) + } +} + diff --git a/src/io/mod.rs b/src/io/mod.rs new file mode 100644 index 00000000..41a73978 --- /dev/null +++ b/src/io/mod.rs @@ -0,0 +1,47 @@ +//! I/O conveniences when working with primitives in `tokio-core` +//! +//! Contains various combinators to work with I/O objects and type definitions +//! as well. + +use std::io; + +use futures::BoxFuture; +use futures::stream::BoxStream; + +/// A convenience typedef around a `Future` whose error component is `io::Error` +pub type IoFuture<T> = BoxFuture<T, io::Error>; + +/// A convenience typedef around a `Stream` whose error component is `io::Error` +pub type IoStream<T> = BoxStream<T, io::Error>; + +/// A convenience macro for working with `io::Result<T>` from the `Read` and +/// `Write` traits. +/// +/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If +/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if +/// it indicates `WouldBlock` or otherwise `Err` is returned. +#[macro_export] +macro_rules! try_nb { + ($e:expr) => (match $e { + Ok(t) => t, + Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { + return ::futures::Poll::NotReady + } + Err(e) => return ::futures::Poll::Err(e.into()), + }) +} + +mod copy; +mod flush; +mod read_exact; +mod read_to_end; +mod task; +mod window; +mod write_all; +pub use self::copy::{copy, Copy}; +pub use self::flush::{flush, Flush}; +pub use self::read_exact::{read_exact, ReadExact}; +pub use self::read_to_end::{read_to_end, ReadToEnd}; +pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite}; +pub use self::window::Window; +pub use self::write_all::{write_all, WriteAll}; diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs new file mode 100644 index 00000000..b251bfea --- /dev/null +++ b/src/io/read_exact.rs @@ -0,0 +1,77 @@ +use std::io::{self, Read}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the `read_exact` function. +pub struct ReadExact<A, T> { + state: State<A, T>, +} + +enum State<A, T> { + Reading { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future which will read exactly enough bytes to fill `buf`, +/// returning an error if EOF is hit sooner. +/// +/// The returned future will resolve to both the I/O stream as well as the +/// buffer once the read operation is completed. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T> + where A: Read, + T: AsMut<[u8]>, +{ + ReadExact { + state: State::Reading { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + +impl<A, T> Future for ReadExact<A, T> + where A: Read, + T: AsMut<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf, ref mut pos } => { + let buf = buf.as_mut(); + while *pos < buf.len() { + let n = try_nb!(a.read(&mut buf[*pos..])); + *pos += n; + if n == 0 { + return Poll::Err(eof()) + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf, .. } => Poll::Ok((a, buf)), + State::Empty => panic!(), + } + } +} diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs new file mode 100644 index 00000000..80e7cd6c --- /dev/null +++ b/src/io/read_to_end.rs @@ -0,0 +1,62 @@ +use std::io::{self, Read}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the `read_to_end` function. +pub struct ReadToEnd<A> { + state: State<A>, +} + +enum State<A> { + Reading { + a: A, + buf: Vec<u8>, + }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A> + where A: Read, +{ + ReadToEnd { + state: State::Reading { + a: a, + buf: buf, + } + } +} + +impl<A> Future for ReadToEnd<A> + where A: Read, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf } => { + // If we get `Ok`, then we know the stream hit EOF and we're done. If we + // hit "would block" then all the read data so far is in our buffer, and + // otherwise we propagate errors + try_nb!(a.read_to_end(buf)); + }, + State::Empty => panic!("poll ReadToEnd after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf } => Poll::Ok((a, buf)), + State::Empty => unreachable!(), + } + } +} diff --git a/src/io/task.rs b/src/io/task.rs new file mode 100644 index 00000000..3c87a32e --- /dev/null +++ b/src/io/task.rs @@ -0,0 +1,102 @@ +use std::cell::RefCell; +use std::io::{self, Read, Write}; + +use futures::task::TaskData; + +/// Abstraction that allows inserting an I/O object into task-local storage, +/// returning a handle that can be split. +/// +/// A `TaskIo<T>` handle implements the `ReadTask` and `WriteTask` and will only +/// work with the same task that the associated object was inserted into. The +/// handle may then be optionally `split` into the read/write halves so they can +/// be worked with independently. +/// +/// Note that it is important that the future returned from `TaskIo::new`, when +/// polled, will pin the yielded `TaskIo<T>` object to that specific task. Any +/// attempt to read or write the object on other tasks will result in a panic. +pub struct TaskIo<T> { + handle: TaskData<RefCell<T>>, +} + +/// The readable half of a `TaskIo<T>` instance returned from `TaskIo::split`. +/// +/// This handle implements the `ReadTask` trait and can be used to split up an +/// I/O object into two distinct halves. +pub struct TaskIoRead<T> { + handle: TaskData<RefCell<T>>, +} + +/// The writable half of a `TaskIo<T>` instance returned from `TaskIo::split`. +/// +/// This handle implements the `WriteTask` trait and can be used to split up an +/// I/O object into two distinct halves. +pub struct TaskIoWrite<T> { + handle: TaskData<RefCell<T>>, +} + +impl<T> TaskIo<T> { + /// Returns a new future which represents the insertion of the I/O object + /// `T` into task local storage, returning a `TaskIo<T>` handle to it. + /// + /// The returned future will never resolve to an error. + pub fn new(t: T) -> TaskIo<T> { + TaskIo { + handle: TaskData::new(RefCell::new(t)), + } + } +} + +impl<T> TaskIo<T> + where T: Read + Write, +{ + /// For an I/O object which is both readable and writable, this method can + /// be used to split the handle into two independently owned halves. + /// + /// The returned pair implements the `ReadTask` and `WriteTask` traits, + /// respectively, and can be used to pass around the object to different + /// combinators if necessary. + pub fn split(self) -> (TaskIoRead<T>, TaskIoWrite<T>) { + (TaskIoRead { handle: self.handle.clone() }, + TaskIoWrite { handle: self.handle }) + } +} + +impl<T> Read for TaskIo<T> + where T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().read(buf)) + } +} + +impl<T> Write for TaskIo<T> + where T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.with(|t| t.borrow_mut().flush()) + } +} + +impl<T> Read for TaskIoRead<T> + where T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().read(buf)) + } +} + +impl<T> Write for TaskIoWrite<T> + where T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.with(|t| t.borrow_mut().flush()) + } +} diff --git a/src/io/window.rs b/src/io/window.rs new file mode 100644 index 00000000..c6136099 --- /dev/null +++ b/src/io/window.rs @@ -0,0 +1,116 @@ +use std::ops; + +/// A owned window around an underlying buffer. +/// +/// Normally slices work great for considering sub-portions of a buffer, but +/// unfortunately a slice is a *borrowed* type in Rust which has an associated +/// lifetime. When working with future and async I/O these lifetimes are not +/// always appropriate, and are sometimes difficult to store in tasks. This +/// type strives to fill this gap by providing an "owned slice" around an +/// underlying buffer of bytes. +/// +/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable +/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation +/// that this type carries. +/// +/// This type can be particularly useful when working with the `write_all` +/// combinator in this crate. Data can be sliced via `Window`, consumed by +/// `write_all`, and then earned back once the write operation finishes through +/// the `into_inner` method on this type. +pub struct Window<T> { + inner: T, + range: ops::Range<usize>, +} + +impl<T: AsRef<[u8]>> Window<T> { + /// Creates a new window around the buffer `t` defaulting to the entire + /// slice. + /// + /// Further methods can be called on the returned `Window<T>` to alter the + /// window into the data provided. + pub fn new(t: T) -> Window<T> { + Window { + range: 0..t.as_ref().len(), + inner: t, + } + } + + /// Gets a shared reference to the underlying buffer inside of this + /// `Window`. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying buffer inside of this + /// `Window`. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this `Window`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.inner + } + + /// Returns the starting index of this window into the underlying buffer + /// `T`. + pub fn start(&self) -> usize { + self.range.start + } + + /// Returns the end index of this window into the underlying buffer + /// `T`. + pub fn end(&self) -> usize { + self.range.end + } + + /// Changes the starting index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `start` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_start(&mut self, start: usize) -> &mut Window<T> { + assert!(start < self.inner.as_ref().len()); + assert!(start <= self.range.end); + self.range.start = start; + self + } + + /// Changes the end index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `end` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_end(&mut self, end: usize) -> &mut Window<T> { + assert!(end < self.inner.as_ref().len()); + assert!(self.range.start <= end); + self.range.end = end; + self + } + + // TODO: how about a generic set() method along the lines of: + // + // buffer.set(..3) + // .set(0..2) + // .set(4..) + // + // etc. +} + +impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> { + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[self.range.start..self.range.end] + } +} + +impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[self.range.start..self.range.end] + } +} diff --git a/src/io/write_all.rs b/src/io/write_all.rs new file mode 100644 index 00000000..df4751df --- /dev/null +++ b/src/io/write_all.rs @@ -0,0 +1,80 @@ +use std::io::{self, Write}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the `write_all` top-level method. +pub struct WriteAll<A, T> { + state: State<A, T>, +} + +enum State<A, T> { + Writing { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future that will write the entire contents of the buffer `buf` to +/// the stream `a` provided. |