diff options
author | Jon Gjengset <jon@thesquareplanet.com> | 2018-07-24 16:49:01 -0400 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-07-24 13:49:01 -0700 |
commit | 1e90e2772071cd3de6e751ebb1abcacbe31e8b1b (patch) | |
tree | d6b8e55f4dafc44649619a156b80f27f19aa46d3 /tokio-current-thread | |
parent | f212a2ab9d0fd1506c050b04e3710b914ca48521 (diff) |
Count in-transit spawned futures to current thread executor as pending (#478)
Diffstat (limited to 'tokio-current-thread')
-rw-r--r-- | tokio-current-thread/src/lib.rs | 106 | ||||
-rw-r--r-- | tokio-current-thread/src/scheduler.rs | 7 |
2 files changed, 86 insertions, 27 deletions
diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs index 036893b5..1c22333c 100644 --- a/tokio-current-thread/src/lib.rs +++ b/tokio-current-thread/src/lib.rs @@ -42,8 +42,8 @@ use std::fmt; use std::cell::Cell; use std::error::Error; use std::rc::Rc; +use std::sync::{atomic, mpsc, Arc}; use std::time::{Duration, Instant}; -use std::sync::mpsc; #[cfg(feature = "unstable-futures")] use futures2; @@ -53,8 +53,11 @@ pub struct CurrentThread<P: Park = ParkThread> { /// Execute futures and receive unpark notifications. scheduler: Scheduler<P::Unpark>, - /// Current number of futures being executed - num_futures: usize, + /// Current number of futures being executed. + /// + /// The LSB is used to indicate that the runtime is preparing to shut down. + /// Thus, to get the actual number of pending futures, `>>1`. + num_futures: Arc<atomic::AtomicUsize>, /// Thread park handle park: P, @@ -177,11 +180,11 @@ impl<T: fmt::Debug> Error for BlockError<T> { /// This is mostly split out to make the borrow checker happy. struct Borrow<'a, U: 'a> { scheduler: &'a mut Scheduler<U>, - num_futures: &'a mut usize, + num_futures: &'a atomic::AtomicUsize, } trait SpawnLocal { - fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>); + fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool); } struct CurrentRunner { @@ -260,11 +263,18 @@ impl<P: Park> CurrentThread<P> { let scheduler = Scheduler::new(unpark); let notify = scheduler.notify(); + let num_futures = Arc::new(atomic::AtomicUsize::new(0)); + CurrentThread { scheduler: scheduler, - num_futures: 0, + num_futures: num_futures.clone(), park, - spawn_handle: Handle { sender: spawn_sender, notify: notify }, + spawn_handle: Handle { + sender: spawn_sender, + num_futures: num_futures, + notify: notify, + shut_down: Cell::new(false), + }, spawn_receiver: spawn_receiver, } } @@ -272,8 +282,11 @@ impl<P: Park> CurrentThread<P> { /// Returns `true` if the executor is currently idle. /// /// An idle executor is defined by not currently having any spawned tasks. + /// + /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`, + /// this method may return `true` even though there are more futures to be executed. pub fn is_idle(&self) -> bool { - self.num_futures == 0 + self.num_futures.load(atomic::Ordering::SeqCst) <= 1 } /// Spawn the future on the executor. @@ -282,7 +295,7 @@ impl<P: Park> CurrentThread<P> { pub fn spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + 'static, { - self.borrow().spawn_local(Box::new(future)); + self.borrow().spawn_local(Box::new(future), false); self } @@ -358,7 +371,7 @@ impl<P: Park> CurrentThread<P> { fn borrow(&mut self) -> Borrow<P::Unpark> { Borrow { scheduler: &mut self.scheduler, - num_futures: &mut self.num_futures, + num_futures: &*self.num_futures, } } @@ -371,11 +384,30 @@ impl<P: Park> CurrentThread<P> { } } +impl<P: Park> Drop for CurrentThread<P> { + fn drop(&mut self) { + // Signal to Handles that no more futures can be spawned by setting LSB. + // + // NOTE: this isn't technically necessary since the send on the mpsc will fail once the + // receiver is dropped, but it's useful to illustrate how clean shutdown will be + // implemented (e.g., by setting the LSB). + let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst); + + // TODO: We currently ignore any pending futures at the time we shut down. + // + // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`) + // which sets LSB (as above) do make Handle::spawn stop working, and then runs until + // num_futures.load() == 1. + let _ = pending; + } +} + impl tokio_executor::Executor for CurrentThread { - fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) - -> Result<(), SpawnError> - { - self.borrow().spawn_local(future); + fn spawn( + &mut self, + future: Box<Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + self.borrow().spawn_local(future, false); Ok(()) } @@ -391,7 +423,7 @@ impl<P: Park> fmt::Debug for CurrentThread<P> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("CurrentThread") .field("scheduler", &self.scheduler) - .field("num_futures", &self.num_futures) + .field("num_futures", &self.num_futures.load(atomic::Ordering::SeqCst)) .finish() } } @@ -405,7 +437,7 @@ impl<'a, P: Park> Entered<'a, P> { pub fn spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + 'static, { - self.executor.borrow().spawn_local(Box::new(future)); + self.executor.borrow().spawn_local(Box::new(future), false); self } @@ -546,13 +578,13 @@ impl<'a, P: Park> Entered<'a, P> { let (mut borrow, spawn_receiver) = ( Borrow { scheduler: &mut self.executor.scheduler, - num_futures: &mut self.executor.num_futures, + num_futures: &*self.executor.num_futures, }, &mut self.executor.spawn_receiver, ); while let Ok(future) = spawn_receiver.try_recv() { - borrow.spawn_local(future); + borrow.spawn_local(future, true); } // After any pending futures were scheduled, do the actual tick @@ -577,6 +609,8 @@ impl<'a, P: Park> fmt::Debug for Entered<'a, P> { #[derive(Clone)] pub struct Handle { sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>, + num_futures: Arc<atomic::AtomicUsize>, + shut_down: Cell<bool>, notify: executor::NotifyHandle, } @@ -584,6 +618,7 @@ pub struct Handle { impl fmt::Debug for Handle { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Handle") + .field("shut_down", &self.shut_down.get()) .finish() } } @@ -596,8 +631,27 @@ impl Handle { /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` /// instance of the `Handle` does not exist anymore. pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError> - where F: Future<Item = (), Error = ()> + Send + 'static { - self.sender.send(Box::new(future)) + where + F: Future<Item = (), Error = ()> + Send + 'static, + { + if self.shut_down.get() { + return Err(SpawnError::shutdown()); + } + + // NOTE: += 2 since LSB is the shutdown bit + let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); + if pending % 2 == 1 { + // Bring the count back so we still know when the Runtime is idle. + self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst); + + // Once the Runtime is shutting down, we know it won't come back. + self.shut_down.set(true); + + return Err(SpawnError::shutdown()); + } + + self.sender + .send(Box::new(future)) .expect("CurrentThread does not exist anymore"); // use 0 for the id, CurrentThread does not make use of it self.notify.notify(0); @@ -628,7 +682,7 @@ impl TaskExecutor { CURRENT.with(|current| { match current.spawn.get() { Some(spawn) => { - unsafe { (*spawn).spawn_local(future) }; + unsafe { (*spawn).spawn_local(future, false) }; Ok(()) } None => { @@ -671,7 +725,7 @@ where F: Future<Item = (), Error = ()> + 'static CURRENT.with(|current| { match current.spawn.get() { Some(spawn) => { - unsafe { (*spawn).spawn_local(Box::new(future)) }; + unsafe { (*spawn).spawn_local(Box::new(future), false) }; Ok(()) } None => { @@ -697,8 +751,12 @@ impl<'a, U: Unpark> Borrow<'a, U> { } impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> { - fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>) { - *self.num_futures += 1; + fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool) { + if !already_counted { + // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down. + // NOTE: += 2 since LSB is the shutdown bit + self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); + } self.scheduler.schedule(future); } } diff --git a/tokio-current-thread/src/scheduler.rs b/tokio-current-thread/src/scheduler.rs index c66523bf..c82b9d12 100644 --- a/tokio-current-thread/src/scheduler.rs +++ b/tokio-current-thread/src/scheduler.rs @@ -10,7 +10,7 @@ use std::fmt::{self, Debug}; use std::mem; use std::ptr; use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel}; -use std::sync::atomic::{AtomicPtr, AtomicBool, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize}; use std::sync::{Arc, Weak}; use std::usize; use std::thread; @@ -210,7 +210,7 @@ where U: Unpark, /// /// This function should be called whenever the caller is notified via a /// wakeup. - pub fn tick(&mut self, enter: &mut Enter, num_futures: &mut usize) -> bool + pub fn tick(&mut self, enter: &mut Enter, num_futures: &AtomicUsize) -> bool { let mut ret = false; let tick = self.inner.tick_num.fetch_add(1, SeqCst) @@ -330,7 +330,8 @@ where U: Unpark, }; if borrow.enter(enter, || scheduled.tick()) { - *borrow.num_futures -= 1; + // we have a borrow of the Runtime, so we know it's not shut down + borrow.num_futures.fetch_sub(2, SeqCst); } } |