summaryrefslogtreecommitdiffstats
path: root/tokio-current-thread
diff options
context:
space:
mode:
authorJon Gjengset <jon@thesquareplanet.com>2018-07-24 16:49:01 -0400
committerCarl Lerche <me@carllerche.com>2018-07-24 13:49:01 -0700
commit1e90e2772071cd3de6e751ebb1abcacbe31e8b1b (patch)
treed6b8e55f4dafc44649619a156b80f27f19aa46d3 /tokio-current-thread
parentf212a2ab9d0fd1506c050b04e3710b914ca48521 (diff)
Count in-transit spawned futures to current thread executor as pending (#478)
Diffstat (limited to 'tokio-current-thread')
-rw-r--r--tokio-current-thread/src/lib.rs106
-rw-r--r--tokio-current-thread/src/scheduler.rs7
2 files changed, 86 insertions, 27 deletions
diff --git a/tokio-current-thread/src/lib.rs b/tokio-current-thread/src/lib.rs
index 036893b5..1c22333c 100644
--- a/tokio-current-thread/src/lib.rs
+++ b/tokio-current-thread/src/lib.rs
@@ -42,8 +42,8 @@ use std::fmt;
use std::cell::Cell;
use std::error::Error;
use std::rc::Rc;
+use std::sync::{atomic, mpsc, Arc};
use std::time::{Duration, Instant};
-use std::sync::mpsc;
#[cfg(feature = "unstable-futures")]
use futures2;
@@ -53,8 +53,11 @@ pub struct CurrentThread<P: Park = ParkThread> {
/// Execute futures and receive unpark notifications.
scheduler: Scheduler<P::Unpark>,
- /// Current number of futures being executed
- num_futures: usize,
+ /// Current number of futures being executed.
+ ///
+ /// The LSB is used to indicate that the runtime is preparing to shut down.
+ /// Thus, to get the actual number of pending futures, `>>1`.
+ num_futures: Arc<atomic::AtomicUsize>,
/// Thread park handle
park: P,
@@ -177,11 +180,11 @@ impl<T: fmt::Debug> Error for BlockError<T> {
/// This is mostly split out to make the borrow checker happy.
struct Borrow<'a, U: 'a> {
scheduler: &'a mut Scheduler<U>,
- num_futures: &'a mut usize,
+ num_futures: &'a atomic::AtomicUsize,
}
trait SpawnLocal {
- fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>);
+ fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool);
}
struct CurrentRunner {
@@ -260,11 +263,18 @@ impl<P: Park> CurrentThread<P> {
let scheduler = Scheduler::new(unpark);
let notify = scheduler.notify();
+ let num_futures = Arc::new(atomic::AtomicUsize::new(0));
+
CurrentThread {
scheduler: scheduler,
- num_futures: 0,
+ num_futures: num_futures.clone(),
park,
- spawn_handle: Handle { sender: spawn_sender, notify: notify },
+ spawn_handle: Handle {
+ sender: spawn_sender,
+ num_futures: num_futures,
+ notify: notify,
+ shut_down: Cell::new(false),
+ },
spawn_receiver: spawn_receiver,
}
}
@@ -272,8 +282,11 @@ impl<P: Park> CurrentThread<P> {
/// Returns `true` if the executor is currently idle.
///
/// An idle executor is defined by not currently having any spawned tasks.
+ ///
+ /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`,
+ /// this method may return `true` even though there are more futures to be executed.
pub fn is_idle(&self) -> bool {
- self.num_futures == 0
+ self.num_futures.load(atomic::Ordering::SeqCst) <= 1
}
/// Spawn the future on the executor.
@@ -282,7 +295,7 @@ impl<P: Park> CurrentThread<P> {
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where F: Future<Item = (), Error = ()> + 'static,
{
- self.borrow().spawn_local(Box::new(future));
+ self.borrow().spawn_local(Box::new(future), false);
self
}
@@ -358,7 +371,7 @@ impl<P: Park> CurrentThread<P> {
fn borrow(&mut self) -> Borrow<P::Unpark> {
Borrow {
scheduler: &mut self.scheduler,
- num_futures: &mut self.num_futures,
+ num_futures: &*self.num_futures,
}
}
@@ -371,11 +384,30 @@ impl<P: Park> CurrentThread<P> {
}
}
+impl<P: Park> Drop for CurrentThread<P> {
+ fn drop(&mut self) {
+ // Signal to Handles that no more futures can be spawned by setting LSB.
+ //
+ // NOTE: this isn't technically necessary since the send on the mpsc will fail once the
+ // receiver is dropped, but it's useful to illustrate how clean shutdown will be
+ // implemented (e.g., by setting the LSB).
+ let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst);
+
+ // TODO: We currently ignore any pending futures at the time we shut down.
+ //
+ // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`)
+ // which sets LSB (as above) do make Handle::spawn stop working, and then runs until
+ // num_futures.load() == 1.
+ let _ = pending;
+ }
+}
+
impl tokio_executor::Executor for CurrentThread {
- fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
- -> Result<(), SpawnError>
- {
- self.borrow().spawn_local(future);
+ fn spawn(
+ &mut self,
+ future: Box<Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError> {
+ self.borrow().spawn_local(future, false);
Ok(())
}
@@ -391,7 +423,7 @@ impl<P: Park> fmt::Debug for CurrentThread<P> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("CurrentThread")
.field("scheduler", &self.scheduler)
- .field("num_futures", &self.num_futures)
+ .field("num_futures", &self.num_futures.load(atomic::Ordering::SeqCst))
.finish()
}
}
@@ -405,7 +437,7 @@ impl<'a, P: Park> Entered<'a, P> {
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where F: Future<Item = (), Error = ()> + 'static,
{
- self.executor.borrow().spawn_local(Box::new(future));
+ self.executor.borrow().spawn_local(Box::new(future), false);
self
}
@@ -546,13 +578,13 @@ impl<'a, P: Park> Entered<'a, P> {
let (mut borrow, spawn_receiver) = (
Borrow {
scheduler: &mut self.executor.scheduler,
- num_futures: &mut self.executor.num_futures,
+ num_futures: &*self.executor.num_futures,
},
&mut self.executor.spawn_receiver,
);
while let Ok(future) = spawn_receiver.try_recv() {
- borrow.spawn_local(future);
+ borrow.spawn_local(future, true);
}
// After any pending futures were scheduled, do the actual tick
@@ -577,6 +609,8 @@ impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
#[derive(Clone)]
pub struct Handle {
sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>,
+ num_futures: Arc<atomic::AtomicUsize>,
+ shut_down: Cell<bool>,
notify: executor::NotifyHandle,
}
@@ -584,6 +618,7 @@ pub struct Handle {
impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Handle")
+ .field("shut_down", &self.shut_down.get())
.finish()
}
}
@@ -596,8 +631,27 @@ impl Handle {
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
- where F: Future<Item = (), Error = ()> + Send + 'static {
- self.sender.send(Box::new(future))
+ where
+ F: Future<Item = (), Error = ()> + Send + 'static,
+ {
+ if self.shut_down.get() {
+ return Err(SpawnError::shutdown());
+ }
+
+ // NOTE: += 2 since LSB is the shutdown bit
+ let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
+ if pending % 2 == 1 {
+ // Bring the count back so we still know when the Runtime is idle.
+ self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst);
+
+ // Once the Runtime is shutting down, we know it won't come back.
+ self.shut_down.set(true);
+
+ return Err(SpawnError::shutdown());
+ }
+
+ self.sender
+ .send(Box::new(future))
.expect("CurrentThread does not exist anymore");
// use 0 for the id, CurrentThread does not make use of it
self.notify.notify(0);
@@ -628,7 +682,7 @@ impl TaskExecutor {
CURRENT.with(|current| {
match current.spawn.get() {
Some(spawn) => {
- unsafe { (*spawn).spawn_local(future) };
+ unsafe { (*spawn).spawn_local(future, false) };
Ok(())
}
None => {
@@ -671,7 +725,7 @@ where F: Future<Item = (), Error = ()> + 'static
CURRENT.with(|current| {
match current.spawn.get() {
Some(spawn) => {
- unsafe { (*spawn).spawn_local(Box::new(future)) };
+ unsafe { (*spawn).spawn_local(Box::new(future), false) };
Ok(())
}
None => {
@@ -697,8 +751,12 @@ impl<'a, U: Unpark> Borrow<'a, U> {
}
impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> {
- fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>) {
- *self.num_futures += 1;
+ fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool) {
+ if !already_counted {
+ // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down.
+ // NOTE: += 2 since LSB is the shutdown bit
+ self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
+ }
self.scheduler.schedule(future);
}
}
diff --git a/tokio-current-thread/src/scheduler.rs b/tokio-current-thread/src/scheduler.rs
index c66523bf..c82b9d12 100644
--- a/tokio-current-thread/src/scheduler.rs
+++ b/tokio-current-thread/src/scheduler.rs
@@ -10,7 +10,7 @@ use std::fmt::{self, Debug};
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
-use std::sync::atomic::{AtomicPtr, AtomicBool, AtomicUsize};
+use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
use std::sync::{Arc, Weak};
use std::usize;
use std::thread;
@@ -210,7 +210,7 @@ where U: Unpark,
///
/// This function should be called whenever the caller is notified via a
/// wakeup.
- pub fn tick(&mut self, enter: &mut Enter, num_futures: &mut usize) -> bool
+ pub fn tick(&mut self, enter: &mut Enter, num_futures: &AtomicUsize) -> bool
{
let mut ret = false;
let tick = self.inner.tick_num.fetch_add(1, SeqCst)
@@ -330,7 +330,8 @@ where U: Unpark,
};
if borrow.enter(enter, || scheduled.tick()) {
- *borrow.num_futures -= 1;
+ // we have a borrow of the Runtime, so we know it's not shut down
+ borrow.num_futures.fetch_sub(2, SeqCst);
}
}