summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2016-08-26 14:30:46 -0700
committerAlex Crichton <alex@alexcrichton.com>2016-08-26 14:39:47 -0700
commitf107c8d860137b41e509b179d605db30082cb0da (patch)
tree063e7b0c86785f70a63458e17183602e58d9c7f3
parente71d509fee767d6b796ba18a5501f80f0fb4babc (diff)
Rename to tokio-core, add in futures-io
Renames the futures-mio crate to tokio-core, pulls in the futures-io crate under an `io` module, and gets everything compiling.
-rw-r--r--.gitignore2
-rw-r--r--.travis.yml24
-rw-r--r--Cargo.toml15
-rw-r--r--README.md45
-rw-r--r--src/bin/echo.rs7
-rw-r--r--src/bin/sink.rs11
-rw-r--r--src/channel.rs2
-rw-r--r--src/io/copy.rs82
-rw-r--r--src/io/flush.rs39
-rw-r--r--src/io/mod.rs47
-rw-r--r--src/io/read_exact.rs77
-rw-r--r--src/io/read_to_end.rs62
-rw-r--r--src/io/task.rs102
-rw-r--r--src/io/window.rs116
-rw-r--r--src/io/write_all.rs80
-rw-r--r--src/lib.rs5
-rw-r--r--src/lock.rs106
-rw-r--r--src/slot.rs691
-rw-r--r--src/tcp.rs2
-rw-r--r--src/timeout.rs2
-rw-r--r--src/udp.rs2
-rw-r--r--tests/buffered.rs7
-rw-r--r--tests/chain.rs7
-rw-r--r--tests/echo.rs7
-rw-r--r--tests/limit.rs7
-rw-r--r--tests/stream-buffered.rs7
-rw-r--r--tests/tcp.rs8
-rw-r--r--tests/timeout.rs4
-rw-r--r--tests/udp.rs6
29 files changed, 1495 insertions, 77 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..a9d37c56
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+target
+Cargo.lock
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 00000000..a89a11e3
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,24 @@
+language: rust
+
+rust:
+ - stable
+ - beta
+ - nightly
+sudo: false
+before_script:
+ - pip install 'travis-cargo<0.2' --user && export PATH=$HOME/.local/bin:$PATH
+script:
+ - cargo build
+ - cargo test
+ - cargo doc --no-deps
+after_success:
+ - travis-cargo --only nightly doc-upload
+env:
+ global:
+ - secure: LVrtwDI0IJradnHRk53dGWtTS+limhkuHym17wuto/Zaz6IJB9aq7G5wSYuZU3qabcxah7pigjXPFgzYwFD6mNHW1DAuAko1qOi4AL0rvg+rA7Fa5E9NEIxoqzCf+wBtqCvomBe/akOs7UtHdjE3CZpIEPwSHVf3jf61suB0mPVUW0AFTHvYTvHT4lyHjlruY+Ifi350yb4t0Oy9rU1bHNtX0q1T0mKuTnKkmpCT2Kj+2L7afgsAR3UgBjL3Py89LXmnF5VxSMGJWa6HL3xgEi3CXxBRQFdr+vipIDejWtjY+7DzvSRHid1rVfwCLdLfTwvA3Pf3b0I5DSJnjzRgKkfiH2j7JNFtCvLz+mM5C/4QJzAgNmdyNuDv0qOy07OABtYs/LE60f6ZZ5YMZAloMtA/9qQjJx+c2jO2nTZkx6vNJ5C421yzm2klQSL0d8pFaDmojqC5pT85MYhf3mESqSw1UjwFPa0xFtysT52oJBcyvwI/wBYbK40sArjSDZaU2Jncw9ptDWML/xUM+sWHF7ZW/mI1V15lqaCBX91xlbppfWDMgNF2c60vC90t0entbGpYLvHjQMdW6iucbsLLN5KAPzYPuufX2vJa8V1gxMxZ7CLcVLx9lmm3uEdrOZLEg4Fg7H7Xqc2JRygbNrTtOeBw1/o73znnnjEv8Vl3xqg=
+notifications:
+ email:
+ on_success: never
+os:
+ - linux
+ - osx
diff --git a/Cargo.toml b/Cargo.toml
index dfadac02..469ab143 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,19 +1,18 @@
[package]
-name = "futures-mio"
+name = "tokio-core"
version = "0.1.0"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT/Apache-2.0"
-repository = "https://github.com/alexcrichton/futures-rs"
-homepage = "https://github.com/alexcrichton/futures-rs"
-documentation = "http://alexcrichton.com/futures-rs/futures_mio/"
+repository = "https://github.com/tokio-rs/tokio-core"
+homepage = "https://github.com/tokio-rs/tokio-core"
+documentation = "https://tokio-rs.github.io/tokio-core"
description = """
-Bindings from the `futures` crate to the `mio` crate to get I/O in the form of
-futures and streams.
+Core I/O and event loop primitives for asynchronous I/O in Rust. Foundation for
+the rest of the tokio crates.
"""
[dependencies]
-futures = { path = "..", version = "0.1.0" }
-futures-io = { path = "../futures-io", version = "0.1.0" }
+futures = { git = "https://github.com/alexcrichton/futures-rs" }
log = "0.3"
mio = { git = "https://github.com/carllerche/mio" }
scoped-tls = "0.1.0"
diff --git a/README.md b/README.md
index c09ac5dc..98ea7739 100644
--- a/README.md
+++ b/README.md
@@ -1,12 +1,12 @@
-# futures-mio
+# tokio-core
-Bindings to the `mio` crate implementing the `futures-io` and `futures`
-abstractions.
+Core I/O and event loop abstraction for asynchronous I/O in Rust built on
+`futures` and `mio`.
-[![Build Status](https://travis-ci.org/alexcrichton/futures-rs.svg?branch=master)](https://travis-ci.org/alexcrichton/futures-rs)
+[![Build Status](https://travis-ci.org/tokio-rs/tokio-core.svg?branch=master)](https://travis-ci.org/tokio-rs/tokio-core)
[![Build status](https://ci.appveyor.com/api/projects/status/yl5w3ittk4kggfsh?svg=true)](https://ci.appveyor.com/project/alexcrichton/futures-rs)
-[Documentation](http://alexcrichton.com/futures-rs/futures_mio)
+[Documentation](https://tokio-rs.github.io/tokio-core)
## Usage
@@ -14,13 +14,13 @@ First, add this to your `Cargo.toml`:
```toml
[dependencies]
-futures-mio = { git = "https://github.com/alexcrichton/futures-rs" }
+tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
```
Next, add this to your crate:
```rust
-extern crate futures_mio;
+extern crate tokio_core;
```
## Examples
@@ -30,18 +30,17 @@ There are a few small examples showing off how to use this library:
* [echo.rs] - a simple TCP echo server
* [socks5.rs] - an implementation of a SOCKSv5 proxy server
-[echo.rs]: https://github.com/alexcrichton/futures-rs/blob/master/futures-mio/src/bin/echo.rs
-[socks5.rs]: https://github.com/alexcrichton/futures-rs/blob/master/futures-socks5/src/main.rs
+[echo.rs]: https://github.com/tokio-rs/tokio-core/blob/master/src/bin/echo.rs
+[socks5.rs]: https://github.com/tokio-rs/tokio-socks5/blob/master/src/main.rs
-## What is futures-mio?
+## What is tokio-core?
This crate is a connection `futures`, a zero-cost implementation of futures in
-Rust, and `mio`, a crate for zero-cost asynchronous I/O, and `futures-io`,
-abstractions for I/O on top of the `futures` crate. The types and structures
-implemented in `futures-mio` implement `Future` and `Stream` traits as
-appropriate. For example connecting a TCP stream returns a `Future` resolving
-to a TCP stream, and a TCP listener implements a stream of TCP streams
-(accepted connections).
+Rust, and `mio` and a crate for zero-cost asynchronous I/O. The types and
+structures implemented in `tokio-core` implement `Future` and `Stream` traits
+as appropriate. For example connecting a TCP stream returns a `Future`
+resolving to a TCP stream, and a TCP listener implements a stream of TCP
+streams (accepted connections).
This crate also provides facilities such as:
@@ -52,20 +51,20 @@ This crate also provides facilities such as:
* Data owned and local to the event loop
* An `Executor` implementation for a futures' `Task`
-The intention of `futures-mio` is to provide a concrete implementation for
-crates built on top of `futures-io`. For example you can easily turn a TCP
-stream into a TLS/SSL stream with the [`futures-tls`] crate or use the
-combinators to compose working with data on sockets.
+The intention of `tokio-core` is to provide a concrete implementation for crates
+built on top of asynchronous I/O. For example you can easily turn a TCP stream
+into a TLS/SSL stream with the [`tokio-tls`] crate or use the combinators to
+compose working with data on sockets.
-[`futures-tls`]: http://alexcrichton.com/futures-rs/futures_tls
+[`tokio-tls`]: https://tokio-rs.github.io/tokio-tls
Check out the [documentation] for more information, and more coming here soon!
-[documentation]: http://alexcrichton.com/futures-rs/futures_mio
+[documentation]: https://tokio-rs.github.io/tokio-core
# License
-`futures-mio` is primarily distributed under the terms of both the MIT license
+`tokio-core` is primarily distributed under the terms of both the MIT license
and the Apache License (Version 2.0), with portions covered by various BSD-like
licenses.
diff --git a/src/bin/echo.rs b/src/bin/echo.rs
index ccd94752..710db79d 100644
--- a/src/bin/echo.rs
+++ b/src/bin/echo.rs
@@ -1,22 +1,21 @@
//! An echo server that just writes back everything that's written to it.
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::env;
use std::net::SocketAddr;
use futures::Future;
use futures::stream::Stream;
-use futures_io::{copy, TaskIo};
+use tokio_core::io::{copy, TaskIo};
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
// Create the event loop that will drive this server
- let mut l = futures_mio::Loop::new().unwrap();
+ let mut l = tokio_core::Loop::new().unwrap();
// Create a TCP listener which will listen for incoming connections
let server = l.handle().tcp_listen(&addr);
diff --git a/src/bin/sink.rs b/src/bin/sink.rs
index 49feed35..baf96b8e 100644
--- a/src/bin/sink.rs
+++ b/src/bin/sink.rs
@@ -5,8 +5,7 @@
#[macro_use]
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::env;
use std::iter;
@@ -14,13 +13,13 @@ use std::net::SocketAddr;
use futures::Future;
use futures::stream::{self, Stream};
-use futures_io::IoFuture;
+use tokio_core::io::IoFuture;
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let mut l = futures_mio::Loop::new().unwrap();
+ let mut l = tokio_core::Loop::new().unwrap();
let server = l.handle().tcp_listen(&addr).and_then(|socket| {
socket.incoming().and_then(|(socket, addr)| {
println!("got a socket: {}", addr);
@@ -34,10 +33,10 @@ fn main() {
l.run(server).unwrap();
}
-fn write(socket: futures_mio::TcpStream) -> IoFuture<()> {
+fn write(socket: tokio_core::TcpStream) -> IoFuture<()> {
static BUF: &'static [u8] = &[0; 64 * 1024];
let iter = iter::repeat(()).map(|()| Ok(()));
stream::iter(iter).fold(socket, |socket, ()| {
- futures_io::write_all(socket, BUF).map(|(socket, _)| socket)
+ tokio_core::io::write_all(socket, BUF).map(|(socket, _)| socket)
}).map(|_| ()).boxed()
}
diff --git a/src/channel.rs b/src/channel.rs
index cd5fef46..3da44b53 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -3,10 +3,10 @@ use std::sync::mpsc::TryRecvError;
use futures::{Future, Poll};
use futures::stream::Stream;
-use futures_io::IoFuture;
use mio::channel;
use {ReadinessStream, LoopHandle};
+use io::IoFuture;
/// The transmission half of a channel used for sending messages to a receiver.
///
diff --git a/src/io/copy.rs b/src/io/copy.rs
new file mode 100644
index 00000000..042e819b
--- /dev/null
+++ b/src/io/copy.rs
@@ -0,0 +1,82 @@
+use std::io::{self, Read, Write};
+
+use futures::{Future, Poll};
+
+/// A future which will copy all data from a reader into a writer.
+///
+/// Created by the `copy` function, this future will resolve to the number of
+/// bytes copied or an error if one happens.
+pub struct Copy<R, W> {
+ reader: R,
+ read_done: bool,
+ writer: W,
+ pos: usize,
+ cap: usize,
+ amt: u64,
+ buf: Box<[u8]>,
+}
+
+/// Creates a future which represents copying all the bytes from one object to
+/// another.
+///
+/// The returned future will copy all the bytes read from `reader` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned and the `reader` and `writer` are
+/// consumed. On error the error is returned and the I/O objects are consumed as
+/// well.
+pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
+ where R: Read,
+ W: Write,
+{
+ Copy {
+ reader: reader,
+ read_done: false,
+ writer: writer,
+ amt: 0,
+ pos: 0,
+ cap: 0,
+ buf: Box::new([0; 2048]),
+ }
+}
+
+impl<R, W> Future for Copy<R, W>
+ where R: Read,
+ W: Write,
+{
+ type Item = u64;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<u64, io::Error> {
+ loop {
+ // If our buffer is empty, then we need to read some data to
+ // continue.
+ if self.pos == self.cap && !self.read_done {
+ let n = try_nb!(self.reader.read(&mut self.buf));
+ if n == 0 {
+ self.read_done = true;
+ } else {
+ self.pos = 0;
+ self.cap = n;
+ }
+ }
+
+ // If our buffer has some data, let's write it out!
+ while self.pos < self.cap {
+ let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap]));
+ self.pos += i;
+ self.amt += i as u64;
+ }
+
+ // If we've written al the data and we've seen EOF, flush out the
+ // data and finish the transfer.
+ // done with the entire transfer.
+ if self.pos == self.cap && self.read_done {
+ try_nb!(self.writer.flush());
+ return Poll::Ok(self.amt)
+ }
+ }
+ }
+}
diff --git a/src/io/flush.rs b/src/io/flush.rs
new file mode 100644
index 00000000..159a178b
--- /dev/null
+++ b/src/io/flush.rs
@@ -0,0 +1,39 @@
+use std::io::{self, Write};
+
+use futures::{Poll, Future};
+
+/// A future used to fully flush an I/O object.
+///
+/// Resolves to the underlying I/O object once the flush operation is complete.
+///
+/// Created by the `flush` function.
+pub struct Flush<A> {
+ a: Option<A>,
+}
+
+/// Creates a future which will entirely flush an I/O object and then yield the
+/// object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
+/// a retry if `WouldBlock` is seen along the way.
+pub fn flush<A>(a: A) -> Flush<A>
+ where A: Write,
+{
+ Flush {
+ a: Some(a),
+ }
+}
+
+impl<A> Future for Flush<A>
+ where A: Write,
+{
+ type Item = A;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<A, io::Error> {
+ try_nb!(self.a.as_mut().unwrap().flush());
+ Poll::Ok(self.a.take().unwrap())
+ }
+}
+
diff --git a/src/io/mod.rs b/src/io/mod.rs
new file mode 100644
index 00000000..41a73978
--- /dev/null
+++ b/src/io/mod.rs
@@ -0,0 +1,47 @@
+//! I/O conveniences when working with primitives in `tokio-core`
+//!
+//! Contains various combinators to work with I/O objects and type definitions
+//! as well.
+
+use std::io;
+
+use futures::BoxFuture;
+use futures::stream::BoxStream;
+
+/// A convenience typedef around a `Future` whose error component is `io::Error`
+pub type IoFuture<T> = BoxFuture<T, io::Error>;
+
+/// A convenience typedef around a `Stream` whose error component is `io::Error`
+pub type IoStream<T> = BoxStream<T, io::Error>;
+
+/// A convenience macro for working with `io::Result<T>` from the `Read` and
+/// `Write` traits.
+///
+/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
+/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if
+/// it indicates `WouldBlock` or otherwise `Err` is returned.
+#[macro_export]
+macro_rules! try_nb {
+ ($e:expr) => (match $e {
+ Ok(t) => t,
+ Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
+ return ::futures::Poll::NotReady
+ }
+ Err(e) => return ::futures::Poll::Err(e.into()),
+ })
+}
+
+mod copy;
+mod flush;
+mod read_exact;
+mod read_to_end;
+mod task;
+mod window;
+mod write_all;
+pub use self::copy::{copy, Copy};
+pub use self::flush::{flush, Flush};
+pub use self::read_exact::{read_exact, ReadExact};
+pub use self::read_to_end::{read_to_end, ReadToEnd};
+pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite};
+pub use self::window::Window;
+pub use self::write_all::{write_all, WriteAll};
diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs
new file mode 100644
index 00000000..b251bfea
--- /dev/null
+++ b/src/io/read_exact.rs
@@ -0,0 +1,77 @@
+use std::io::{self, Read};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the `read_exact` function.
+pub struct ReadExact<A, T> {
+ state: State<A, T>,
+}
+
+enum State<A, T> {
+ Reading {
+ a: A,
+ buf: T,
+ pos: usize,
+ },
+ Empty,
+}
+
+/// Creates a future which will read exactly enough bytes to fill `buf`,
+/// returning an error if EOF is hit sooner.
+///
+/// The returned future will resolve to both the I/O stream as well as the
+/// buffer once the read operation is completed.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
+ where A: Read,
+ T: AsMut<[u8]>,
+{
+ ReadExact {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn eof() -> io::Error {
+ io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
+}
+
+impl<A, T> Future for ReadExact<A, T>
+ where A: Read,
+ T: AsMut<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Reading { ref mut a, ref mut buf, ref mut pos } => {
+ let buf = buf.as_mut();
+ while *pos < buf.len() {
+ let n = try_nb!(a.read(&mut buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Poll::Err(eof())
+ }
+ }
+ }
+ State::Empty => panic!("poll a WriteAll after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf, .. } => Poll::Ok((a, buf)),
+ State::Empty => panic!(),
+ }
+ }
+}
diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs
new file mode 100644
index 00000000..80e7cd6c
--- /dev/null
+++ b/src/io/read_to_end.rs
@@ -0,0 +1,62 @@
+use std::io::{self, Read};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the `read_to_end` function.
+pub struct ReadToEnd<A> {
+ state: State<A>,
+}
+
+enum State<A> {
+ Reading {
+ a: A,
+ buf: Vec<u8>,
+ },
+ Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
+ where A: Read,
+{
+ ReadToEnd {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ }
+ }
+}
+
+impl<A> Future for ReadToEnd<A>
+ where A: Read,
+{
+ type Item = (A, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+ match self.state {
+ State::Reading { ref mut a, ref mut buf } => {
+ // If we get `Ok`, then we know the stream hit EOF and we're done. If we
+ // hit "would block" then all the read data so far is in our buffer, and
+ // otherwise we propagate errors
+ try_nb!(a.read_to_end(buf));
+ },
+ State::Empty => panic!("poll ReadToEnd after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf } => Poll::Ok((a, buf)),
+ State::Empty => unreachable!(),
+ }
+ }
+}
diff --git a/src/io/task.rs b/src/io/task.rs
new file mode 100644
index 00000000..3c87a32e
--- /dev/null
+++ b/src/io/task.rs
@@ -0,0 +1,102 @@
+use std::cell::RefCell;
+use std::io::{self, Read, Write};
+
+use futures::task::TaskData;
+
+/// Abstraction that allows inserting an I/O object into task-local storage,
+/// returning a handle that can be split.
+///
+/// A `TaskIo<T>` handle implements the `ReadTask` and `WriteTask` and will only
+/// work with the same task that the associated object was inserted into. The
+/// handle may then be optionally `split` into the read/write halves so they can
+/// be worked with independently.
+///
+/// Note that it is important that the future returned from `TaskIo::new`, when
+/// polled, will pin the yielded `TaskIo<T>` object to that specific task. Any
+/// attempt to read or write the object on other tasks will result in a panic.
+pub struct TaskIo<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+/// The readable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
+///
+/// This handle implements the `ReadTask` trait and can be used to split up an
+/// I/O object into two distinct halves.
+pub struct TaskIoRead<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+/// The writable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
+///
+/// This handle implements the `WriteTask` trait and can be used to split up an
+/// I/O object into two distinct halves.
+pub struct TaskIoWrite<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+impl<T> TaskIo<T> {
+ /// Returns a new future which represents the insertion of the I/O object
+ /// `T` into task local storage, returning a `TaskIo<T>` handle to it.
+ ///
+ /// The returned future will never resolve to an error.
+ pub fn new(t: T) -> TaskIo<T> {
+ TaskIo {
+ handle: TaskData::new(RefCell::new(t)),
+ }
+ }
+}
+
+impl<T> TaskIo<T>
+ where T: Read + Write,
+{
+ /// For an I/O object which is both readable and writable, this method can
+ /// be used to split the handle into two independently owned halves.
+ ///
+ /// The returned pair implements the `ReadTask` and `WriteTask` traits,
+ /// respectively, and can be used to pass around the object to different
+ /// combinators if necessary.
+ pub fn split(self) -> (TaskIoRead<T>, TaskIoWrite<T>) {
+ (TaskIoRead { handle: self.handle.clone() },
+ TaskIoWrite { handle: self.handle })
+ }
+}
+
+impl<T> Read for TaskIo<T>
+ where T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().read(buf))
+ }
+}
+
+impl<T> Write for TaskIo<T>
+ where T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.handle.with(|t| t.borrow_mut().flush())
+ }
+}
+
+impl<T> Read for TaskIoRead<T>
+ where T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().read(buf))
+ }
+}
+
+impl<T> Write for TaskIoWrite<T>
+ where T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.handle.with(|t| t.borrow_mut().flush())
+ }
+}
diff --git a/src/io/window.rs b/src/io/window.rs
new file mode 100644
index 00000000..c6136099
--- /dev/null
+++ b/src/io/window.rs
@@ -0,0 +1,116 @@
+use std::ops;
+
+/// A owned window around an underlying buffer.
+///
+/// Normally slices work great for considering sub-portions of a buffer, but
+/// unfortunately a slice is a *borrowed* type in Rust which has an associated
+/// lifetime. When working with future and async I/O these lifetimes are not
+/// always appropriate, and are sometimes difficult to store in tasks. This
+/// type strives to fill this gap by providing an "owned slice" around an
+/// underlying buffer of bytes.
+///
+/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
+/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
+/// that this type carries.
+///
+/// This type can be particularly useful when working with the `write_all`
+/// combinator in this crate. Data can be sliced via `Window`, consumed by
+/// `write_all`, and then earned back once the write operation finishes through
+/// the `into_inner` method on this type.
+pub struct Window<T> {
+ inner: T,
+ range: ops::Range<usize>,
+}
+
+impl<T: AsRef<[u8]>> Window<T> {
+ /// Creates a new window around the buffer `t` defaulting to the entire
+ /// slice.
+ ///
+ /// Further methods can be called on the returned `Window<T>` to alter the
+ /// window into the data provided.
+ pub fn new(t: T) -> Window<T> {
+ Window {
+ range: 0..t.as_ref().len(),
+ inner: t,
+ }
+ }
+
+ /// Gets a shared reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes this `Window`, returning the underlying buffer.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ /// Returns the starting index of this window into the underlying buffer
+ /// `T`.
+ pub fn start(&self) -> usize {
+ self.range.start
+ }
+
+ /// Returns the end index of this window into the underlying buffer
+ /// `T`.
+ pub fn end(&self) -> usize {
+ self.range.end
+ }
+
+ /// Changes the starting index of this window to the index specified.
+ ///
+ /// Returns the windows back to chain multiple calls to this method.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `start` is out of bounds for the underlying
+ /// slice or if it comes after the `end` configured in this window.
+ pub fn set_start(&mut self, start: usize) -> &mut Window<T> {
+ assert!(start < self.inner.as_ref().len());
+ assert!(start <= self.range.end);
+ self.range.start = start;
+ self
+ }
+
+ /// Changes the end index of this window to the index specified.
+ ///
+ /// Returns the windows back to chain multiple calls to this method.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `end` is out of bounds for the underlying
+ /// slice or if it comes after the `end` configured in this window.
+ pub fn set_end(&mut self, end: usize) -> &mut Window<T> {
+ assert!(end < self.inner.as_ref().len());
+ assert!(self.range.start <= end);
+ self.range.end = end;
+ self
+ }
+
+ // TODO: how about a generic set() method along the lines of:
+ //
+ // buffer.set(..3)
+ // .set(0..2)
+ // .set(4..)
+ //
+ // etc.
+}
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
+ fn as_ref(&self) -> &[u8] {
+ &self.inner.as_ref()[self.range.start..self.range.end]
+ }
+}
+
+impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
+ fn as_mut(&mut self) -> &mut [u8] {
+ &mut self.inner.as_mut()[self.range.start..self.range.end]
+ }
+}
diff --git a/src/io/write_all.rs b/src/io/write_all.rs
new file mode 100644
index 00000000..df4751df
--- /dev/null
+++ b/src/io/write_all.rs
@@ -0,0 +1,80 @@
+use std::io::{self, Write};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future used to write the entire contents of some data to a stream.
+///
+/// This is created by the `write_all` top-level method.
+pub struct WriteAll<A, T> {
+ state: State<A, T>,
+}
+
+enum State<A, T> {
+ Writing {
+ a: A,
+ buf: T,
+ pos: usize,
+ },
+ Empty,
+}
+
+/// Creates a future that will write the entire contents of the buffer `buf` to
+/// the stream `a` provided.