summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.travis.yml3
-rw-r--r--Cargo.toml11
-rw-r--r--futures2/Cargo.toml11
-rw-r--r--futures2/src/lib.rs2
-rw-r--r--src/executor/current_thread/mod.rs17
-rw-r--r--src/executor/mod.rs1
-rw-r--r--src/lib.rs19
-rw-r--r--src/net/tcp/incoming.rs15
-rw-r--r--src/net/tcp/listener.rs38
-rw-r--r--src/net/tcp/stream.rs111
-rw-r--r--src/runtime.rs50
-rwxr-xr-x[-rw-r--r--]tests/current_thread.rs2
-rw-r--r--tokio-executor/Cargo.toml5
-rw-r--r--tokio-executor/src/enter.rs9
-rw-r--r--tokio-executor/src/global.rs29
-rw-r--r--tokio-executor/src/lib.rs13
-rw-r--r--tokio-io/Cargo.toml5
-rw-r--r--tokio-reactor/Cargo.toml5
-rw-r--r--tokio-reactor/src/atomic_task.rs11
-rw-r--r--tokio-reactor/src/background.rs7
-rw-r--r--tokio-reactor/src/lib.rs41
-rw-r--r--tokio-reactor/src/poll_evented.rs215
-rw-r--r--tokio-reactor/src/registration.rs56
-rw-r--r--tokio-threadpool/Cargo.toml5
-rw-r--r--tokio-threadpool/src/lib.rs182
-rw-r--r--tokio-threadpool/src/task.rs77
-rw-r--r--tokio-threadpool/tests/threadpool.rs210
27 files changed, 1045 insertions, 105 deletions
diff --git a/.travis.yml b/.travis.yml
index 00d28a7c..225fbb5f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -26,6 +26,9 @@ script:
cargo check --tests --all --target $TARGET
else
cargo test --all
+ cargo test --features unstable-futures
+ cargo test --manifest-path tokio-threadpool/Cargo.toml --features unstable-futures
+ cargo test --manifest-path tokio-reactor/Cargo.toml --features unstable-futures
fi
deploy:
diff --git a/Cargo.toml b/Cargo.toml
index 6ff60816..7af8cc43 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,6 +27,7 @@ members = [
"tokio-io",
"tokio-reactor",
"tokio-threadpool",
+ "futures2",
]
[badges]
@@ -44,6 +45,7 @@ mio = "0.6.14"
slab = "0.4"
iovec = "0.1"
futures = "0.1.18"
+futures2 = { version = "0.1", path = "futures2", optional = true }
[dev-dependencies]
env_logger = { version = "0.4", default-features = false }
@@ -60,3 +62,12 @@ time = "0.1"
[patch.crates-io]
tokio-io = { path = "tokio-io" }
+
+[features]
+unstable-futures = [
+ "futures2",
+ "tokio-reactor/unstable-futures",
+ "tokio-threadpool/unstable-futures",
+ "tokio-executor/unstable-futures"
+]
+default = []
diff --git a/futures2/Cargo.toml b/futures2/Cargo.toml
new file mode 100644
index 00000000..a78a42d8
--- /dev/null
+++ b/futures2/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "futures2"
+
+version = "0.1.0"
+authors = ["Aaron Turon <aturon@mozilla.com>"]
+license = "MIT/Apache-2.0"
+repository = "https://github.com/tokio-rs/tokio"
+homepage = "https://tokio.rs"
+
+[dependencies]
+futures = "0.2.0-alpha"
diff --git a/futures2/src/lib.rs b/futures2/src/lib.rs
new file mode 100644
index 00000000..af0c9dc4
--- /dev/null
+++ b/futures2/src/lib.rs
@@ -0,0 +1,2 @@
+extern crate futures;
+pub use futures::*;
diff --git a/src/executor/current_thread/mod.rs b/src/executor/current_thread/mod.rs
index a5035d64..d4d2ed29 100644
--- a/src/executor/current_thread/mod.rs
+++ b/src/executor/current_thread/mod.rs
@@ -119,6 +119,9 @@ use std::marker::PhantomData;
use std::rc::Rc;
use std::time::{Duration, Instant};
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
/// Executes tasks on the current thread
pub struct CurrentThread<P: Park = ParkThread> {
/// Execute futures and receive unpark notifications.
@@ -386,6 +389,13 @@ impl tokio_executor::Executor for CurrentThread {
self.borrow().spawn_local(future);
Ok(())
}
+
+ #[cfg(feature = "unstable-futures")]
+ fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
+ -> Result<(), futures2::executor::SpawnError>
+ {
+ panic!("Futures 0.2 integration is not available for current_thread");
+ }
}
impl<P: Park> fmt::Debug for CurrentThread<P> {
@@ -591,6 +601,13 @@ impl tokio_executor::Executor for TaskExecutor {
self.spawn_local(future)
}
+ #[cfg(feature = "unstable-futures")]
+ fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
+ -> Result<(), futures2::executor::SpawnError>
+ {
+ panic!("Futures 0.2 integration is not available for current_thread");
+ }
+
fn status(&self) -> Result<(), SpawnError> {
CURRENT.with(|current| {
if current.spawn.get().is_some() {
diff --git a/src/executor/mod.rs b/src/executor/mod.rs
index 465439bb..c6955803 100644
--- a/src/executor/mod.rs
+++ b/src/executor/mod.rs
@@ -49,7 +49,6 @@
//! [`Executor`]: #
//! [`spawn`]: #
-
pub mod current_thread;
pub mod thread_pool {
diff --git a/src/lib.rs b/src/lib.rs
index 963e71c5..6bbf2416 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -79,6 +79,9 @@ extern crate tokio_threadpool;
#[macro_use]
extern crate log;
+#[cfg(feature = "unstable-futures")]
+extern crate futures2;
+
pub mod executor;
pub mod net;
pub mod reactor;
@@ -187,3 +190,19 @@ pub mod prelude {
task,
};
}
+
+#[cfg(feature = "unstable-futures")]
+fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> {
+ match old {
+ futures::Async::Ready(x) => futures2::Async::Ready(x),
+ futures::Async::NotReady => futures2::Async::Pending,
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> {
+ match new {
+ futures2::Async::Ready(x) => futures::Async::Ready(x),
+ futures2::Async::Pending => futures::Async::NotReady,
+ }
+}
diff --git a/src/net/tcp/incoming.rs b/src/net/tcp/incoming.rs
index 0e5e5bb8..591acc20 100644
--- a/src/net/tcp/incoming.rs
+++ b/src/net/tcp/incoming.rs
@@ -5,6 +5,9 @@ use std::io;
use futures::stream::Stream;
use futures::{Poll, Async};
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
/// Stream returned by the `TcpListener::incoming` function representing the
/// stream of sockets received from a listener.
#[must_use = "streams do nothing unless polled"]
@@ -28,3 +31,15 @@ impl Stream for Incoming {
Ok(Async::Ready(Some(socket)))
}
}
+
+#[cfg(feature = "unstable-futures")]
+impl futures2::Stream for Incoming {
+ type Item = TcpStream;
+ type Error = io::Error;
+
+ fn poll_next(&mut self, cx: &mut futures2::task::Context)
+ -> futures2::Poll<Option<Self::Item>, io::Error>
+ {
+ Ok(self.inner.poll_accept2(cx)?.map(|(sock, _)| Some(sock)))
+ }
+}
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs
index de9e38ea..bc9a736a 100644
--- a/src/net/tcp/listener.rs
+++ b/src/net/tcp/listener.rs
@@ -10,6 +10,9 @@ use mio;
use reactor::{Handle, PollEvented2};
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
/// An I/O object representing a TCP socket listening for incoming connections.
///
/// This object can be converted into a stream of incoming connections for
@@ -64,6 +67,22 @@ impl TcpListener {
Ok((io, addr).into())
}
+ /// Like `poll_accept`, but for futures 0.2
+ #[cfg(feature = "unstable-futures")]
+ pub fn poll_accept2(&mut self, cx: &mut futures2::task::Context)
+ -> futures2::Poll<(TcpStream, SocketAddr), io::Error>
+ {
+ let (io, addr) = match self.poll_accept_std2(cx)? {
+ futures2::Async::Ready(x) => x,
+ futures2::Async::Pending => return Ok(futures2::Async::Pending),
+ };
+
+ let io = mio::net::TcpStream::from_stream(io)?;
+ let io = TcpStream::new(io);
+
+ Ok((io, addr).into())
+ }
+
#[deprecated(since = "0.1.2", note = "use poll_accept_std instead")]
#[doc(hidden)]
pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
@@ -105,6 +124,25 @@ impl TcpListener {
}
}
+ /// Like `poll_accept_std`, but for futures 0.2.
+ #[cfg(feature = "unstable-futures")]
+ pub fn poll_accept_std2(&mut self, cx: &mut futures2::task::Context)
+ -> futures2::Poll<(net::TcpStream, SocketAddr), io::Error>
+ {
+ if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? {
+ return Ok(futures2::Async::Pending);
+ }
+
+ match self.io.get_ref().accept_std() {
+ Ok(pair) => Ok(pair.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready2(cx, mio::Ready::readable())?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
/// Create a new TCP listener from the standard library's TCP listener.
///
/// This method can be used when the `Handle::tcp_listen` method isn't
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index d37a1a43..602bd4bb 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -12,6 +12,9 @@ use tokio_io::{AsyncRead, AsyncWrite};
use reactor::{Handle, PollEvented2};
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
/// An I/O object representing a TCP stream connected to a remote endpoint.
///
/// A TCP stream can either be created by connecting to an endpoint, via the
@@ -208,6 +211,25 @@ impl TcpStream {
}
}
+ /// Like `poll_peek` but compatible with futures 0.2
+ #[cfg(feature = "unstable-futures")]
+ pub fn poll_peek2(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? {
+ return Ok(futures2::Async::Pending);
+ }
+
+ match self.io.get_ref().peek(buf) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready2(cx, mio::Ready::readable())?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
/// Shuts down the read, write, or both halves of this connection.
///
/// This function will cause all pending and future I/O on the specified
@@ -367,6 +389,15 @@ impl AsyncRead for TcpStream {
}
}
+#[cfg(feature = "unstable-futures")]
+impl futures2::io::AsyncRead for TcpStream {
+ fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ futures2::io::AsyncRead::poll_read(&mut self.io, cx, buf)
+ }
+}
+
impl AsyncWrite for TcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
<&TcpStream>::shutdown(&mut &*self)
@@ -377,6 +408,23 @@ impl AsyncWrite for TcpStream {
}
}
+#[cfg(feature = "unstable-futures")]
+impl futures2::io::AsyncWrite for TcpStream {
+ fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ futures2::io::AsyncWrite::poll_write(&mut self.io, cx, buf)
+ }
+
+ fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
+ futures2::io::AsyncWrite::poll_flush(&mut self.io, cx)
+ }
+
+ fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
+ futures2::io::AsyncWrite::poll_close(&mut self.io, cx)
+ }
+}
+
// ===== impl Read / Write for &'a =====
impl<'a> Read for &'a TcpStream {
@@ -449,6 +497,15 @@ impl<'a> AsyncRead for &'a TcpStream {
}
}
+#[cfg(feature = "unstable-futures")]
+impl<'a> futures2::io::AsyncRead for &'a TcpStream {
+ fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ futures2::io::AsyncRead::poll_read(&mut &self.io, cx, buf)
+ }
+}
+
impl<'a> AsyncWrite for &'a TcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
@@ -483,13 +540,29 @@ impl<'a> AsyncWrite for &'a TcpStream {
}
}
+#[cfg(feature = "unstable-futures")]
+impl<'a> futures2::io::AsyncWrite for &'a TcpStream {
+ fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ futures2::io::AsyncWrite::poll_write(&mut &self.io, cx, buf)
+ }
+
+ fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
+ futures2::io::AsyncWrite::poll_flush(&mut &self.io, cx)
+ }
+
+ fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
+ futures2::io::AsyncWrite::poll_close(&mut &self.io, cx)
+ }
+}
+
impl fmt::Debug for TcpStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
-
impl Future for ConnectFuture {
type Item = TcpStream;
type Error = io::Error;
@@ -499,11 +572,20 @@ impl Future for ConnectFuture {
}
}
-impl Future for ConnectFutureState {
+#[cfg(feature = "unstable-futures")]
+impl futures2::Future for ConnectFuture {
type Item = TcpStream;
type Error = io::Error;
- fn poll(&mut self) -> Poll<TcpStream, io::Error> {
+ fn poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error> {
+ futures2::Future::poll(&mut self.inner, cx)
+ }
+}
+
+impl ConnectFutureState {
+ fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error>
+ where F: FnOnce(&mut PollEvented2<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>
+ {
{
let stream = match *self {
ConnectFutureState::Waiting(ref mut s) => s,
@@ -523,7 +605,7 @@ impl Future for ConnectFutureState {
// actually hit an error or not.
//
// If all that succeeded then we ship everything on up.
- if let Async::NotReady = stream.io.poll_write_ready()? {
+ if let Async::NotReady = f(&mut stream.io)? {
return Ok(Async::NotReady)
}
@@ -531,6 +613,7 @@ impl Future for ConnectFutureState {
return Err(e)
}
}
+
match mem::replace(self, ConnectFutureState::Empty) {
ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)),
_ => panic!(),
@@ -538,6 +621,26 @@ impl Future for ConnectFutureState {
}
}
+impl Future for ConnectFutureState {
+ type Item = TcpStream;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<TcpStream, io::Error> {
+ self.poll_inner(|io| io.poll_write_ready())
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+impl futures2::Future for ConnectFutureState {
+ type Item = TcpStream;
+ type Error = io::Error;
+
+ fn poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error> {
+ self.poll_inner(|io| io.poll_write_ready2(cx).map(::lower_async))
+ .map(::lift_async)
+ }
+}
+
#[cfg(all(unix, not(target_os = "fuchsia")))]
mod sys {
use std::os::unix::prelude::*;
diff --git a/src/runtime.rs b/src/runtime.rs
index 3fb3c9aa..c277bb78 100644
--- a/src/runtime.rs
+++ b/src/runtime.rs
@@ -112,6 +112,9 @@ use futures::future::{self, Future};
use std::{fmt, io};
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
/// Handle to the Tokio runtime.
///
/// The Tokio runtime includes a reactor as well as an executor for running
@@ -205,6 +208,18 @@ where F: Future<Item = (), Error = ()> + Send + 'static,
runtime.shutdown_on_idle().wait().unwrap();
}
+/// Start the Tokio runtime using the supplied future to bootstrap execution.
+///
+/// Identical to `run` but works with futures 0.2-style futures.
+#[cfg(feature = "unstable-futures")]
+pub fn run2<F>(future: F)
+ where F: futures2::Future<Item = (), Error = futures2::Never> + Send + 'static,
+{
+ let mut runtime = Runtime::new().unwrap();
+ runtime.spawn2(future);
+ runtime.shutdown_on_idle().wait().unwrap();
+}
+
impl Runtime {
/// Create a new runtime instance with default configuration values.
///
@@ -287,6 +302,19 @@ impl Runtime {
self
}
+ /// Spawn a futures 0.2-style future onto the Tokio runtime.
+ ///
+ /// Otherwise identical to `spawn`
+ #[cfg(feature = "unstable-futures")]
+ pub fn spawn2<F>(&mut self, future: F) -> &mut Self
+ where F: futures2::Future<Item = (), Error = futures2::Never> + Send + 'static,
+ {
+ futures2::executor::Executor::spawn(
+ self.inner_mut().pool.sender_mut(), Box::new(future)
+ ).unwrap();
+ self
+ }
+
/// Signals the runtime to shutdown once it becomes idle.
///
/// Returns a future that completes once the shutdown operation has
@@ -420,8 +448,30 @@ impl ::executor::Executor for TaskExecutor {
{
self.inner.spawn(future)
}
+
+ #[cfg(feature = "unstable-futures")]
+ fn spawn2(&mut self, future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
+ -> Result<(), futures2::executor::SpawnError>
+ {
+ self.inner.spawn2(future)
+ }
}
+#[cfg(feature = "unstable-futures")]
+type Task2 = Box<futures2::Future<Item = (), Error = futures2::Never> + Send>;
+
+#[cfg(feature = "unstable-futures")]
+impl futures2::executor::Executor for TaskExecutor {
+ fn spawn(&mut self, f: Task2) -> Result<(), futures2::executor::SpawnError> {
+ futures2::executor::Executor::spawn(&mut self.inner, f)
+ }
+
+ fn status(&self) -> Result<(), futures2::executor::SpawnError> {
+ futures2::executor::Executor::status(&self.inner)
+ }
+}
+
+
// ===== impl Shutdown =====
impl Shutdown {
diff --git a/tests/current_thread.rs b/tests/current_thread.rs
index dfc80334..1124f727 100644..100755
--- a/tests/current_thread.rs
+++ b/tests/current_thread.rs
@@ -1,3 +1,5 @@
+#![cfg(not(feature = "unstable-futures"))]
+
extern crate tokio;
extern crate tokio_executor;
extern crate futures;
diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml
index 8d65dcb2..a3ba1b3b 100644
--- a/tokio-executor/Cargo.toml
+++ b/tokio-executor/Cargo.toml
@@ -14,3 +14,8 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
futures = "0.1.18"
+futures2 = { version = "0.1", path = "../futures2", optional = true }
+
+[features]
+unstable-futures = ["futures2"]
+default = []
diff --git a/tokio-executor/src/enter.rs b/tokio-executor/src/enter.rs
index 3c9f3cfa..39a445ad 100644
--- a/tokio-executor/src/enter.rs
+++ b/tokio-executor/src/enter.rs
@@ -2,6 +2,9 @@ use std::prelude::v1::*;
use std::cell::Cell;
use std::fmt;
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
/// Represents an executor context.
@@ -10,6 +13,9 @@ thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
pub struct Enter {
on_exit: Vec<Box<Callback>>,
permanent: bool,
+
+ #[cfg(feature = "unstable-futures")]
+ _enter2: futures2::executor::Enter,
}
/// An error returned by `enter` if an execution scope has already been
@@ -40,6 +46,9 @@ pub fn enter() -> Result<Enter, EnterError> {
Ok(Enter {
on_exit: Vec::new(),
permanent: false,
+
+ #[cfg(feature = "unstable-futures")]
+ _enter2: futures2::executor::enter().unwrap(),
})
}
})
diff --git a/tokio-executor/src/global.rs b/tokio-executor/src/global.rs
index 239af6eb..ab4d526d 100644
--- a/tokio-executor/src/global.rs
+++ b/tokio-executor/src/global.rs
@@ -6,6 +6,9 @@ use std::cell::Cell;
use std::marker::PhantomData;
use std::rc::Rc;
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
/// Executes futures on the default executor for the current execution context.
///
/// `DefaultExecutor` implements `Executor` and can be used to spawn futures
@@ -59,6 +62,23 @@ impl super::Executor for DefaultExecutor {
}
})
}
+
+ #[cfg(feature = "unstable-futures")]
+ fn spawn2(&mut self, future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
+ -> Result<(), futures2::executor::SpawnError>
+ {
+ EXECUTOR.with(|current_executor| {
+ match current_executor.get() {
+ Some(executor) => {
+ let executor = unsafe { &mut *executor };
+ executor.spawn2(future)
+ }
+ None => {
+ Err(futures2::executor::SpawnError::shutdown())
+ }
+ }
+ })
+ }
}
// ===== global spawn fns =====
@@ -109,6 +129,15 @@ pub fn spawn<T>(future: T)
.unwrap()
}
+/// Like `spawn` but compatible with futures 0.2
+#[cfg(feature = "unstable-futures")]
+pub fn spawn2<T>(future: T)
+ where T: futures2::Future<Item = (), Error = futures2::Never> + Send + 'static,
+{
+ DefaultExecutor::current().spawn2(Box::new(future))
+ .unwrap()
+}
+
/// Set the default executor for the duration of the closure
///
/// # Panics
diff --git a/tokio-executor/src/lib.rs b/tokio-executor/src/lib.rs
index 7aab4d86..690c1e48 100644
--- a/tokio-executor/src/lib.rs
+++ b/tokio-executor/src/lib.rs
@@ -35,6 +35,9 @@
extern crate futures;
+#[cfg(feature = "unstable-futures")]
+extern crate futures2;
+
mod enter;
mod global;
pub mod park;
@@ -42,6 +45,9 @@ pub mod park;
pub use enter::{enter, Enter, EnterError};
pub use global::{spawn, with_default, DefaultExecutor};
+#[cfg(feature = "unstable-futures")]
+pub use global::spawn2;
+
use futures::Future;
/// A value that executes futures.
@@ -129,7 +135,12 @@ pub trait Executor {
/// # fn main() {}
/// ```
fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
- -> Result<(), SpawnError>;
+ -> Result<(), SpawnError>;
+
+ /// Like `spawn`, but compatible with futures 0.2
+ #[cfg(feature = "unstable-futures")]
+ fn spawn2(&mut self, future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
+ -> Result<(), futures2::executor::SpawnError>;
/// Provides a best effort **hint** to whether or not `spawn` will succeed.
///
diff --git a/tokio-io/Cargo.toml b/tokio-io/Cargo.toml
index 95a9db3e..bf823257 100644
--- a/tokio-io/Cargo.toml
+++ b/tokio-io/Cargo.toml
@@ -20,3 +20,8 @@ categories = ["asynchronous"]
bytes = "0.4"
futures = "0.1.18"
log = "0.4"
+futures2 = { version = "0.1", path = "../futures2", optional = true }
+
+[features]
+unstable-futures = ["futures2"]
+default = []
diff --git a/tokio-reactor/Cargo.toml b/tokio-reactor/Cargo.toml
index 453b774e..0e2ab916 100644
--- a/tokio-reactor/Cargo.toml
+++ b/tokio-reactor/Cargo.toml
@@ -24,3 +24,8 @@ mio = "0.6.14"
slab = "0.4.0"
tokio-executor = { version = "0.1.0", path = "../tokio-executor" }
tokio-io = { version = "0.1.6", path = "../tokio-io" }
+futures2 = { version = "0.1", path = "../futures2", optional = true }
+
+[features]
+unstable-futures = ["futures2"]
+default = []
diff --git a/tokio-reactor/src/atomic_task.rs b/tokio-reactor/src/atomic_task.rs
index 9b4ba00c..6a4788e6 100644
--- a/tokio-reactor/src/atomic_task.rs
+++ b/tokio-reactor/src/atomic_task.rs
@@ -1,10 +1,10 @@
-use futures::task::{self, Task};
-
use std::fmt;
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Release};
+use Task;
+
/// A synchronization primitive for task notification.
///
/// `AtomicTask` will coordinate concurrent notifications with the consumer
@@ -69,11 +69,6 @@ impl AtomicTask {
}
}
- /// Registers the **current** task to be notified on calls to `notify`.
- pub fn register(&self) {
- self.register_task(task::current());
- }
-
/// Registers the task to be notified on calls to `notify`.
///
/// The new task will take place of any previous tasks that were registered
@@ -89,7 +84,7 @@ impl AtomicTask {
/// idea. Concurrent calls to `register` will attempt to register different
/// tasks to be notified. One of the callers will win and have its task set,
/// but there is no guarantee as to which caller will succeed.
- pub fn register_task(&self, task: Task) {
+ pub(crate) fn register(&self, task: Task) {
match self.state.compare_and_swap(WAITING, LOCKED_WRITE, Acquire) {
WAITING => {
unsafe {
diff --git a/tokio-reactor/src/background.rs b/tokio-reactor/src/background.rs
index 1f07c7b8..03f057cb 100644
--- a/tokio-reactor/src/background.rs
+++ b/tokio-reactor/src/background.rs
@@ -1,7 +1,7 @@
-use {Reactor, Handle};
+use {Reactor, Handle, Task};
use atomic_task::AtomicTask;
-use futures::{Future, Async, Poll};
+use futures::{Future, Async, Poll, task};
use std::io;
use std::thread;
@@ -136,7 +136,8 @@ impl Future for Shutdown {
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
- self.inner.shared.shutdown_task.register();
+ let task = Task::Futures1(task::current());
+ self.inner.shared.shutdown_task.register(task);
if !self.inner.is_shutdown() {
return Ok(Async::NotReady);
diff --git a/tokio-reactor/src/lib.rs b/tokio-reactor/src/lib.rs
index ae105824..a0ce1788 100644
--- a/tokio-reactor/src/lib.rs
+++ b/tokio-reactor/src/lib.rs
@@ -39,6 +39,9 @@ extern crate slab;
extern crate tokio_executor;
extern crate tokio_io;
+#[cfg(feature = "unstable-futures")]
+extern crate futures2;
+
pub(crate) mod background;
mod atomic_task;
mod poll_evented;
@@ -69,7 +72,6 @@ use std::time::{Duration, Instant};
use log::Level;
use mio::event::Evented;
use slab::Slab;
-use futures::task::Task;
/// The core reactor, or event loop.
///
@@ -155,6 +157,14 @@ fn _assert_kinds() {
_assert::<Handle>();
}
+/// A wakeup handle for a task, which may be either a futures 0.1 or 0.2 task
+#[derive(Debug, Clone)]
+pub(crate) enum Task {
+ Futures1(futures::task::Task),
+ #[cfg(feature = "unstable-futures")]
+ Futures2(futures2::task::Waker),
+}
+
// ===== impl Reactor =====
/// Set the default reactor for the duration of the closure
@@ -578,7 +588,7 @@ impl Inner {
Direction::Write => (&sched.writer, mio::Ready::writable()),
};
- task.register_task(t);
+ task.register(t);
if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
task.notify();
@@ -611,6 +621,17 @@ impl Direction {
}
}
+impl Task {
+ fn notify(&self) {
+ match *self {
+ Task::Futures1(ref task) => task.notify(),
+
+ #[cfg(feature = "unstable-futures")]
+ Task::Futures2(ref waker) => waker.wake(),
+ }
+ }
+}
+
#[cfg(all(unix, not(target_os = "fuchsia")))]
mod platform {
use mio::Ready;
@@ -637,3 +658,19 @@ mod platform {
false
}
}
+
+#[cfg(feature = "unstable-futures")]
+fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> {
+ match old {
+ futures::Async::Ready(x) => futures2::Async::Ready(x),
+ futures::Async::NotReady => futures2::Async::Pending,
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> {
+ match new {
+ futures2::Async::Ready(x) => futures::Async::Ready(x),
+ futures2::Async::Pending => futures::Async::NotReady,
+ }
+}
diff --git a/tokio-reactor/src/poll_evented.rs b/tokio-reactor/src/poll_evented.rs
index bddf94c7..c9f5b77d 100644
--- a/tokio-reactor/src/poll_evented.rs
+++ b/tokio-reactor/src/poll_evented.rs
@@ -5,6 +5,9 @@ use mio;
use mio::event::Evented;
use tokio_io::{AsyncRead, AsyncWrite};
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
use std::fmt;
use std::io::{self, Read, Write};
use std::sync::atomic::AtomicUsize;
@@ -99,7 +102,7 @@ struct Inner {
// ===== impl PollEvented =====
macro_rules! poll_ready {
- ($me:expr, $mask:expr, $cache:ident, $poll:ident, $take:ident) => {{
+ ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
$me.register()?;
// Load cached & encoded readiness.
@@ -114,7 +117,7 @@ macro_rules! poll_ready {