diff options
author | Alex Crichton <alex@alexcrichton.com> | 2017-05-30 10:43:19 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-30 10:43:19 -0500 |
commit | d23c1a2b98b367d3c06637da9cfb240615bb7c61 (patch) | |
tree | 82b4325db1ca5583613ad13f7595043885efbd6a | |
parent | 0be8eab2609bb949ac0d8b4c4189ca3a5b1707a0 (diff) | |
parent | 16d15520adbf496b2effaf89109667e5dc0838a0 (diff) |
Merge pull request #214 from alexcrichton/futures-next
Update tokio-core with new task system
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/reactor/io_token.rs | 4 | ||||
-rw-r--r-- | src/reactor/mod.rs | 69 | ||||
-rw-r--r-- | src/reactor/timeout_token.rs | 2 |
4 files changed, 52 insertions, 25 deletions
@@ -24,7 +24,7 @@ scoped-tls = "0.1.0" slab = "0.3" iovec = "0.1" tokio-io = "0.1" -futures = "0.1.11" +futures = "0.1.14" [dev-dependencies] env_logger = { version = "0.3", default-features = false } diff --git a/src/reactor/io_token.rs b/src/reactor/io_token.rs index e8c4880b..8b3cac9d 100644 --- a/src/reactor/io_token.rs +++ b/src/reactor/io_token.rs @@ -83,7 +83,7 @@ impl IoToken { /// This function will also panic if there is not a currently running future /// task. pub fn schedule_read(&self, handle: &Remote) { - handle.send(Message::Schedule(self.token, task::park(), Direction::Read)); + handle.send(Message::Schedule(self.token, task::current(), Direction::Read)); } /// Schedule the current future task to receive a notification when the @@ -110,7 +110,7 @@ impl IoToken { /// This function will also panic if there is not a currently running future /// task. pub fn schedule_write(&self, handle: &Remote) { - handle.send(Message::Schedule(self.token, task::park(), Direction::Write)); + handle.send(Message::Schedule(self.token, task::current(), Direction::Write)); } /// Unregister all information associated with a token on an event loop, diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index e2b50958..52eb4c7a 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -15,8 +15,8 @@ use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use std::time::{Instant, Duration}; use futures::{Future, IntoFuture, Async}; -use futures::future; -use futures::executor::{self, Spawn, Unpark}; +use futures::future::{self, Executor, ExecuteError}; +use futures::executor::{self, Spawn, Notify}; use futures::sync::mpsc; use futures::task::Task; use mio; @@ -116,7 +116,7 @@ struct ScheduledIo { struct ScheduledTask { _registration: mio::Registration, spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>, - wake: Arc<MySetReadiness>, + wake: Option<Arc<MySetReadiness>>, } enum TimeoutState { @@ -160,7 +160,7 @@ impl Core { mio::Ready::readable(), mio::PollOpt::level())); let rx_readiness = Arc::new(MySetReadiness(channel_pair.1)); - rx_readiness.unpark(); + rx_readiness.notify(0); Ok(Core { events: mio::Events::with_capacity(1024), @@ -227,13 +227,12 @@ impl Core { where F: Future, { let mut task = executor::spawn(f); - let ready = self.future_readiness.clone(); let mut future_fired = true; loop { if future_fired { let res = try!(CURRENT_LOOP.set(self, || { - task.poll_future(ready.clone()) + task.poll_future_notify(&self.future_readiness, 0) })); if let Async::Ready(e) = res { return Ok(e) @@ -344,22 +343,25 @@ impl Core { fn dispatch_task(&mut self, token: usize) { let mut inner = self.inner.borrow_mut(); let (task, wake) = match inner.task_dispatch.get_mut(token) { - Some(slot) => (slot.spawn.take(), slot.wake.clone()), + Some(slot) => (slot.spawn.take(), slot.wake.take()), None => return, }; - wake.0.set_readiness(mio::Ready::empty()).unwrap(); - let mut task = match task { - Some(task) => task, - None => return, + let (mut task, wake) = match (task, wake) { + (Some(task), Some(wake)) => (task, wake), + _ => return, }; + wake.0.set_readiness(mio::Ready::empty()).unwrap(); drop(inner); - let res = CURRENT_LOOP.set(self, || task.poll_future(wake)); + let res = CURRENT_LOOP.set(self, || { + task.poll_future_notify(&wake, 0) + }); let _task_to_drop; inner = self.inner.borrow_mut(); match res { Ok(Async::NotReady) => { assert!(inner.task_dispatch[token].spawn.is_none()); inner.task_dispatch[token].spawn = Some(task); + inner.task_dispatch[token].wake = Some(wake); } Ok(Async::Ready(())) | Err(()) => { @@ -391,19 +393,18 @@ impl Core { /// Method used to notify a task handle. /// - /// Note that this should be used instead of `handle.unpark()` to ensure + /// Note that this should be used instead of `handle.notify()` to ensure /// that the `CURRENT_LOOP` variable is set appropriately. fn notify_handle(&self, handle: Task) { debug!("notifying a task handle"); - CURRENT_LOOP.set(&self, || handle.unpark()); + CURRENT_LOOP.set(&self, || handle.notify()); } fn consume_queue(&self) { debug!("consuming notification queue"); // TODO: can we do better than `.unwrap()` here? - let unpark = self.rx_readiness.clone(); loop { - let msg = self.rx.borrow_mut().poll_stream(unpark.clone()).unwrap(); + let msg = self.rx.borrow_mut().poll_stream_notify(&self.rx_readiness, 0).unwrap(); match msg { Async::Ready(Some(msg)) => self.notify(msg), Async::NotReady | @@ -443,6 +444,14 @@ impl Core { } } +impl<F> Executor<F> for Core + where F: Future<Item = (), Error = ()> + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + self.handle().execute(future) + } +} + impl fmt::Debug for Core { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Core") @@ -554,12 +563,12 @@ impl Inner { mio::PollOpt::level()) .expect("cannot fail future registration with mio"); let unpark = Arc::new(MySetReadiness(pair.1)); - let entry = entry.insert(ScheduledTask { + unpark.notify(0); + entry.insert(ScheduledTask { spawn: Some(executor::spawn(future)), - wake: unpark, + wake: Some(unpark), _registration: pair.0, }); - entry.get().wake.clone().unpark(); } } @@ -656,6 +665,15 @@ impl Remote { } } +impl<F> Executor<F> for Remote + where F: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + self.spawn(|_| future); + Ok(()) + } +} + impl fmt::Debug for Remote { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Remote") @@ -700,6 +718,15 @@ impl Handle { } } +impl<F> Executor<F> for Handle + where F: Future<Item = (), Error = ()> + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + self.spawn(future); + Ok(()) + } +} + impl fmt::Debug for Handle { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Handle") @@ -729,8 +756,8 @@ impl TimeoutState { struct MySetReadiness(mio::SetReadiness); -impl Unpark for MySetReadiness { - fn unpark(&self) { +impl Notify for MySetReadiness { + fn notify(&self, _id: usize) { self.0.set_readiness(mio::Ready::readable()) .expect("failed to set readiness"); } diff --git a/src/reactor/timeout_token.rs b/src/reactor/timeout_token.rs index ca82bde2..5c086216 100644 --- a/src/reactor/timeout_token.rs +++ b/src/reactor/timeout_token.rs @@ -30,7 +30,7 @@ impl TimeoutToken { /// This method will panic if the timeout specified was not created by this /// loop handle's `add_timeout` method. pub fn update_timeout(&self, handle: &Remote) { - handle.send(Message::UpdateTimeout(self.token, task::park())) + handle.send(Message::UpdateTimeout(self.token, task::current())) } /// Resets previously added (or fired) timeout to an new timeout |