Diffstat:
-rw-r--r--  .gitignore                 |    2
-rw-r--r--  .travis.yml                |   24
-rw-r--r--  Cargo.toml                 |   15
-rw-r--r--  README.md                  |   45
-rw-r--r--  src/bin/echo.rs            |    7
-rw-r--r--  src/bin/sink.rs            |   11
-rw-r--r--  src/channel.rs             |    2
-rw-r--r--  src/io/copy.rs             |   82
-rw-r--r--  src/io/flush.rs            |   39
-rw-r--r--  src/io/mod.rs              |   47
-rw-r--r--  src/io/read_exact.rs       |   77
-rw-r--r--  src/io/read_to_end.rs      |   62
-rw-r--r--  src/io/task.rs             |  102
-rw-r--r--  src/io/window.rs           |  116
-rw-r--r--  src/io/write_all.rs        |   80
-rw-r--r--  src/lib.rs                 |    5
-rw-r--r--  src/lock.rs                |  106
-rw-r--r--  src/slot.rs                |  691
-rw-r--r--  src/tcp.rs                 |    2
-rw-r--r--  src/timeout.rs             |    2
-rw-r--r--  src/udp.rs                 |    2
-rw-r--r--  tests/buffered.rs          |    7
-rw-r--r--  tests/chain.rs             |    7
-rw-r--r--  tests/echo.rs              |    7
-rw-r--r--  tests/limit.rs             |    7
-rw-r--r--  tests/stream-buffered.rs   |    7
-rw-r--r--  tests/tcp.rs               |    8
-rw-r--r--  tests/timeout.rs           |    4
-rw-r--r--  tests/udp.rs               |    6
29 files changed, 1495 insertions(+), 77 deletions(-)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..a9d37c56
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+target
+Cargo.lock
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 00000000..a89a11e3
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,24 @@
+language: rust
+
+rust:
+ - stable
+ - beta
+ - nightly
+sudo: false
+before_script:
+ - pip install 'travis-cargo<0.2' --user && export PATH=$HOME/.local/bin:$PATH
+script:
+ - cargo build
+ - cargo test
+ - cargo doc --no-deps
+after_success:
+ - travis-cargo --only nightly doc-upload
+env:
+ global:
+ - secure: LVrtwDI0IJradnHRk53dGWtTS+limhkuHym17wuto/Zaz6IJB9aq7G5wSYuZU3qabcxah7pigjXPFgzYwFD6mNHW1DAuAko1qOi4AL0rvg+rA7Fa5E9NEIxoqzCf+wBtqCvomBe/akOs7UtHdjE3CZpIEPwSHVf3jf61suB0mPVUW0AFTHvYTvHT4lyHjlruY+Ifi350yb4t0Oy9rU1bHNtX0q1T0mKuTnKkmpCT2Kj+2L7afgsAR3UgBjL3Py89LXmnF5VxSMGJWa6HL3xgEi3CXxBRQFdr+vipIDejWtjY+7DzvSRHid1rVfwCLdLfTwvA3Pf3b0I5DSJnjzRgKkfiH2j7JNFtCvLz+mM5C/4QJzAgNmdyNuDv0qOy07OABtYs/LE60f6ZZ5YMZAloMtA/9qQjJx+c2jO2nTZkx6vNJ5C421yzm2klQSL0d8pFaDmojqC5pT85MYhf3mESqSw1UjwFPa0xFtysT52oJBcyvwI/wBYbK40sArjSDZaU2Jncw9ptDWML/xUM+sWHF7ZW/mI1V15lqaCBX91xlbppfWDMgNF2c60vC90t0entbGpYLvHjQMdW6iucbsLLN5KAPzYPuufX2vJa8V1gxMxZ7CLcVLx9lmm3uEdrOZLEg4Fg7H7Xqc2JRygbNrTtOeBw1/o73znnnjEv8Vl3xqg=
+notifications:
+ email:
+ on_success: never
+os:
+ - linux
+ - osx
diff --git a/Cargo.toml b/Cargo.toml
index dfadac02..469ab143 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,19 +1,18 @@
[package]
-name = "futures-mio"
+name = "tokio-core"
version = "0.1.0"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT/Apache-2.0"
-repository = "https://github.com/alexcrichton/futures-rs"
-homepage = "https://github.com/alexcrichton/futures-rs"
-documentation = "http://alexcrichton.com/futures-rs/futures_mio/"
+repository = "https://github.com/tokio-rs/tokio-core"
+homepage = "https://github.com/tokio-rs/tokio-core"
+documentation = "https://tokio-rs.github.io/tokio-core"
description = """
-Bindings from the `futures` crate to the `mio` crate to get I/O in the form of
-futures and streams.
+Core I/O and event loop primitives for asynchronous I/O in Rust. Foundation for
+the rest of the tokio crates.
"""
[dependencies]
-futures = { path = "..", version = "0.1.0" }
-futures-io = { path = "../futures-io", version = "0.1.0" }
+futures = { git = "https://github.com/alexcrichton/futures-rs" }
log = "0.3"
mio = { git = "https://github.com/carllerche/mio" }
scoped-tls = "0.1.0"
diff --git a/README.md b/README.md
index c09ac5dc..98ea7739 100644
--- a/README.md
+++ b/README.md
@@ -1,12 +1,12 @@
-# futures-mio
+# tokio-core
-Bindings to the `mio` crate implementing the `futures-io` and `futures`
-abstractions.
+Core I/O and event loop abstraction for asynchronous I/O in Rust built on
+`futures` and `mio`.
-[![Build Status](https://travis-ci.org/alexcrichton/futures-rs.svg?branch=master)](https://travis-ci.org/alexcrichton/futures-rs)
+[![Build Status](https://travis-ci.org/tokio-rs/tokio-core.svg?branch=master)](https://travis-ci.org/tokio-rs/tokio-core)
[![Build status](https://ci.appveyor.com/api/projects/status/yl5w3ittk4kggfsh?svg=true)](https://ci.appveyor.com/project/alexcrichton/futures-rs)
-[Documentation](http://alexcrichton.com/futures-rs/futures_mio)
+[Documentation](https://tokio-rs.github.io/tokio-core)
## Usage
@@ -14,13 +14,13 @@ First, add this to your `Cargo.toml`:
```toml
[dependencies]
-futures-mio = { git = "https://github.com/alexcrichton/futures-rs" }
+tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
```
Next, add this to your crate:
```rust
-extern crate futures_mio;
+extern crate tokio_core;
```
## Examples
@@ -30,18 +30,17 @@ There are a few small examples showing off how to use this library:
* [echo.rs] - a simple TCP echo server
* [socks5.rs] - an implementation of a SOCKSv5 proxy server
-[echo.rs]: https://github.com/alexcrichton/futures-rs/blob/master/futures-mio/src/bin/echo.rs
-[socks5.rs]: https://github.com/alexcrichton/futures-rs/blob/master/futures-socks5/src/main.rs
+[echo.rs]: https://github.com/tokio-rs/tokio-core/blob/master/src/bin/echo.rs
+[socks5.rs]: https://github.com/tokio-rs/tokio-socks5/blob/master/src/main.rs
-## What is futures-mio?
+## What is tokio-core?
This crate is a connection between `futures`, a zero-cost implementation of futures in
-Rust, and `mio`, a crate for zero-cost asynchronous I/O, and `futures-io`,
-abstractions for I/O on top of the `futures` crate. The types and structures
-implemented in `futures-mio` implement `Future` and `Stream` traits as
-appropriate. For example connecting a TCP stream returns a `Future` resolving
-to a TCP stream, and a TCP listener implements a stream of TCP streams
-(accepted connections).
+Rust, and `mio`, a crate for zero-cost asynchronous I/O. The types and
+structures implemented in `tokio-core` implement the `Future` and `Stream`
+traits as appropriate. For example, connecting a TCP stream returns a `Future`
+resolving to a TCP stream, and a TCP listener implements a stream of TCP
+streams (accepted connections).
This crate also provides facilities such as:
@@ -52,20 +51,20 @@ This crate also provides facilities such as:
* Data owned and local to the event loop
* An `Executor` implementation for a futures' `Task`
-The intention of `futures-mio` is to provide a concrete implementation for
-crates built on top of `futures-io`. For example you can easily turn a TCP
-stream into a TLS/SSL stream with the [`futures-tls`] crate or use the
-combinators to compose working with data on sockets.
+The intention of `tokio-core` is to provide a concrete implementation for crates
+built on top of asynchronous I/O. For example you can easily turn a TCP stream
+into a TLS/SSL stream with the [`tokio-tls`] crate or use the combinators to
+compose working with data on sockets.
-[`futures-tls`]: http://alexcrichton.com/futures-rs/futures_tls
+[`tokio-tls`]: https://tokio-rs.github.io/tokio-tls
Check out the [documentation] for more information, and more coming here soon!
-[documentation]: http://alexcrichton.com/futures-rs/futures_mio
+[documentation]: https://tokio-rs.github.io/tokio-core
# License
-`futures-mio` is primarily distributed under the terms of both the MIT license
+`tokio-core` is primarily distributed under the terms of both the MIT license
and the Apache License (Version 2.0), with portions covered by various BSD-like
licenses.
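
To make the README's description concrete, here is a minimal sketch assembled from pieces added in this commit (`Loop::new`, `handle().tcp_listen`, `incoming`, and `run` all appear in `src/bin/echo.rs` and `src/bin/sink.rs`); the `for_each` stream combinator is assumed from the `futures` crate, and error handling is elided:

```rust
extern crate futures;
extern crate tokio_core;

use std::net::SocketAddr;

use futures::Future;
use futures::stream::Stream;

fn main() {
    let addr = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();

    // The event loop that drives all I/O in this crate.
    let mut l = tokio_core::Loop::new().unwrap();

    // `tcp_listen` returns a future resolving to a TCP listener, and the
    // listener's `incoming()` is a stream of accepted connections.
    let server = l.handle().tcp_listen(&addr).and_then(|listener| {
        listener.incoming().for_each(|(_socket, peer)| {
            println!("accepted a connection from {}", peer);
            Ok(())
        })
    });

    // Run the future to completion on the event loop.
    l.run(server).unwrap();
}
```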
diff --git a/src/bin/echo.rs b/src/bin/echo.rs
index ccd94752..710db79d 100644
--- a/src/bin/echo.rs
+++ b/src/bin/echo.rs
@@ -1,22 +1,21 @@
//! An echo server that just writes back everything that's written to it.
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::env;
use std::net::SocketAddr;
use futures::Future;
use futures::stream::Stream;
-use futures_io::{copy, TaskIo};
+use tokio_core::io::{copy, TaskIo};
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
// Create the event loop that will drive this server
- let mut l = futures_mio::Loop::new().unwrap();
+ let mut l = tokio_core::Loop::new().unwrap();
// Create a TCP listener which will listen for incoming connections
let server = l.handle().tcp_listen(&addr);
diff --git a/src/bin/sink.rs b/src/bin/sink.rs
index 49feed35..baf96b8e 100644
--- a/src/bin/sink.rs
+++ b/src/bin/sink.rs
@@ -5,8 +5,7 @@
#[macro_use]
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::env;
use std::iter;
@@ -14,13 +13,13 @@ use std::net::SocketAddr;
use futures::Future;
use futures::stream::{self, Stream};
-use futures_io::IoFuture;
+use tokio_core::io::IoFuture;
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let mut l = futures_mio::Loop::new().unwrap();
+ let mut l = tokio_core::Loop::new().unwrap();
let server = l.handle().tcp_listen(&addr).and_then(|socket| {
socket.incoming().and_then(|(socket, addr)| {
println!("got a socket: {}", addr);
@@ -34,10 +33,10 @@ fn main() {
l.run(server).unwrap();
}
-fn write(socket: futures_mio::TcpStream) -> IoFuture<()> {
+fn write(socket: tokio_core::TcpStream) -> IoFuture<()> {
static BUF: &'static [u8] = &[0; 64 * 1024];
let iter = iter::repeat(()).map(|()| Ok(()));
stream::iter(iter).fold(socket, |socket, ()| {
- futures_io::write_all(socket, BUF).map(|(socket, _)| socket)
+ tokio_core::io::write_all(socket, BUF).map(|(socket, _)| socket)
}).map(|_| ()).boxed()
}
diff --git a/src/channel.rs b/src/channel.rs
index cd5fef46..3da44b53 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -3,10 +3,10 @@ use std::sync::mpsc::TryRecvError;
use futures::{Future, Poll};
use futures::stream::Stream;
-use futures_io::IoFuture;
use mio::channel;
use {ReadinessStream, LoopHandle};
+use io::IoFuture;
/// The transmission half of a channel used for sending messages to a receiver.
///
diff --git a/src/io/copy.rs b/src/io/copy.rs
new file mode 100644
index 00000000..042e819b
--- /dev/null
+++ b/src/io/copy.rs
@@ -0,0 +1,82 @@
+use std::io::{self, Read, Write};
+
+use futures::{Future, Poll};
+
+/// A future which will copy all data from a reader into a writer.
+///
+/// Created by the `copy` function, this future will resolve to the number of
+/// bytes copied or an error if one happens.
+pub struct Copy<R, W> {
+ reader: R,
+ read_done: bool,
+ writer: W,
+ pos: usize,
+ cap: usize,
+ amt: u64,
+ buf: Box<[u8]>,
+}
+
+/// Creates a future which represents copying all the bytes from one object to
+/// another.
+///
+/// The returned future will copy all the bytes read from `reader` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned and the `reader` and `writer` are
+/// consumed. On error the error is returned and the I/O objects are consumed as
+/// well.
+pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
+ where R: Read,
+ W: Write,
+{
+ Copy {
+ reader: reader,
+ read_done: false,
+ writer: writer,
+ amt: 0,
+ pos: 0,
+ cap: 0,
+ buf: Box::new([0; 2048]),
+ }
+}
+
+impl<R, W> Future for Copy<R, W>
+ where R: Read,
+ W: Write,
+{
+ type Item = u64;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<u64, io::Error> {
+ loop {
+ // If our buffer is empty, then we need to read some data to
+ // continue.
+ if self.pos == self.cap && !self.read_done {
+ let n = try_nb!(self.reader.read(&mut self.buf));
+ if n == 0 {
+ self.read_done = true;
+ } else {
+ self.pos = 0;
+ self.cap = n;
+ }
+ }
+
+ // If our buffer has some data, let's write it out!
+ while self.pos < self.cap {
+ let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap]));
+ self.pos += i;
+ self.amt += i as u64;
+ }
+
+            // If we've written all the data and we've seen EOF, flush out
+            // the remaining data from the writer and then we're done with
+            // the entire transfer.
+ if self.pos == self.cap && self.read_done {
+ try_nb!(self.writer.flush());
+ return Poll::Ok(self.amt)
+ }
+ }
+ }
+}
diff --git a/src/io/flush.rs b/src/io/flush.rs
new file mode 100644
index 00000000..159a178b
--- /dev/null
+++ b/src/io/flush.rs
@@ -0,0 +1,39 @@
+use std::io::{self, Write};
+
+use futures::{Poll, Future};
+
+/// A future used to fully flush an I/O object.
+///
+/// Resolves to the underlying I/O object once the flush operation is complete.
+///
+/// Created by the `flush` function.
+pub struct Flush<A> {
+ a: Option<A>,
+}
+
+/// Creates a future which will entirely flush an I/O object and then yield the
+/// object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
+/// a retry if `WouldBlock` is seen along the way.
+pub fn flush<A>(a: A) -> Flush<A>
+ where A: Write,
+{
+ Flush {
+ a: Some(a),
+ }
+}
+
+impl<A> Future for Flush<A>
+ where A: Write,
+{
+ type Item = A;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<A, io::Error> {
+ try_nb!(self.a.as_mut().unwrap().flush());
+ Poll::Ok(self.a.take().unwrap())
+ }
+}
+
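
A usage sketch (not part of this commit): `flush` composes with the other combinators in this module, for example running after `write_all` so that buffered data is pushed out before the socket is reused. The `W` parameter is only a stand-in for a writable I/O object such as a connected TCP stream.

```rust
extern crate futures;
extern crate tokio_core;

use std::io::Write;

use futures::Future;
use tokio_core::io::{flush, write_all, IoFuture};

/// Writes `data` to `socket` in its entirety, then flushes the socket and
/// yields it back for further use. Illustrative sketch only.
fn send_all<W>(socket: W, data: Vec<u8>) -> IoFuture<W>
    where W: Write + Send + 'static,
{
    write_all(socket, data)
        .and_then(|(socket, _data)| flush(socket))
        .boxed()
}
```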
diff --git a/src/io/mod.rs b/src/io/mod.rs
new file mode 100644
index 00000000..41a73978
--- /dev/null
+++ b/src/io/mod.rs
@@ -0,0 +1,47 @@
+//! I/O conveniences when working with primitives in `tokio-core`
+//!
+//! Contains various combinators to work with I/O objects and type definitions
+//! as well.
+
+use std::io;
+
+use futures::BoxFuture;
+use futures::stream::BoxStream;
+
+/// A convenience typedef around a `Future` whose error component is `io::Error`
+pub type IoFuture<T> = BoxFuture<T, io::Error>;
+
+/// A convenience typedef around a `Stream` whose error component is `io::Error`
+pub type IoStream<T> = BoxStream<T, io::Error>;
+
+/// A convenience macro for working with `io::Result<T>` from the `Read` and
+/// `Write` traits.
+///
+/// This macro takes `io::Result<T>` as input and returns `T` on success. If the
+/// input is an `Err`, then `Poll::NotReady` is returned when the error indicates
+/// `WouldBlock`, and otherwise the error is returned via `Poll::Err`.
+#[macro_export]
+macro_rules! try_nb {
+ ($e:expr) => (match $e {
+ Ok(t) => t,
+ Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
+ return ::futures::Poll::NotReady
+ }
+ Err(e) => return ::futures::Poll::Err(e.into()),
+ })
+}
+
+mod copy;
+mod flush;
+mod read_exact;
+mod read_to_end;
+mod task;
+mod window;
+mod write_all;
+pub use self::copy::{copy, Copy};
+pub use self::flush::{flush, Flush};
+pub use self::read_exact::{read_exact, ReadExact};
+pub use self::read_to_end::{read_to_end, ReadToEnd};
+pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite};
+pub use self::window::Window;
+pub use self::write_all::{write_all, WriteAll};
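
For readers new to this pattern, a small hand-written future in the style of the combinators above shows how `try_nb!` is meant to be used inside `poll`; the `ReadByte` type is purely illustrative and not part of the crate:

```rust
#[macro_use]
extern crate tokio_core;
extern crate futures;

use std::io::{self, Read};

use futures::{Future, Poll};

/// Illustrative only: resolves to the first byte read from `r`, returning
/// `Poll::NotReady` whenever the underlying read would block.
struct ReadByte<R> {
    r: R,
}

impl<R: Read> Future for ReadByte<R> {
    type Item = u8;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<u8, io::Error> {
        let mut buf = [0; 1];
        // `try_nb!` maps `WouldBlock` to `Poll::NotReady` and any other error
        // to `Poll::Err`, otherwise yielding the `Ok` value.
        let n = try_nb!(self.r.read(&mut buf));
        if n == 0 {
            Poll::Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"))
        } else {
            Poll::Ok(buf[0])
        }
    }
}
```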
diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs
new file mode 100644
index 00000000..b251bfea
--- /dev/null
+++ b/src/io/read_exact.rs
@@ -0,0 +1,77 @@
+use std::io::{self, Read};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future which can be used to read exactly enough bytes to fill a buffer,
+/// failing if end-of-file is reached first.
+///
+/// Created by the `read_exact` function.
+pub struct ReadExact<A, T> {
+ state: State<A, T>,
+}
+
+enum State<A, T> {
+ Reading {
+ a: A,
+ buf: T,
+ pos: usize,
+ },
+ Empty,
+}
+
+/// Creates a future which will read exactly enough bytes to fill `buf`,
+/// returning an error if EOF is hit sooner.
+///
+/// The returned future will resolve to both the I/O stream as well as the
+/// buffer once the read operation is completed.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success both the object and the buffer
+/// will be returned, with the buffer completely filled by data read from the
+/// stream.
+pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
+ where A: Read,
+ T: AsMut<[u8]>,
+{
+ ReadExact {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn eof() -> io::Error {
+ io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
+}
+
+impl<A, T> Future for ReadExact<A, T>
+ where A: Read,
+ T: AsMut<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Reading { ref mut a, ref mut buf, ref mut pos } => {
+ let buf = buf.as_mut();
+ while *pos < buf.len() {
+ let n = try_nb!(a.read(&mut buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Poll::Err(eof())
+ }
+ }
+ }
+            State::Empty => panic!("poll a ReadExact after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf, .. } => Poll::Ok((a, buf)),
+ State::Empty => panic!(),
+ }
+ }
+}
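
Because the future resolves to both the I/O object and the filled buffer, `read_exact` calls chain naturally. A hedged sketch with a made-up framing format (the 4-byte length prefix and the `read_frame` helper are hypothetical):

```rust
extern crate futures;
extern crate tokio_core;

use std::io::Read;

use futures::Future;
use tokio_core::io::{read_exact, IoFuture};

/// Reads a 4-byte little-endian length prefix, then that many bytes of
/// payload. The framing format is invented purely for illustration.
fn read_frame<R>(socket: R) -> IoFuture<(R, Vec<u8>)>
    where R: Read + Send + 'static,
{
    read_exact(socket, [0u8; 4]).and_then(|(socket, header)| {
        // Decode the hypothetical little-endian length field.
        let len = header.iter().rev().fold(0, |n, &b| (n << 8) | b as usize);
        read_exact(socket, vec![0u8; len])
    }).boxed()
}
```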
diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs
new file mode 100644
index 00000000..80e7cd6c
--- /dev/null
+++ b/src/io/read_to_end.rs
@@ -0,0 +1,62 @@
+use std::io::{self, Read};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the `read_to_end` function.
+pub struct ReadToEnd<A> {
+ state: State<A>,
+}
+
+enum State<A> {
+ Reading {
+ a: A,
+ buf: Vec<u8>,
+ },
+ Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
+ where A: Read,
+{
+ ReadToEnd {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ }
+ }
+}
+
+impl<A> Future for ReadToEnd<A>
+ where A: Read,
+{
+ type Item = (A, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+ match self.state {
+ State::Reading { ref mut a, ref mut buf } => {
+ // If we get `Ok`, then we know the stream hit EOF and we're done. If we
+ // hit "would block" then all the read data so far is in our buffer, and
+ // otherwise we propagate errors
+ try_nb!(a.read_to_end(buf));
+ },
+ State::Empty => panic!("poll ReadToEnd after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf } => Poll::Ok((a, buf)),
+ State::Empty => unreachable!(),
+ }
+ }
+}
diff --git a/src/io/task.rs b/src/io/task.rs
new file mode 100644
index 00000000..3c87a32e
--- /dev/null
+++ b/src/io/task.rs
@@ -0,0 +1,102 @@
+use std::cell::RefCell;
+use std::io::{self, Read, Write};
+
+use futures::task::TaskData;
+
+/// Abstraction that allows inserting an I/O object into task-local storage,
+/// returning a handle that can be split.
+///
+/// A `TaskIo<T>` handle implements the `Read` and `Write` traits and will only
+/// work with the same task that the associated object was inserted into. The
+/// handle may then be optionally `split` into the read/write halves so they can
+/// be worked with independently.
+///
+/// Note that a `TaskIo<T>` handle is pinned to the specific task that the
+/// associated object was inserted into. Any attempt to read or write the
+/// object on other tasks will result in a panic.
+pub struct TaskIo<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+/// The readable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
+///
+/// This handle implements the `Read` trait and represents the readable half of
+/// an I/O object that has been split into two distinct halves.
+pub struct TaskIoRead<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+/// The writable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
+///
+/// This handle implements the `Write` trait and represents the writable half of
+/// an I/O object that has been split into two distinct halves.
+pub struct TaskIoWrite<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+impl<T> TaskIo<T> {
+    /// Inserts the I/O object `T` into task-local storage, returning a
+    /// `TaskIo<T>` handle to it.
+    ///
+    /// The returned handle will only work on the task the object was inserted into.
+ pub fn new(t: T) -> TaskIo<T> {
+ TaskIo {
+ handle: TaskData::new(RefCell::new(t)),
+ }
+ }
+}
+
+impl<T> TaskIo<T>
+ where T: Read + Write,
+{
+ /// For an I/O object which is both readable and writable, this method can
+ /// be used to split the handle into two independently owned halves.
+ ///
+    /// The returned pair implements the `Read` and `Write` traits,
+ /// respectively, and can be used to pass around the object to different
+ /// combinators if necessary.
+ pub fn split(self) -> (TaskIoRead<T>, TaskIoWrite<T>) {
+ (TaskIoRead { handle: self.handle.clone() },
+ TaskIoWrite { handle: self.handle })
+ }
+}
+
+impl<T> Read for TaskIo<T>
+ where T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().read(buf))
+ }
+}
+
+impl<T> Write for TaskIo<T>
+ where T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.handle.with(|t| t.borrow_mut().flush())
+ }
+}
+
+impl<T> Read for TaskIoRead<T>
+ where T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().read(buf))
+ }
+}
+
+impl<T> Write for TaskIoWrite<T>
+ where T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.handle.with(|t| t.borrow_mut().flush())
+ }
+}
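
A sketch of how these handles are used together with `copy` (mirroring the `use tokio_core::io::{copy, TaskIo}` import in `src/bin/echo.rs`): split a connected socket into its two halves and echo everything read back to the writer. The `S` parameter and the `echo` helper are placeholders for this example only.

```rust
extern crate tokio_core;

use std::io::{Read, Write};

use tokio_core::io::{copy, Copy, TaskIo, TaskIoRead, TaskIoWrite};

/// Echoes all data read from `socket` back into it by splitting the
/// task-pinned handle into read and write halves. Illustrative sketch only.
fn echo<S>(socket: S) -> Copy<TaskIoRead<S>, TaskIoWrite<S>>
    where S: Read + Write,
{
    let (reader, writer) = TaskIo::new(socket).split();
    copy(reader, writer)
}
```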
diff --git a/src/io/window.rs b/src/io/window.rs
new file mode 100644
index 00000000..c6136099
--- /dev/null
+++ b/src/io/window.rs
@@ -0,0 +1,116 @@
+use std::ops;
+
+/// An owned window around an underlying buffer.
+///
+/// Normally slices work great for considering sub-portions of a buffer, but
+/// unfortunately a slice is a *borrowed* type in Rust which has an associated
+/// lifetime. When working with futures and async I/O these lifetimes are not
+/// always appropriate, and are sometimes difficult to store in tasks. This
+/// type strives to fill this gap by providing an "owned slice" around an
+/// underlying buffer of bytes.
+///
+/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
+/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
+/// that this type carries.
+///
+/// This type can be particularly useful when working with the `write_all`
+/// combinator in this crate. Data can be sliced via `Window`, consumed by
+/// `write_all`, and then recovered once the write operation finishes through
+/// the `into_inner` method on this type.
+pub struct Window<T> {
+ inner: T,
+ range: ops::Range<usize>,
+}
+
+impl<T: AsRef<[u8]>> Window<T> {
+ /// Creates a new window around the buffer `t` defaulting to the entire
+ /// slice.
+ ///
+ /// Further methods can be called on the returned `Window<T>` to alter the
+ /// window into the data provided.
+ pub fn new(t: T) -> Window<T> {
+ Window {
+ range: 0..t.as_ref().len(),
+ inner: t,
+ }
+ }
+
+ /// Gets a shared reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes this `Window`, returning the underlying buffer.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ /// Returns the starting index of this window into the underlying buffer
+ /// `T`.
+ pub fn start(&self) -> usize {
+ self.range.start
+ }
+
+ /// Returns the end index of this window into the underlying buffer
+ /// `T`.
+ pub fn end(&self) -> usize {
+ self.range.end
+ }
+
+ /// Changes the starting index of this window to the index specified.
+ ///
+    /// Returns the window back so that multiple calls to this method can be chained.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `start` is out of bounds for the underlying
+ /// slice or if it comes after the `end` configured in this window.
+ pub fn set_start(&mut self, start: usize) -> &mut Window<T> {
+ assert!(start < self.inner.as_ref().len());
+ assert!(start <= self.range.end);
+ self.range.start = start;
+ self
+ }
+
+ /// Changes the end index of this window to the index specified.
+ ///
+    /// Returns the window back so that multiple calls to this method can be chained.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `end` is out of bounds for the underlying
+    /// slice or if it comes before the `start` configured in this window.
+ pub fn set_end(&mut self, end: usize) -> &mut Window<T> {
+ assert!(end < self.inner.as_ref().len());
+ assert!(self.range.start <= end);
+ self.range.end = end;
+ self
+ }
+
+ // TODO: how about a generic set() method along the lines of:
+ //
+ // buffer.set(..3)
+ // .set(0..2)
+ // .set(4..)
+ //
+ // etc.
+}
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
+ fn as_ref(&self) -> &[u8] {
+ &self.inner.as_ref()[self.range.start..self.range.end]
+ }
+}
+
+impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
+ fn as_mut(&mut self) -> &mut [u8] {
+ &mut self.inner.as_mut()[self.range.start..self.range.end]
+ }
+}
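
A sketch of the `write_all` interaction described in the type's documentation: slice a buffer with a `Window`, hand it to `write_all`, and recover the original buffer afterwards via `into_inner`. The `W` type and the `write_prefix` helper are placeholders for this example only (note that, as the asserts above are written, `set_end` requires `n` to be strictly less than the buffer's length):

```rust
extern crate futures;
extern crate tokio_core;

use std::io::Write;

use futures::Future;
use tokio_core::io::{write_all, IoFuture, Window};

/// Writes only the first `n` bytes of `buf` to `socket`, then hands the whole
/// buffer back for reuse. Illustrative sketch only.
fn write_prefix<W>(socket: W, buf: Vec<u8>, n: usize) -> IoFuture<(W, Vec<u8>)>
    where W: Write + Send + 'static,
{
    // Restrict the window to `buf[..n]`; the rest of the buffer is untouched.
    let mut window = Window::new(buf);
    window.set_end(n);

    write_all(socket, window)
        .map(|(socket, window)| (socket, window.into_inner()))
        .boxed()
}
```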
diff --git a/src/io/write_all.rs b/src/io/write_all.rs
new file mode 100644
index 00000000..df4751df
--- /dev/null
+++ b/src/io/write_all.rs
@@ -0,0 +1,80 @@
+use std::io::{self, Write};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future used to write the entire contents of some data to a stream.
+///
+/// This is created by the `write_all` top-level method.
+pub struct WriteAll<A, T> {
+ state: State<A, T>,
+}
+
+enum State<A, T> {
+ Writing {
+ a: A,
+ buf: T,
+ pos: usize,
+ },
+ Empty,
+}
+
+/// Creates a future that will write the entire contents of the buffer `buf` to
+/// the stream `a` provided.
+///
+/// The returned future will not return until all the data has been written, and
+/// the future will resolve to the stream as well as the buffer (for reuse if
+/// needed).
+///
+/// Any error which happens during writing will cause both the stream and the
+/// buffer to get destroyed.
+///
+/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
+/// be broadly applicable to accepting data which can be converted to a slice.
+/// The `Window` struct is also available in this crate to provide a different
+/// window into a slice if necessary.
+pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
+ where A: Write,
+ T: AsRef<[u8]>,
+{
+ WriteAll {
+ state: State::Writing {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn zero_write() -> io::Error {
+ io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
+}
+
+impl<A, T> Future for WriteAll<A, T>
+ where A: Write,
+ T: AsRef<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Writing { ref mut a, ref buf, ref mut pos } => {
+ let buf = buf.as_ref();
+ while *pos < buf.len() {
+ let n = try_nb!(a.write(&buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Poll::Err(zero_write())
+ }
+ }
+ }
+ State::Empty => panic!("poll a WriteAll after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Writing { a, buf, .. } => Poll::Ok((a, buf)),
+ State::Empty => panic!(),
+ }
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 449dc600..3250a0dc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,7 +6,6 @@
#![deny(missing_docs)]
extern crate futures;
-extern crate futures_io;
extern crate mio;
extern crate slab;
@@ -16,9 +15,7 @@ extern crate scoped_tls;
#[macro_use]
extern crate log;
-#[path = "../../src/slot.rs"]
mod slot;
-#[path = "../../src/lock.rs"]
mod lock;
mod channel;
@@ -30,6 +27,8 @@ mod timeout;
mod timer_wheel;
mod udp;
+pub mod io;
+
pub use channel::{Sender, Receiver};
pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout};
pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken};
diff --git a/src/lock.rs b/src/lock.rs
new file mode 100644
index 00000000..e5bc6b2b
--- /dev/null
+++ b/src/lock.rs
@@ -0,0 +1,106 @@
+//! A "mutex" which only supports try_lock
+//!
+//! In a futures library the only thing that should ever block is the eventual
+//! call into the event loop, so this module provides a fast user-space
+//! implementation of a lock that only supports a `try_lock` operation.
+
+extern crate core;
+
+use self::core::cell::UnsafeCell;
+use self::core::ops::{Deref, DerefMut};
+use self::core::sync::atomic::Ordering::{Acquire, Release};
+use self::core::sync::atomic::AtomicBool;
+
+/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
+///
+/// This lock only supports the `try_lock` operation, however, and does not
+/// implement poisoning.
+pub struct Lock<T> {
+ locked: AtomicBool,
+ data: UnsafeCell<T>,
+}
+
+/// Sentinel representing an acquired lock through which the data can be
+/// accessed.
+pub struct TryLock<'a, T: 'a> {
+ __ptr: &'a Lock<T>,
+}
+
+// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are
+// intended to mirror the standard library's corresponding impls for `Mutex<T>`.
+//
+// If a `T` is sendable across threads, so is the lock, and `T` must be sendable
+// across threads to be `Sync` because it allows mutable access from multiple
+// threads.
+unsafe impl<T: Send> Send for Lock<T> {}
+unsafe impl<T: Send> Sync for Lock<T> {}
+
+impl<T> Lock<T> {
+ /// Creates a new lock around the given value.
+ pub fn new(t: T) -> Lock<T> {
+ Lock {
+ locked: AtomicBool::new(false),
+ data: UnsafeCell::new(t),
+ }
+ }
+
+ /// Attempts to acquire this lock, returning whether the lock was acquired or
+ /// not.
+ ///
+ /// If `Some` is returned then the data this lock protects can be accessed
+ /// through the sentinel. This sentinel allows both mutable and immutable
+ /// access.
+ ///
+ /// If `None` is returned then the lock is already locked, either elsewhere
+ /// on this thread or on another thread.
+ pub fn try_lock(&self) -> Option<TryLock<T>> {
+ if !self.locked.swap(true, Acquire) {
+ Some(TryLock { __ptr: self })
+ } else {
+ None
+ }
+ }
+}
+
+impl<'a, T> Deref for TryLock<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ // The existence of `TryLock` represents that we own the lock, so we
+ // can safely access the data here.
+ unsafe { &*self.__ptr.data.get() }
+ }
+}
+
+impl<'a, T> DerefMut for TryLock<'a, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ // The existence of `TryLock` represents that we own the lock, so we
+ // can safely access the data here.
+ //
+ // Additionally, we're the *only* `TryLock` in existence so mutable
+ // access should be ok.
+ unsafe { &mut *self.__ptr.data.get() }
+ }
+}
+
+impl<'a, T> Drop for TryLock<'a, T> {
+ fn drop(&mut self) {
+ self.__ptr.locked.store(false, Release);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::Lock;
+
+ #[test]
+ fn smoke() {
+ let a = Lock::new(1);
+ let mut a1 = a.try_lock().unwrap();
+ assert!(a.try_lock().is_none());
+ assert_eq!(*a1, 1);
+ *a1 = 2;
+ drop(a1);
+ assert_eq!(*a.try_lock().unwrap(), 2);
+ assert_eq!(*a.try_lock().unwrap(), 2);
+ }
+}
diff --git a/src/slot.rs b/src/slot.rs
new file mode 100644
index 00000000..d802c987
--- /dev/null
+++ b/src/slot.rs
@@ -0,0 +1,691 @@
+//! A slot in memory for communicating between a producer and a consumer.
+//!
+//! This module contains an implementation detail of this library for a type
+//! which is only intended to be shared between one consumer and one producer of
+//! a value. It is unlikely that this module will survive stabilization of this
+//! library, so it is not recommended to rely on it.
+
+#![allow(dead_code)] // imported in a few places
+
+use std::prelude::v1::*;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use lock::Lock;
+
+/// A slot in memory intended to represent the communication channel between one
+/// producer and one consumer.
+///
+/// Each slot contains space for a piece of data of type `T`, and can have
+/// callbacks registered to run when the slot is either full or empty.
+///
+/// Slots are only intended to be shared between exactly one producer and
+/// exactly one consumer. If there are multiple concurrent producers or
+/// consumers then this is still memory safe but will have unpredictable results
+/// (and maybe panics). Note that this does not require that the "consumer" is
+/// the same for the entire lifetime of a slot, simply that there is only one
+/// consumer at a time.
+///
+/// # Registering callbacks
+///
+/// [`on_empty`](#method.on_empty) registers a callback to run when the slot
+/// becomes empty, and [`on_full`](#method.on_full) registers one to run when it
+/// becomes full. In both cases, the callback will run immediately if possible.
+///
+/// At most one callback can be registered at any given time: it is an error to
+/// attempt to register a callback with `on_full` if one is currently registered
+/// via `on_empty`, or any other combination.
+///
+/// # Cancellation
+///
+/// Registering a callback returns a `Token` which can be used to
+/// [`cancel`](#method.cancel) the callback. Only callbacks that have not yet
+/// started running can be canceled. Canceling a callback that has already run
+/// is not an error, and `cancel` does not signal whether or not the callback
+/// was actually canceled to the caller.
+pub struct Slot<T> {
+ // The purpose of this data type is to communicate when a value becomes
+ // available and coordinate between a producer and consumer about that
+ // value. Slots end up being at the core of many futures as they handle
+ // values transferring between producers and consumers, which means that
+ // they can never block.
+ //
+ // As a result, this `Slot` is a lock-free implementation in terms of not
+ // actually blocking at any point in time. The `Lock` types are
+ // half-optional and half-not-optional. They aren't actually mutexes as they
+ // only support a `try_lock` operation, and all the methods below ensure
+ // that progress can always be made without blocking.
+ //
+ // The `state` variable keeps track of the state of this slot, while the
+ // other fields here are just the payloads of the slot itself. Note that the
+ // exact bits of `state` are typically wrapped up in a `State` for
+ // inspection (see below).
+ state: AtomicUsize,
+ slot: Lock<Option<T>>,
+ on_full: Lock<Option<Box<FnBox<T>>>>,
+ on_empty: Lock<Option<(Box<FnBox2<T>>, Option<T>)>>,
+}
+
+/// Error value returned from erroneous calls to `try_produce`, which contains
+/// the value that was passed to `try_produce`.
+#[derive(Debug, PartialEq)]
+pub struct TryProduceError<T>(T);
+
+/// Error value returned from erroneous calls to `try_consume`.
+#[derive(Debug, PartialEq)]
+pub struct TryConsumeError(());
+
+/// Error value returned from erroneous calls to `on_full`.
+#[derive(Debug, PartialEq)]
+pub struct OnFullError(());
+
+/// Error value returned from erroneous calls to `on_empty`.
+#[derive(Debug, PartialEq)]
+pub struct OnEmptyError(());
+
+/// A `Token` represents a registered callback, and can be used to cancel the callback.
+#[derive(Clone, Copy)]
+pub struct Token(usize);
+
+// Slot state: the lowest 3 bits are flags; the remaining bits are used to
+// store the `Token` for the currently registered callback. The special token
+// value 0 means no callback is registered.
+//
+// The flags are:
+// - `DATA`: the `Slot` contains a value
+// - `ON_FULL`: the `Slot` has an `on_full` callback registered
+// - `ON_EMPTY`: the `Slot` has an `on_empty` callback registered
+struct State(usize);
+
+const DATA: usize = 1 << 0;
+const ON_FULL: usize = 1 << 1;
+const ON_EMPTY: usize = 1 << 2;
+const STATE_BITS: usize = 3;
+const STATE_MASK: usize = (1 << STATE_BITS) - 1;
+
+fn _is_send<T: Send>() {}
+fn _is_sync<T: Sync>() {}
+
+fn _assert() {
+ _is_send::<Slot<i32>>();
+ _is_sync::<Slot<u32>>();
+}
+
+impl<T> Slot<T> {
+ /// Creates a new `Slot` containing `val`, which may be `None` to create an
+ /// empty `Slot`.
+ pub fn new(val: Option<T>) -> Slot<T> {
+ Slot {
+ state: AtomicUsize::new(if val.is_some() {DATA} else {0}),
+ slot: Lock::new(val),
+ on_full: Lock::new(None),
+ on_empty: Lock::new(None),
+ }
+ }
+
+ /// Attempts to store `t` in the slot.
+ ///
+ /// This method can only be called by the one consumer working on this
+ /// `Slot`. Concurrent calls to this method or `on_empty` will result in
+ /// panics or possibly errors.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Err` if the slot is already full. The value you attempted to
+ /// store is included in the error value.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if called concurrently with `try_produce` or
+ /// `on_empty`, or if `on_empty` has been called previously but the callback
+ /// hasn't fired.
+ pub fn try_produce(&self, t: T) -> Result<(), TryProduceError<T>> {
+ // First up, let's take a look at our current state. Of our three flags,
+ // we check a few:
+ //
+ // * DATA - if this is set, then the production fails as a value has
+ // already been produced and we're not ready to receive it yet.
+ // * ON_EMPTY - this should never be set as it indicates a contract
+ // violation as the producer already registered interest in
+ // a value but the callback wasn't fired.
+ // * ON_FULL - doesn't matter in this use case, we don't check it as
+ // either state is valid.
+ let mut state = State(self.state.load(Ordering::SeqCst));
+ assert!(!state.flag(ON_EMPTY));
+ if state.flag(DATA) {
+ return Err(TryProduceError(t))
+ }
+
+ // Ok, so we've determined that our state is either `ON_FULL` or `0`, in
+ // both cases we're going to store our data into our slot. This should
+ // always succeed as access to `slot` is gated on the `DATA` flag being
+ // set on the consumer side (which isn't set) and there should only be
+ // one producer.
+ let mut slot = self.slot.try_lock().expect("interference with consumer?");
+ assert!(slot.is_none());
+ *slot = Some(t);
+ drop(slot);
+
+ // Next, we update our state with `DATA` to say that something is
+ // available, and we also unset `ON_FULL` because we'll invoke the
+ // callback if it's available.
+ loop {
+ assert!(!state.flag(ON_EMPTY));
+ let new_state = state.set_flag(DATA, true).set_flag(ON_FULL, false);
+ let old = self.state.compare_and_swap(state.0,
+ new_state.0,
+ Ordering::SeqCst);
+ if old == state.0 {
+ break
+ }
+ state.0 = old;
+ }
+
+ // If our previous state we transitioned from indicates that it has an
+ // on-full callback, we call that callback here. There's a few unwraps
+ // here that should never fail because the consumer shouldn't be placing
+ // another callback here and there shouldn't be any other producers as
+ // well.
+ if state.flag(ON_FULL) {
+ let cb = self.on_full.try_lock().expect("interference2")
+ .take().expect("ON_FULL but no callback");
+ cb.call_box(self);
+ }
+ Ok(())
+ }
+
+ /// Registers `f` as a callback to run when the slot becomes empty.
+ ///
+ /// The callback will run immediately if the slot is already empty. Returns
+ /// a token that can be used to cancel the callback. This method is to be
+ /// called by the producer, and it is illegal to call this method
+ /// concurrently with either `on_empty` or `try_produce`.
+ ///
+ /// # Panics
+ ///
+ /// Panics if another callback was already registered via `on_empty` or
+ /// `on_full`, or if this value is called concurrently with other producer
+ /// methods.
+ pub fn on_empty<F>(&self, item: Option<T>, f: F) -> Token
+ where F: FnOnce(&Slot<T>, Option<T>) + Send + 'static
+ {
+ // First up, as usual, take a look at our state. Of the three flags we
+ // check two:
+ //
+ // * DATA - if set, we keep going, but if unset we're done as there's no
+ // data and we're already empty.
+ // * ON_EMPTY - this should be impossible as it's a contract violation
+ // to call this twice or concurrently.
+ // * ON_FULL - it's illegal to have both an empty and a full callback
+ // simultaneously, so we check this just after we ensure
+ // there's data available. If there's data there should not
+ // be a full callback as it should have been called.
+ let mut state = State(self.state.load(Ordering::SeqCst));
+ assert!(!state.flag(ON_EMPTY));
+ if !state.flag(DATA) {
+ f(self, item);
+ return Token(0)
+ }
+ assert!(!state.flag(ON_FULL));
+
+ // At this point we've precisely determined that our state is `DATA` and
+ // all other flags are unset. We're cleared for landing in initializing
+ // the `on_empty` slot so we store our callback here.
+ let mut slot = self.on_empty.try_lock().expect("on_empty interference");
+ assert!(slot.is_none());
+ *slot = Some((Box::new(f), item));
+ drop(slot);
+
+ // In this loop, we transition ourselves from the `DATA` state to a
+ // state which has the on empty flag state. Note that we also increase
+ // the token of this state as we're registering a new callback.
+ loop {
+ assert!(state.flag(DATA));
+ assert!(!state.flag(ON_FULL));
+ assert!(!state.flag(ON_EMPTY));
+ let new_state = state.set_flag(ON_EMPTY, true)
+ .set_token(state.token() + 1);
+ let old = self.state.compare_and_swap(state.0,
+ new_state.0,
+ Ordering::SeqCst);
+
+ // If we succeeded in the CAS, then we're done and our token is
+ // valid.
+ if old == state.0 {
+ return Token(new_state.token())
+ }
+ state.0 = old;
+
+ // If we failed the CAS but the data was taken in the meantime we
+ // abort our attempt to set on-empty and call the callback
+ // immediately. Note that the on-empty flag was never set, so it
+ // should still be there and should be available to take.
+ if !state.flag(DATA) {
+ let cb = self.on_empty.try_lock().expect("on_empty interference2")
+ .take().expect("on_empty not empty??");
+ let (cb, item) = cb;
+ cb.call_box(self, item);
+ return Token(0)
+ }
+ }
+ }
+
+ /// Attempts to consume the value stored in the slot.
+ ///
+ /// This method can only be called by the one consumer of this slot, and
+ /// cannot be called concurrently with `try_consume` or `on_full`.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Err` if the slot is already empty.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if called concurrently with `try_consume` or
+    /// `on_full`, and may otherwise behave unpredictably.
+ pub fn try_consume(&self) -> Result<T, TryConsumeError> {
+ // The implementation of this method is basically the same as
+ // `try_produce` above, it's just the opposite of all the operations.
+ let mut state = State(self.state.load(Ordering::SeqCst));
+ assert!(!state.flag(ON_FULL));
+ if !state.flag(DATA) {
+ return Err(TryConsumeError(()))
+ }
+ let mut slot = self.slot.try_lock().expect("interference with producer?");
+ let val = slot.take().expect("DATA but not data");
+ drop(slot);
+
+ loop {
+ assert!(!state.flag(ON_FULL));
+ let new_state = state.set_flag(DATA, false).set_flag(ON_EMPTY, false);
+ let old = self.state.compare_and_swap(state.0,
+ new_state.0,
+ Ordering::SeqCst);
+ if old == state.0 {
+ break
+ }
+ state.0 = old;
+ }
+ assert!(!state.flag(ON_FULL));
+ if state.flag(ON_EMPTY) {
+ let cb = self.on_empty.try_lock().expect("interference3")
+ .take().expect("ON_EMPTY but no callback");
+ let (cb, item) = cb;
+ cb.call_box(self, item);
+ }
+ Ok(val)
+ }
+
+ /// Registers `f` as a callback to run when the slot becomes full.
+ ///
+ /// The callback will run immediately if the slot is already full. Returns a
+ /// token that can be used to cancel the callback.
+ ///
+ /// This method is to be called by the consumer.
+ ///
+ /// # Panics
+ ///
+ /// Panics if another callback was already registered via `on_empty` or
+ /// `on_full` or if called concurrently with `on_full` or `try_consume`.
+ pub fn on_full<F>(&self, f: F) -> Token
+ where F: FnOnce(&Slot<T>) + Send + 'static
+ {
+ // The implementation of this method is basically the same as
+ // `on_empty` above, it's just the opposite of all the operations.
+ let mut state = State(self.state.load(Ordering::SeqCst));
+ assert!(!state.flag(ON_FULL));
+ if state.flag(DATA) {
+ f(self);
+ return Token(0)
+ }
+ assert!(!state.flag(ON_EMPTY));
+
+ let mut slot = self.on_full.try_lock().expect("on_full interference");
+ assert!(slot.is_none());
+ *slot = Some(Box::new(f));
+ drop(slot);
+
+ loop {
+ assert!(!state.flag(DATA));
+ assert!(!state.flag(ON_EMPTY));
+ assert!(!state.flag(ON_FULL));
+ let new_state = state.set_flag(ON_FULL, true)
+ .set_token(state.token() + 1);
+ let old = self.state.compare_and_swap(state.0,
+ new_state.0,
+ Ordering::SeqCst);
+ if old == state.0 {
+ return Token(new_state.token())
+ }
+ state.0 = old;
+
+ if state.flag(DATA) {
+ let cb = self.on_full.try_lock().expect("on_full interference2")
+ .take().expect("on_full not full??");
+ cb.call_box(self);
+ return Token(0)
+ }
+ }
+ }
+
+ /// Cancels the callback associated with `token`.
+ ///
+    /// Canceling a callback that has already started running, or that has
+    /// already run, will do nothing and is not an error. See
+ /// [Cancellation](#cancellation).
+ ///
+ /// # Panics
+ ///
+ /// This method may cause panics if it is called concurrently with
+ /// `on_empty` or `on_full`, depending on which callback is being canceled.
+ pub fn cancel(&self, token: Token) {
+ // Tokens with a value of "0" are sentinels which don't actually do
+ // anything.
+ let token = token.0;
+ if token == 0 {
+ return
+ }
+
+ let mut state = State(self.state.load(Ordering::SeqCst));
+ loop {
+ // If we've moved on to a different token, then we're guaranteed
+ // that our token won't show up again, so we can return immediately
+ // as our closure has likely already run (or been previously
+ // canceled).
+ if state.token() != token {
+ return
+ }
+
+ // If our token matches, then let's see if we're cancelling either
+ // the on-full or on-empty callbacks. It's illegal to have them both
+ // registered, so we only need to look at one.
+ //
+ // If neither are set then the token has probably already run, so we
+ // just continue along our merry way and don't worry.
+ let new_state = if state.flag(ON_FULL) {
+ assert!(!state.flag(ON_EMPTY));
+ state.set_flag(ON_FULL, false)
+ } else if state.flag(ON_EMPTY) {
+ assert!(!state.flag(ON_FULL));
+ state.set_flag(ON_EMPTY, false)
+ } else {
+ return
+ };
+ let old = self.state.compare_and_swap(state.0,
+ new_state.0,
+ Ordering::SeqCst);
+ if old == state.0 {
+ break
+ }
+ state.0 = old;
+ }
+
+ // Figure out which callback we just canceled, and now that the flag is
+ // unset we should own the callback to clear it.
+
+ if state.flag(ON_FULL) {
+ let cb = self.on_full.try_lock().expect("on_full interference3")
+ .take().expect("on_full not full??");
+ drop(cb);
+ } else {
+ let cb = self.on_empty.try_lock().expect("on_empty interference3")
+ .take().expect("on_empty not empty??");
+ drop(cb);
+ }
+ }
+}
+
+impl<T> TryProduceError<T> {
+ /// Extracts the value that was attempted to be produced.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+trait FnBox<T>: Send {
+ fn call_box(self: Box<Self>, other: &Slot<T>);
+}
+
+impl<T, F> FnBox<T> for F
+ where F: FnOnce(&Slot<T>) + Send,
+{
+ fn call_box(self: Box<F>, other: &Slot<T>) {
+ (*self)(other)
+ }
+}
+
+trait FnBox2<T>: Send {
+    fn call_box(self: Box<Self>, other: &Slot<T>, item: Option<T>);
+}
+
+impl<T, F> FnBox2<T> for F
+ where F: FnOnce(&Slot<T>, Option<T>) + Send,
+{
+ fn call_box(self: Box<F>, other: &Slot<T>, item: Option<T>) {
+ (*self)(other, item)
+ }
+}
+
+impl State {
+ fn flag(&self, f: usize) -> bool {
+ self.0 & f != 0
+ }
+
+ fn set_flag(&self, f: usize, val: bool) -> State {
+ State(if val {
+ self.0 | f
+ } else {
+ self.0 & !f
+ })
+ }
+
+ fn token(&self) -> usize {
+ self.0 >> STATE_BITS
+ }
+
+ fn set_token(&self, gen: usize) -> State {
+ State((gen << STATE_BITS) | (self.0 & STATE_MASK))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::thread;
+
+ use super::Slot;
+
+ #[test]
+ fn sequential() {
+ let slot = Slot::new(Some(1));
+
+ // We can consume once
+ assert_eq!(slot.try_consume(), Ok(1));
+ assert!(slot.try_consume().is_err());
+
+ // Consume a production
+ assert_eq!(slot.try_produce(2), Ok(()));
+ assert_eq!(slot.try_consume(), Ok(2));
+
+ // Can't produce twice
+ assert_eq!(slot.try_produce(3), Ok(()));
+ assert!(slot.try_produce(3).is_err());
+
+ // on_full is run immediately if full
+ let hit = Arc::new(AtomicUsize::new(0));
+ let hit2 = hit.clone();
+ slot.on_full(move |_s| {
+ hit2.fetch_add(1, Ordering::SeqCst);
+ });
+ assert_eq!(hit.load(Ordering::SeqCst), 1);
+
+ // on_full can be run twice, and we can consume in the callback
+ let hit2 = hit.clone();
+ slot.on_full(move |s| {
+ hit2.fetch_add(1, Ordering::SeqCst);
+ assert_eq!(s.try_consume(), Ok(3));
+ });
+ assert_eq!(hit.load(Ordering::SeqCst), 2);
+
+ // Production can't run a previous callback
+ assert_eq!(slot.try_produce(4), Ok(()));
+ assert_eq!(hit.load(Ordering::SeqCst), 2);
+ assert_eq!(slot.try_consume(), Ok(4));
+
+ // Productions run new callbacks
+ let hit2 = hit.clone();
+ slot.on_full(move |s| {
+ hit2.fetch_add(1, Ordering::SeqCst);
+ assert_eq!(s.try_consume(), Ok(5));
+ });
+ assert_eq!(slot.try_produce(5), Ok(()));
+ assert_eq!(hit.load(Ordering::SeqCst), 3);
+
+ // on empty should fire immediately for an empty slot
+ let hit2 = hit.clone();
+ slot.on_empty(None, move |_, _| {
+ hit2.fetch_add(1, Ordering::SeqCst);
+ });
+ assert_eq!(hit.load(Ordering::SeqCst), 4);
+ }
+
+ #[test]
+ fn channel() {
+ const N: usize = 10000;
+
+ struct Sender {
+ slot: Arc<Slot<usize>>,
+ hit: Arc<AtomicUsize>,
+ }
+
+ struct Receiver {
+ slot: Arc<Slot<usize>>,
+ hit: Arc<AtomicUsize>,
+ }
+
+ impl Sender {
+ fn send(&self, val: usize) {
+ if self.slot.try_produce(val).is_ok() {
+ return
+ }
+ let me = thread::current();
+ self.hit.store(0, Ordering::SeqCst);
+ let hit = self.hit.clone();
+ self.slot.on_empty(None, move |_slot, _| {
+ hit.store(1, Ordering::SeqCst);
+ me.unpark();
+ });
+ while self.hit.load(Ordering::SeqCst) == 0 {
+ thread::park();
+ }
+ self.slot.try_produce(val).expect("can't produce after on_empty")
+ }
+ }
+
+ impl Receiver {
+ fn recv(&self) -> usize {
+ if let Ok(i) = self.slot.try_consume() {
+ return i
+ }
+
+ let me = thread::current();
+ self.hit.store(0, Ordering::SeqCst);
+ let hit = self.hit.clone();
+ self.slot.on_full(move |_slot| {
+ hit.store(1, Ordering::SeqCst);
+ me.unpark();
+ });
+ while self.hit.load(Ordering::SeqCst) == 0 {
+ thread::park();
+ }
+ self.slot.try_consume().expect("can't consume after on_full")
+ }
+ }
+
+ let slot = Arc::new(Slot::new(None));
+ let slot2 = slot.clone();
+
+ let tx = Sender { slot: slot2, hit: Arc::new(AtomicUsize::new(0)) };
+ let rx = Receiver { slot: slot, hit: Arc::new(AtomicUsize::new(0)) };
+
+ let a = thread::spawn(move || {
+ for i in 0..N {
+ assert_eq!(rx.recv(), i);
+ }
+ });
+
+ for i in 0..N {
+ tx.send(i);
+ }
+
+ a.join().unwrap();
+ }
+
+ #[test]
+ fn cancel() {
+ let slot = Slot::new(None);
+ let hits = Arc::new(AtomicUsize::new(0));
+
+ let add = || {
+ let hits = hits.clone();
+ move |_: &Slot<u32>| { hits.fetch_add(1, Ordering::SeqCst); }
+ };
+ let add_empty = || {
+ let hits = hits.clone();
+ move |_: &Slot<u32>, _: Option<u32>| {
+ hits.fetch_add(1, Ordering::SeqCst);
+ }
+ };
+
+ // cancel on_full
+ let n = hits.load(Ordering::SeqCst);
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ let token = slot.on_full(add());
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ slot.cancel(token);
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ assert!(slot.try_consume().is_err());
+ assert!(slot.try_produce(1).is_ok());
+ assert!(slot.try_consume().is_ok());
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+
+ // cancel on_empty
+ let n = hits.load(Ordering::SeqCst);
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ slot.try_produce(1).unwrap();
+ let token = slot.on_empty(None, add_empty());
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ slot.cancel(token);
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ assert!(slot.try_produce(1).is_err());
+
+ // cancel with no effect
+ let n = hits.load(Ordering::SeqCst);
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ let token = slot.on_full(add());
+ assert_eq!(hits.load(Ordering::SeqCst), n + 1);
+ slot.cancel(token);
+ assert_eq!(hits.load(Ordering::SeqCst), n + 1);
+ assert!(slot.try_consume().is_ok());
+ let token = slot.on_empty(None, add_empty());
+ assert_eq!(hits.load(Ordering::SeqCst), n + 2);
+ slot.cancel(token);
+ assert_eq!(hits.load(Ordering::SeqCst), n + 2);
+
+ // cancel old ones don't count
+ let n = hits.load(Ordering::SeqCst);
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ let token1 = slot.on_full(add());
+ assert_eq!(hits.load(Ordering::SeqCst), n);
+ assert!(slot.try_produce(1).is_ok());
+ assert_eq!(hits.load(Ordering::SeqCst), n + 1);
+ assert!(slot.try_consume().is_ok());
+ assert_eq!(hits.load(Ordering::SeqCst), n + 1);
+ let token2 = slot.on_full(add());
+ assert_eq!(hits.load(Ordering::SeqCst), n + 1);
+ slot.cancel(token1);
+ assert_eq!(hits.load(Ordering::SeqCst), n + 1);
+ slot.cancel(token2);
+ assert_eq!(hits.load(Ordering::SeqCst), n + 1);
+ }
+}
diff --git a/src/tcp.rs b/src/tcp.rs
index c0aca32b..bacf2ce1 100644
--- a/src/tcp.rs
+++ b/src/tcp.rs
@@ -5,10 +5,10 @@ use std::net::{self, SocketAddr, Shutdown};
use futures::stream::Stream;
use futures::{Future, IntoFuture, failed, Poll};
-use futures_io::{IoFuture, IoStream};
use mio;
use {ReadinessStream, LoopHandle};
+use io::{IoFuture, IoStream};
/// An I/O object representing a TCP socket listening for incoming connections.
///
diff --git a/src/timeout.rs b/src/timeout.rs
index 93199be1..e4ada461 100644
--- a/src/timeout.rs
+++ b/src/timeout.rs
@@ -2,9 +2,9 @@ use std::io;
use std::time::{Duration, Instant};
use futures::{Future, Poll};
-use futures_io::IoFuture;
use LoopHandle;
+use io::IoFuture;
use event_loop::TimeoutToken;
/// A future representing the notification that a timeout has occurred.
diff --git a/src/udp.rs b/src/udp.rs
index 5a0de2b5..b0a4dc6f 100644
--- a/src/udp.rs
+++ b/src/udp.rs
@@ -3,10 +3,10 @@ use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr};
use std::fmt;
use futures::{Future, failed, Poll};
-use futures_io::IoFuture;
use mio;
use {ReadinessStream, LoopHandle};
+use io::IoFuture;
/// An I/O object representing a UDP socket.
pub struct UdpSocket {
diff --git a/tests/buffered.rs b/tests/buffered.rs
index a6bc8992..3da24e85 100644
--- a/tests/buffered.rs
+++ b/tests/buffered.rs
@@ -1,6 +1,5 @@
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
extern crate env_logger;
use std::net::TcpStream;
@@ -9,7 +8,7 @@ use std::io::{Read, Write, BufReader, BufWriter};
use futures::Future;
use futures::stream::Stream;
-use futures_io::copy;
+use tokio_core::io::copy;
macro_rules! t {
($e:expr) => (match $e {
@@ -23,7 +22,7 @@ fn echo_server() {
const N: usize = 1024;
drop(env_logger::init());
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap());
let srv = t!(l.run(srv));
let addr = t!(srv.local_addr());
diff --git a/tests/chain.rs b/tests/chain.rs
index 410b9a0e..daeb52ab 100644
--- a/tests/chain.rs
+++ b/tests/chain.rs
@@ -1,6 +1,5 @@
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::net::TcpStream;
use std::thread;
@@ -8,7 +7,7 @@ use std::io::{Write, Read};
use futures::Future;
use futures::stream::Stream;
-use futures_io::read_to_end;
+use tokio_core::io::read_to_end;
macro_rules! t {
($e:expr) => (match $e {
@@ -19,7 +18,7 @@ macro_rules! t {
#[test]
fn chain_clients() {
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap());
let srv = t!(l.run(srv));
let addr = t!(srv.local_addr());
diff --git a/tests/echo.rs b/tests/echo.rs
index 77635539..a104beef 100644
--- a/tests/echo.rs
+++ b/tests/echo.rs
@@ -1,7 +1,6 @@
extern crate env_logger;
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::io::{Read, Write};
use std::net::TcpStream;
@@ -9,7 +8,7 @@ use std::thread;
use futures::Future;
use futures::stream::Stream;
-use futures_io::{copy, TaskIo};
+use tokio_core::io::{copy, TaskIo};
macro_rules! t {
($e:expr) => (match $e {
@@ -22,7 +21,7 @@ macro_rules! t {
fn echo_server() {
drop(env_logger::init());
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap());
let srv = t!(l.run(srv));
let addr = t!(srv.local_addr());
diff --git a/tests/limit.rs b/tests/limit.rs
index 620c495e..d7caaad0 100644
--- a/tests/limit.rs
+++ b/tests/limit.rs
@@ -1,6 +1,5 @@
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::net::TcpStream;
use std::thread;
@@ -8,7 +7,7 @@ use std::io::{Write, Read};
use futures::Future;
use futures::stream::Stream;
-use futures_io::read_to_end;
+use tokio_core::io::read_to_end;
macro_rules! t {
($e:expr) => (match $e {
@@ -19,7 +18,7 @@ macro_rules! t {
#[test]
fn limit() {
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap());
let srv = t!(l.run(srv));
let addr = t!(srv.local_addr());
diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs
index a472a3cc..8474619e 100644
--- a/tests/stream-buffered.rs
+++ b/tests/stream-buffered.rs
@@ -1,6 +1,5 @@
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
extern crate env_logger;
use std::io::{Read, Write};
@@ -9,7 +8,7 @@ use std::thread;
use futures::Future;
use futures::stream::Stream;
-use futures_io::{copy, TaskIo};
+use tokio_core::io::{copy, TaskIo};
macro_rules! t {
($e:expr) => (match $e {
@@ -22,7 +21,7 @@ macro_rules! t {
fn echo_server() {
drop(env_logger::init());
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap());
let srv = t!(l.run(srv));
let addr = t!(srv.local_addr());
diff --git a/tests/tcp.rs b/tests/tcp.rs
index 5fb38b1b..5384d8b1 100644
--- a/tests/tcp.rs
+++ b/tests/tcp.rs
@@ -1,6 +1,6 @@
extern crate env_logger;
extern crate futures;
-extern crate futures_mio;
+extern crate tokio_core;
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::channel;
@@ -19,7 +19,7 @@ macro_rules! t {
#[test]
fn connect() {
drop(env_logger::init());
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let srv = t!(TcpListener::bind("127.0.0.1:0"));
let addr = t!(srv.local_addr());
let t = thread::spawn(move || {
@@ -37,7 +37,7 @@ fn connect() {
#[test]
fn accept() {
drop(env_logger::init());
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap());
let srv = t!(l.run(srv));
let addr = t!(srv.local_addr());
@@ -63,7 +63,7 @@ fn accept() {
#[test]
fn accept2() {
drop(env_logger::init());
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap());
let srv = t!(l.run(srv));
let addr = t!(srv.local_addr());
diff --git a/tests/timeout.rs b/tests/timeout.rs
index 7676a50c..a00b64dc 100644
--- a/tests/timeout.rs
+++ b/tests/timeout.rs
@@ -1,6 +1,6 @@
extern crate env_logger;
extern crate futures;
-extern crate futures_mio;
+extern crate tokio_core;
use std::time::{Instant, Duration};
@@ -16,7 +16,7 @@ macro_rules! t {
#[test]
fn smoke() {
drop(env_logger::init());
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let dur = Duration::from_millis(10);
let timeout = l.handle().timeout(dur).and_then(|t| t);
let start = Instant::now();
diff --git a/tests/udp.rs b/tests/udp.rs
index 8f1ba5a9..1bfe512a 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -1,11 +1,11 @@
extern crate futures;
-extern crate futures_mio;
+extern crate tokio_core;
use std::io;
use std::net::SocketAddr;
use futures::{Future, Poll};
-use futures_mio::UdpSocket;
+use tokio_core::UdpSocket;
macro_rules! t {
($e:expr) => (match $e {
@@ -16,7 +16,7 @@ macro_rules! t {
#[test]
fn send_messages() {
- let mut l = t!(futures_mio::Loop::new());
+ let mut l = t!(tokio_core::Loop::new());
let a = l.handle().udp_bind(&"127.0.0.1:0".parse().unwrap());
let b = l.handle().udp_bind(&"127.0.0.1:0".parse().unwrap());
let (a, b) = t!(l.run(a.join(b)));