summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2017-05-30 10:43:19 -0500
committerGitHub <noreply@github.com>2017-05-30 10:43:19 -0500
commitd23c1a2b98b367d3c06637da9cfb240615bb7c61 (patch)
tree82b4325db1ca5583613ad13f7595043885efbd6a
parent0be8eab2609bb949ac0d8b4c4189ca3a5b1707a0 (diff)
parent16d15520adbf496b2effaf89109667e5dc0838a0 (diff)
Merge pull request #214 from alexcrichton/futures-next
Update tokio-core with new task system
-rw-r--r--Cargo.toml2
-rw-r--r--src/reactor/io_token.rs4
-rw-r--r--src/reactor/mod.rs69
-rw-r--r--src/reactor/timeout_token.rs2
4 files changed, 52 insertions, 25 deletions
diff --git a/Cargo.toml b/Cargo.toml
index a0ad744f..368098d9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,7 +24,7 @@ scoped-tls = "0.1.0"
slab = "0.3"
iovec = "0.1"
tokio-io = "0.1"
-futures = "0.1.11"
+futures = "0.1.14"
[dev-dependencies]
env_logger = { version = "0.3", default-features = false }
diff --git a/src/reactor/io_token.rs b/src/reactor/io_token.rs
index e8c4880b..8b3cac9d 100644
--- a/src/reactor/io_token.rs
+++ b/src/reactor/io_token.rs
@@ -83,7 +83,7 @@ impl IoToken {
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_read(&self, handle: &Remote) {
- handle.send(Message::Schedule(self.token, task::park(), Direction::Read));
+ handle.send(Message::Schedule(self.token, task::current(), Direction::Read));
}
/// Schedule the current future task to receive a notification when the
@@ -110,7 +110,7 @@ impl IoToken {
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_write(&self, handle: &Remote) {
- handle.send(Message::Schedule(self.token, task::park(), Direction::Write));
+ handle.send(Message::Schedule(self.token, task::current(), Direction::Write));
}
/// Unregister all information associated with a token on an event loop,
diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs
index e2b50958..52eb4c7a 100644
--- a/src/reactor/mod.rs
+++ b/src/reactor/mod.rs
@@ -15,8 +15,8 @@ use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use futures::{Future, IntoFuture, Async};
-use futures::future;
-use futures::executor::{self, Spawn, Unpark};
+use futures::future::{self, Executor, ExecuteError};
+use futures::executor::{self, Spawn, Notify};
use futures::sync::mpsc;
use futures::task::Task;
use mio;
@@ -116,7 +116,7 @@ struct ScheduledIo {
struct ScheduledTask {
_registration: mio::Registration,
spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>,
- wake: Arc<MySetReadiness>,
+ wake: Option<Arc<MySetReadiness>>,
}
enum TimeoutState {
@@ -160,7 +160,7 @@ impl Core {
mio::Ready::readable(),
mio::PollOpt::level()));
let rx_readiness = Arc::new(MySetReadiness(channel_pair.1));
- rx_readiness.unpark();
+ rx_readiness.notify(0);
Ok(Core {
events: mio::Events::with_capacity(1024),
@@ -227,13 +227,12 @@ impl Core {
where F: Future,
{
let mut task = executor::spawn(f);
- let ready = self.future_readiness.clone();
let mut future_fired = true;
loop {
if future_fired {
let res = try!(CURRENT_LOOP.set(self, || {
- task.poll_future(ready.clone())
+ task.poll_future_notify(&self.future_readiness, 0)
}));
if let Async::Ready(e) = res {
return Ok(e)
@@ -344,22 +343,25 @@ impl Core {
fn dispatch_task(&mut self, token: usize) {
let mut inner = self.inner.borrow_mut();
let (task, wake) = match inner.task_dispatch.get_mut(token) {
- Some(slot) => (slot.spawn.take(), slot.wake.clone()),
+ Some(slot) => (slot.spawn.take(), slot.wake.take()),
None => return,
};
- wake.0.set_readiness(mio::Ready::empty()).unwrap();
- let mut task = match task {
- Some(task) => task,
- None => return,
+ let (mut task, wake) = match (task, wake) {
+ (Some(task), Some(wake)) => (task, wake),
+ _ => return,
};
+ wake.0.set_readiness(mio::Ready::empty()).unwrap();
drop(inner);
- let res = CURRENT_LOOP.set(self, || task.poll_future(wake));
+ let res = CURRENT_LOOP.set(self, || {
+ task.poll_future_notify(&wake, 0)
+ });
let _task_to_drop;
inner = self.inner.borrow_mut();
match res {
Ok(Async::NotReady) => {
assert!(inner.task_dispatch[token].spawn.is_none());
inner.task_dispatch[token].spawn = Some(task);
+ inner.task_dispatch[token].wake = Some(wake);
}
Ok(Async::Ready(())) |
Err(()) => {
@@ -391,19 +393,18 @@ impl Core {
/// Method used to notify a task handle.
///
- /// Note that this should be used instead of `handle.unpark()` to ensure
+ /// Note that this should be used instead of `handle.notify()` to ensure
/// that the `CURRENT_LOOP` variable is set appropriately.
fn notify_handle(&self, handle: Task) {
debug!("notifying a task handle");
- CURRENT_LOOP.set(&self, || handle.unpark());
+ CURRENT_LOOP.set(&self, || handle.notify());
}
fn consume_queue(&self) {
debug!("consuming notification queue");
// TODO: can we do better than `.unwrap()` here?
- let unpark = self.rx_readiness.clone();
loop {
- let msg = self.rx.borrow_mut().poll_stream(unpark.clone()).unwrap();
+ let msg = self.rx.borrow_mut().poll_stream_notify(&self.rx_readiness, 0).unwrap();
match msg {
Async::Ready(Some(msg)) => self.notify(msg),
Async::NotReady |
@@ -443,6 +444,14 @@ impl Core {
}
}
+impl<F> Executor<F> for Core
+ where F: Future<Item = (), Error = ()> + 'static,
+{
+ fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+ self.handle().execute(future)
+ }
+}
+
impl fmt::Debug for Core {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Core")
@@ -554,12 +563,12 @@ impl Inner {
mio::PollOpt::level())
.expect("cannot fail future registration with mio");
let unpark = Arc::new(MySetReadiness(pair.1));
- let entry = entry.insert(ScheduledTask {
+ unpark.notify(0);
+ entry.insert(ScheduledTask {
spawn: Some(executor::spawn(future)),
- wake: unpark,
+ wake: Some(unpark),
_registration: pair.0,
});
- entry.get().wake.clone().unpark();
}
}
@@ -656,6 +665,15 @@ impl Remote {
}
}
+impl<F> Executor<F> for Remote
+ where F: Future<Item = (), Error = ()> + Send + 'static,
+{
+ fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+ self.spawn(|_| future);
+ Ok(())
+ }
+}
+
impl fmt::Debug for Remote {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Remote")
@@ -700,6 +718,15 @@ impl Handle {
}
}
+impl<F> Executor<F> for Handle
+ where F: Future<Item = (), Error = ()> + 'static,
+{
+ fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+ self.spawn(future);
+ Ok(())
+ }
+}
+
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Handle")
@@ -729,8 +756,8 @@ impl TimeoutState {
struct MySetReadiness(mio::SetReadiness);
-impl Unpark for MySetReadiness {
- fn unpark(&self) {
+impl Notify for MySetReadiness {
+ fn notify(&self, _id: usize) {
self.0.set_readiness(mio::Ready::readable())
.expect("failed to set readiness");
}
diff --git a/src/reactor/timeout_token.rs b/src/reactor/timeout_token.rs
index ca82bde2..5c086216 100644
--- a/src/reactor/timeout_token.rs
+++ b/src/reactor/timeout_token.rs
@@ -30,7 +30,7 @@ impl TimeoutToken {
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn update_timeout(&self, handle: &Remote) {
- handle.send(Message::UpdateTimeout(self.token, task::park()))
+ handle.send(Message::UpdateTimeout(self.token, task::current()))
}
/// Resets previously added (or fired) timeout to an new timeout