diff options
27 files changed, 1045 insertions, 105 deletions
diff --git a/.travis.yml b/.travis.yml index 00d28a7c..225fbb5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,6 +26,9 @@ script: cargo check --tests --all --target $TARGET else cargo test --all + cargo test --features unstable-futures + cargo test --manifest-path tokio-threadpool/Cargo.toml --features unstable-futures + cargo test --manifest-path tokio-reactor/Cargo.toml --features unstable-futures fi deploy: @@ -27,6 +27,7 @@ members = [ "tokio-io", "tokio-reactor", "tokio-threadpool", + "futures2", ] [badges] @@ -44,6 +45,7 @@ mio = "0.6.14" slab = "0.4" iovec = "0.1" futures = "0.1.18" +futures2 = { version = "0.1", path = "futures2", optional = true } [dev-dependencies] env_logger = { version = "0.4", default-features = false } @@ -60,3 +62,12 @@ time = "0.1" [patch.crates-io] tokio-io = { path = "tokio-io" } + +[features] +unstable-futures = [ + "futures2", + "tokio-reactor/unstable-futures", + "tokio-threadpool/unstable-futures", + "tokio-executor/unstable-futures" +] +default = [] diff --git a/futures2/Cargo.toml b/futures2/Cargo.toml new file mode 100644 index 00000000..a78a42d8 --- /dev/null +++ b/futures2/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "futures2" + +version = "0.1.0" +authors = ["Aaron Turon <aturon@mozilla.com>"] +license = "MIT/Apache-2.0" +repository = "https://github.com/tokio-rs/tokio" +homepage = "https://tokio.rs" + +[dependencies] +futures = "0.2.0-alpha" diff --git a/futures2/src/lib.rs b/futures2/src/lib.rs new file mode 100644 index 00000000..af0c9dc4 --- /dev/null +++ b/futures2/src/lib.rs @@ -0,0 +1,2 @@ +extern crate futures; +pub use futures::*; diff --git a/src/executor/current_thread/mod.rs b/src/executor/current_thread/mod.rs index a5035d64..d4d2ed29 100644 --- a/src/executor/current_thread/mod.rs +++ b/src/executor/current_thread/mod.rs @@ -119,6 +119,9 @@ use std::marker::PhantomData; use std::rc::Rc; use std::time::{Duration, Instant}; +#[cfg(feature = "unstable-futures")] +use futures2; + /// Executes tasks on the current thread pub struct CurrentThread<P: Park = ParkThread> { /// Execute futures and receive unpark notifications. @@ -386,6 +389,13 @@ impl tokio_executor::Executor for CurrentThread { self.borrow().spawn_local(future); Ok(()) } + + #[cfg(feature = "unstable-futures")] + fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>) + -> Result<(), futures2::executor::SpawnError> + { + panic!("Futures 0.2 integration is not available for current_thread"); + } } impl<P: Park> fmt::Debug for CurrentThread<P> { @@ -591,6 +601,13 @@ impl tokio_executor::Executor for TaskExecutor { self.spawn_local(future) } + #[cfg(feature = "unstable-futures")] + fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>) + -> Result<(), futures2::executor::SpawnError> + { + panic!("Futures 0.2 integration is not available for current_thread"); + } + fn status(&self) -> Result<(), SpawnError> { CURRENT.with(|current| { if current.spawn.get().is_some() { diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 465439bb..c6955803 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -49,7 +49,6 @@ //! [`Executor`]: # //! [`spawn`]: # - pub mod current_thread; pub mod thread_pool { @@ -79,6 +79,9 @@ extern crate tokio_threadpool; #[macro_use] extern crate log; +#[cfg(feature = "unstable-futures")] +extern crate futures2; + pub mod executor; pub mod net; pub mod reactor; @@ -187,3 +190,19 @@ pub mod prelude { task, }; } + +#[cfg(feature = "unstable-futures")] +fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> { + match old { + futures::Async::Ready(x) => futures2::Async::Ready(x), + futures::Async::NotReady => futures2::Async::Pending, + } +} + +#[cfg(feature = "unstable-futures")] +fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> { + match new { + futures2::Async::Ready(x) => futures::Async::Ready(x), + futures2::Async::Pending => futures::Async::NotReady, + } +} diff --git a/src/net/tcp/incoming.rs b/src/net/tcp/incoming.rs index 0e5e5bb8..591acc20 100644 --- a/src/net/tcp/incoming.rs +++ b/src/net/tcp/incoming.rs @@ -5,6 +5,9 @@ use std::io; use futures::stream::Stream; use futures::{Poll, Async}; +#[cfg(feature = "unstable-futures")] +use futures2; + /// Stream returned by the `TcpListener::incoming` function representing the /// stream of sockets received from a listener. #[must_use = "streams do nothing unless polled"] @@ -28,3 +31,15 @@ impl Stream for Incoming { Ok(Async::Ready(Some(socket))) } } + +#[cfg(feature = "unstable-futures")] +impl futures2::Stream for Incoming { + type Item = TcpStream; + type Error = io::Error; + + fn poll_next(&mut self, cx: &mut futures2::task::Context) + -> futures2::Poll<Option<Self::Item>, io::Error> + { + Ok(self.inner.poll_accept2(cx)?.map(|(sock, _)| Some(sock))) + } +} diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index de9e38ea..bc9a736a 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -10,6 +10,9 @@ use mio; use reactor::{Handle, PollEvented2}; +#[cfg(feature = "unstable-futures")] +use futures2; + /// An I/O object representing a TCP socket listening for incoming connections. /// /// This object can be converted into a stream of incoming connections for @@ -64,6 +67,22 @@ impl TcpListener { Ok((io, addr).into()) } + /// Like `poll_accept`, but for futures 0.2 + #[cfg(feature = "unstable-futures")] + pub fn poll_accept2(&mut self, cx: &mut futures2::task::Context) + -> futures2::Poll<(TcpStream, SocketAddr), io::Error> + { + let (io, addr) = match self.poll_accept_std2(cx)? { + futures2::Async::Ready(x) => x, + futures2::Async::Pending => return Ok(futures2::Async::Pending), + }; + + let io = mio::net::TcpStream::from_stream(io)?; + let io = TcpStream::new(io); + + Ok((io, addr).into()) + } + #[deprecated(since = "0.1.2", note = "use poll_accept_std instead")] #[doc(hidden)] pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> { @@ -105,6 +124,25 @@ impl TcpListener { } } + /// Like `poll_accept_std`, but for futures 0.2. + #[cfg(feature = "unstable-futures")] + pub fn poll_accept_std2(&mut self, cx: &mut futures2::task::Context) + -> futures2::Poll<(net::TcpStream, SocketAddr), io::Error> + { + if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? { + return Ok(futures2::Async::Pending); + } + + match self.io.get_ref().accept_std() { + Ok(pair) => Ok(pair.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready2(cx, mio::Ready::readable())?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + /// Create a new TCP listener from the standard library's TCP listener. /// /// This method can be used when the `Handle::tcp_listen` method isn't diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index d37a1a43..602bd4bb 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -12,6 +12,9 @@ use tokio_io::{AsyncRead, AsyncWrite}; use reactor::{Handle, PollEvented2}; +#[cfg(feature = "unstable-futures")] +use futures2; + /// An I/O object representing a TCP stream connected to a remote endpoint. /// /// A TCP stream can either be created by connecting to an endpoint, via the @@ -208,6 +211,25 @@ impl TcpStream { } } + /// Like `poll_peek` but compatible with futures 0.2 + #[cfg(feature = "unstable-futures")] + pub fn poll_peek2(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) + -> futures2::Poll<usize, io::Error> + { + if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? { + return Ok(futures2::Async::Pending); + } + + match self.io.get_ref().peek(buf) { + Ok(ret) => Ok(ret.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready2(cx, mio::Ready::readable())?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + /// Shuts down the read, write, or both halves of this connection. /// /// This function will cause all pending and future I/O on the specified @@ -367,6 +389,15 @@ impl AsyncRead for TcpStream { } } +#[cfg(feature = "unstable-futures")] +impl futures2::io::AsyncRead for TcpStream { + fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncRead::poll_read(&mut self.io, cx, buf) + } +} + impl AsyncWrite for TcpStream { fn shutdown(&mut self) -> Poll<(), io::Error> { <&TcpStream>::shutdown(&mut &*self) @@ -377,6 +408,23 @@ impl AsyncWrite for TcpStream { } } +#[cfg(feature = "unstable-futures")] +impl futures2::io::AsyncWrite for TcpStream { + fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncWrite::poll_write(&mut self.io, cx, buf) + } + + fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_flush(&mut self.io, cx) + } + + fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_close(&mut self.io, cx) + } +} + // ===== impl Read / Write for &'a ===== impl<'a> Read for &'a TcpStream { @@ -449,6 +497,15 @@ impl<'a> AsyncRead for &'a TcpStream { } } +#[cfg(feature = "unstable-futures")] +impl<'a> futures2::io::AsyncRead for &'a TcpStream { + fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncRead::poll_read(&mut &self.io, cx, buf) + } +} + impl<'a> AsyncWrite for &'a TcpStream { fn shutdown(&mut self) -> Poll<(), io::Error> { Ok(().into()) @@ -483,13 +540,29 @@ impl<'a> AsyncWrite for &'a TcpStream { } } +#[cfg(feature = "unstable-futures")] +impl<'a> futures2::io::AsyncWrite for &'a TcpStream { + fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncWrite::poll_write(&mut &self.io, cx, buf) + } + + fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_flush(&mut &self.io, cx) + } + + fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_close(&mut &self.io, cx) + } +} + impl fmt::Debug for TcpStream { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.io.get_ref().fmt(f) } } - impl Future for ConnectFuture { type Item = TcpStream; type Error = io::Error; @@ -499,11 +572,20 @@ impl Future for ConnectFuture { } } -impl Future for ConnectFutureState { +#[cfg(feature = "unstable-futures")] +impl futures2::Future for ConnectFuture { type Item = TcpStream; type Error = io::Error; - fn poll(&mut self) -> Poll<TcpStream, io::Error> { + fn poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error> { + futures2::Future::poll(&mut self.inner, cx) + } +} + +impl ConnectFutureState { + fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error> + where F: FnOnce(&mut PollEvented2<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error> + { { let stream = match *self { ConnectFutureState::Waiting(ref mut s) => s, @@ -523,7 +605,7 @@ impl Future for ConnectFutureState { // actually hit an error or not. // // If all that succeeded then we ship everything on up. - if let Async::NotReady = stream.io.poll_write_ready()? { + if let Async::NotReady = f(&mut stream.io)? { return Ok(Async::NotReady) } @@ -531,6 +613,7 @@ impl Future for ConnectFutureState { return Err(e) } } + match mem::replace(self, ConnectFutureState::Empty) { ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)), _ => panic!(), @@ -538,6 +621,26 @@ impl Future for ConnectFutureState { } } +impl Future for ConnectFutureState { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<TcpStream, io::Error> { + self.poll_inner(|io| io.poll_write_ready()) + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::Future for ConnectFutureState { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error> { + self.poll_inner(|io| io.poll_write_ready2(cx).map(::lower_async)) + .map(::lift_async) + } +} + #[cfg(all(unix, not(target_os = "fuchsia")))] mod sys { use std::os::unix::prelude::*; diff --git a/src/runtime.rs b/src/runtime.rs index 3fb3c9aa..c277bb78 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -112,6 +112,9 @@ use futures::future::{self, Future}; use std::{fmt, io}; +#[cfg(feature = "unstable-futures")] +use futures2; + /// Handle to the Tokio runtime. /// /// The Tokio runtime includes a reactor as well as an executor for running @@ -205,6 +208,18 @@ where F: Future<Item = (), Error = ()> + Send + 'static, runtime.shutdown_on_idle().wait().unwrap(); } +/// Start the Tokio runtime using the supplied future to bootstrap execution. +/// +/// Identical to `run` but works with futures 0.2-style futures. +#[cfg(feature = "unstable-futures")] +pub fn run2<F>(future: F) + where F: futures2::Future<Item = (), Error = futures2::Never> + Send + 'static, +{ + let mut runtime = Runtime::new().unwrap(); + runtime.spawn2(future); + runtime.shutdown_on_idle().wait().unwrap(); +} + impl Runtime { /// Create a new runtime instance with default configuration values. /// @@ -287,6 +302,19 @@ impl Runtime { self } + /// Spawn a futures 0.2-style future onto the Tokio runtime. + /// + /// Otherwise identical to `spawn` + #[cfg(feature = "unstable-futures")] + pub fn spawn2<F>(&mut self, future: F) -> &mut Self + where F: futures2::Future<Item = (), Error = futures2::Never> + Send + 'static, + { + futures2::executor::Executor::spawn( + self.inner_mut().pool.sender_mut(), Box::new(future) + ).unwrap(); + self + } + /// Signals the runtime to shutdown once it becomes idle. /// /// Returns a future that completes once the shutdown operation has @@ -420,8 +448,30 @@ impl ::executor::Executor for TaskExecutor { { self.inner.spawn(future) } + + #[cfg(feature = "unstable-futures")] + fn spawn2(&mut self, future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>) + -> Result<(), futures2::executor::SpawnError> + { + self.inner.spawn2(future) + } } +#[cfg(feature = "unstable-futures")] +type Task2 = Box<futures2::Future<Item = (), Error = futures2::Never> + Send>; + +#[cfg(feature = "unstable-futures")] +impl futures2::executor::Executor for TaskExecutor { + fn spawn(&mut self, f: Task2) -> Result<(), futures2::executor::SpawnError> { + futures2::executor::Executor::spawn(&mut self.inner, f) + } + + fn status(&self) -> Result<(), futures2::executor::SpawnError> { + futures2::executor::Executor::status(&self.inner) + } +} + + // ===== impl Shutdown ===== impl Shutdown { diff --git a/tests/current_thread.rs b/tests/current_thread.rs index dfc80334..1124f727 100644..100755 --- a/tests/current_thread.rs +++ b/tests/current_thread.rs @@ -1,3 +1,5 @@ +#![cfg(not(feature = "unstable-futures"))] + extern crate tokio; extern crate tokio_executor; extern crate futures; diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml index 8d65dcb2..a3ba1b3b 100644 --- a/tokio-executor/Cargo.toml +++ b/tokio-executor/Cargo.toml @@ -14,3 +14,8 @@ categories = ["concurrency", "asynchronous"] [dependencies] futures = "0.1.18" +futures2 = { version = "0.1", path = "../futures2", optional = true } + +[features] +unstable-futures = ["futures2"] +default = [] diff --git a/tokio-executor/src/enter.rs b/tokio-executor/src/enter.rs index 3c9f3cfa..39a445ad 100644 --- a/tokio-executor/src/enter.rs +++ b/tokio-executor/src/enter.rs @@ -2,6 +2,9 @@ use std::prelude::v1::*; use std::cell::Cell; use std::fmt; +#[cfg(feature = "unstable-futures")] +use futures2; + thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); /// Represents an executor context. @@ -10,6 +13,9 @@ thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); pub struct Enter { on_exit: Vec<Box<Callback>>, permanent: bool, + + #[cfg(feature = "unstable-futures")] + _enter2: futures2::executor::Enter, } /// An error returned by `enter` if an execution scope has already been @@ -40,6 +46,9 @@ pub fn enter() -> Result<Enter, EnterError> { Ok(Enter { on_exit: Vec::new(), permanent: false, + + #[cfg(feature = "unstable-futures")] + _enter2: futures2::executor::enter().unwrap(), }) } }) diff --git a/tokio-executor/src/global.rs b/tokio-executor/src/global.rs index 239af6eb..ab4d526d 100644 --- a/tokio-executor/src/global.rs +++ b/tokio-executor/src/global.rs @@ -6,6 +6,9 @@ use std::cell::Cell; use std::marker::PhantomData; use std::rc::Rc; +#[cfg(feature = "unstable-futures")] +use futures2; + /// Executes futures on the default executor for the current execution context. /// /// `DefaultExecutor` implements `Executor` and can be used to spawn futures @@ -59,6 +62,23 @@ impl super::Executor for DefaultExecutor { } }) } + + #[cfg(feature = "unstable-futures")] + fn spawn2(&mut self, future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>) + -> Result<(), futures2::executor::SpawnError> + { + EXECUTOR.with(|current_executor| { + match current_executor.get() { + Some(executor) => { + let executor = unsafe { &mut *executor }; + executor.spawn2(future) + } + None => { + Err(futures2::executor::SpawnError::shutdown()) + } + } + }) + } } // ===== global spawn fns ===== @@ -109,6 +129,15 @@ pub fn spawn<T>(future: T) .unwrap() } +/// Like `spawn` but compatible with futures 0.2 +#[cfg(feature = "unstable-futures")] +pub fn spawn2<T>(future: T) + where T: futures2::Future<Item = (), Error = futures2::Never> + Send + 'static, +{ + DefaultExecutor::current().spawn2(Box::new(future)) + .unwrap() +} + /// Set the default executor for the duration of the closure /// /// # Panics diff --git a/tokio-executor/src/lib.rs b/tokio-executor/src/lib.rs index 7aab4d86..690c1e48 100644 --- a/tokio-executor/src/lib.rs +++ b/tokio-executor/src/lib.rs @@ -35,6 +35,9 @@ extern crate futures; +#[cfg(feature = "unstable-futures")] +extern crate futures2; + mod enter; mod global; pub mod park; @@ -42,6 +45,9 @@ pub mod park; pub use enter::{enter, Enter, EnterError}; pub use global::{spawn, with_default, DefaultExecutor}; +#[cfg(feature = "unstable-futures")] +pub use global::spawn2; + use futures::Future; /// A value that executes futures. @@ -129,7 +135,12 @@ pub trait Executor { /// # fn main() {} /// ``` fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) - -> Result<(), SpawnError>; + -> Result<(), SpawnError>; + + /// Like `spawn`, but compatible with futures 0.2 + #[cfg(feature = "unstable-futures")] + fn spawn2(&mut self, future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>) + -> Result<(), futures2::executor::SpawnError>; /// Provides a best effort **hint** to whether or not `spawn` will succeed. /// diff --git a/tokio-io/Cargo.toml b/tokio-io/Cargo.toml index 95a9db3e..bf823257 100644 --- a/tokio-io/Cargo.toml +++ b/tokio-io/Cargo.toml @@ -20,3 +20,8 @@ categories = ["asynchronous"] bytes = "0.4" futures = "0.1.18" log = "0.4" +futures2 = { version = "0.1", path = "../futures2", optional = true } + +[features] +unstable-futures = ["futures2"] +default = [] diff --git a/tokio-reactor/Cargo.toml b/tokio-reactor/Cargo.toml index 453b774e..0e2ab916 100644 --- a/tokio-reactor/Cargo.toml +++ b/tokio-reactor/Cargo.toml @@ -24,3 +24,8 @@ mio = "0.6.14" slab = "0.4.0" tokio-executor = { version = "0.1.0", path = "../tokio-executor" } tokio-io = { version = "0.1.6", path = "../tokio-io" } +futures2 = { version = "0.1", path = "../futures2", optional = true } + +[features] +unstable-futures = ["futures2"] +default = [] diff --git a/tokio-reactor/src/atomic_task.rs b/tokio-reactor/src/atomic_task.rs index 9b4ba00c..6a4788e6 100644 --- a/tokio-reactor/src/atomic_task.rs +++ b/tokio-reactor/src/atomic_task.rs @@ -1,10 +1,10 @@ -use futures::task::{self, Task}; - use std::fmt; use std::cell::UnsafeCell; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, Release}; +use Task; + /// A synchronization primitive for task notification. /// /// `AtomicTask` will coordinate concurrent notifications with the consumer @@ -69,11 +69,6 @@ impl AtomicTask { } } - /// Registers the **current** task to be notified on calls to `notify`. - pub fn register(&self) { - self.register_task(task::current()); - } - /// Registers the task to be notified on calls to `notify`. /// /// The new task will take place of any previous tasks that were registered @@ -89,7 +84,7 @@ impl AtomicTask { /// idea. Concurrent calls to `register` will attempt to register different /// tasks to be notified. One of the callers will win and have its task set, /// but there is no guarantee as to which caller will succeed. - pub fn register_task(&self, task: Task) { + pub(crate) fn register(&self, task: Task) { match self.state.compare_and_swap(WAITING, LOCKED_WRITE, Acquire) { WAITING => { unsafe { diff --git a/tokio-reactor/src/background.rs b/tokio-reactor/src/background.rs index 1f07c7b8..03f057cb 100644 --- a/tokio-reactor/src/background.rs +++ b/tokio-reactor/src/background.rs @@ -1,7 +1,7 @@ -use {Reactor, Handle}; +use {Reactor, Handle, Task}; use atomic_task::AtomicTask; -use futures::{Future, Async, Poll}; +use futures::{Future, Async, Poll, task}; use std::io; use std::thread; @@ -136,7 +136,8 @@ impl Future for Shutdown { type Error = (); fn poll(&mut self) -> Poll<(), ()> { - self.inner.shared.shutdown_task.register(); + let task = Task::Futures1(task::current()); + self.inner.shared.shutdown_task.register(task); if !self.inner.is_shutdown() { return Ok(Async::NotReady); diff --git a/tokio-reactor/src/lib.rs b/tokio-reactor/src/lib.rs index ae105824..a0ce1788 100644 --- a/tokio-reactor/src/lib.rs +++ b/tokio-reactor/src/lib.rs @@ -39,6 +39,9 @@ extern crate slab; extern crate tokio_executor; extern crate tokio_io; +#[cfg(feature = "unstable-futures")] +extern crate futures2; + pub(crate) mod background; mod atomic_task; mod poll_evented; @@ -69,7 +72,6 @@ use std::time::{Duration, Instant}; use log::Level; use mio::event::Evented; use slab::Slab; -use futures::task::Task; /// The core reactor, or event loop. /// @@ -155,6 +157,14 @@ fn _assert_kinds() { _assert::<Handle>(); } +/// A wakeup handle for a task, which may be either a futures 0.1 or 0.2 task +#[derive(Debug, Clone)] +pub(crate) enum Task { + Futures1(futures::task::Task), + #[cfg(feature = "unstable-futures")] + Futures2(futures2::task::Waker), +} + // ===== impl Reactor ===== /// Set the default reactor for the duration of the closure @@ -578,7 +588,7 @@ impl Inner { Direction::Write => (&sched.writer, mio::Ready::writable()), }; - task.register_task(t); + task.register(t); if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { task.notify(); @@ -611,6 +621,17 @@ impl Direction { } } +impl Task { + fn notify(&self) { + match *self { + Task::Futures1(ref task) => task.notify(), + + #[cfg(feature = "unstable-futures")] + Task::Futures2(ref waker) => waker.wake(), + } + } +} + #[cfg(all(unix, not(target_os = "fuchsia")))] mod platform { use mio::Ready; @@ -637,3 +658,19 @@ mod platform { false } } + +#[cfg(feature = "unstable-futures")] +fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> { + match old { + futures::Async::Ready(x) => futures2::Async::Ready(x), + futures::Async::NotReady => futures2::Async::Pending, + } +} + +#[cfg(feature = "unstable-futures")] +fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> { + match new { + futures2::Async::Ready(x) => futures::Async::Ready(x), + futures2::Async::Pending => futures::Async::NotReady, + } +} diff --git a/tokio-reactor/src/poll_evented.rs b/tokio-reactor/src/poll_evented.rs index bddf94c7..c9f5b77d 100644 --- a/tokio-reactor/src/poll_evented.rs +++ b/tokio-reactor/src/poll_evented.rs @@ -5,6 +5,9 @@ use mio; use mio::event::Evented; use tokio_io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "unstable-futures")] +use futures2; + use std::fmt; use std::io::{self, Read, Write}; use std::sync::atomic::AtomicUsize; @@ -99,7 +102,7 @@ struct Inner { // ===== impl PollEvented ===== macro_rules! poll_ready { - ($me:expr, $mask:expr, $cache:ident, $poll:ident, $take:ident) => {{ + ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{ $me.register()?; // Load cached & encoded readiness. @@ -114,7 +117,7 @@ macro_rules! poll_ready { |