summaryrefslogtreecommitdiffstats
path: root/tokio/src/task
diff options
context:
space:
mode:
authorEliza Weisman <eliza@buoyant.io>2019-12-05 12:00:10 -0800
committerCarl Lerche <me@carllerche.com>2019-12-05 12:00:10 -0800
commitb7ecd350365c2695b2cc6f513ef8a5ec7e320916 (patch)
treeb64b1f2a52d73aa6828c713c4124d2506d58c5c1 /tokio/src/task
parentdbcd1f9a0964c64d5aa335649eabea281ac59574 (diff)
task: fix `LocalSet` failing to poll all local futures (#1905)
Currently, a `LocalSet` does not notify the `LocalFuture` again at the end of a tick. This means that if we didn't poll every task in the run queue during that tick (e.g. there are more than 61 tasks enqueued), those tasks will not be polled. This commit fixes this issue by changing `local::Scheduler::tick` to return whether or not the local future needs to be notified again, and waking the task if so. Fixes #1899 Fixes #1900 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio/src/task')
-rw-r--r--tokio/src/task/local.rs89
1 files changed, 81 insertions, 8 deletions
diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs
index aed5105d..203a2f39 100644
--- a/tokio/src/task/local.rs
+++ b/tokio/src/task/local.rs
@@ -308,7 +308,12 @@ impl<F: Future> Future for LocalFuture<F> {
return Poll::Ready(output);
}
- scheduler.tick();
+ if scheduler.tick() {
+ // If `tick` returns true, we need to notify the local future again:
+ // there are still tasks remaining in the run queue.
+ cx.waker().wake_by_ref();
+ }
+
Poll::Pending
}
}
@@ -388,7 +393,9 @@ impl Scheduler {
.unwrap_or(false)
}
- fn tick(&self) {
+ /// Tick the scheduler, returning whether the local future needs to be
+ /// notified again.
+ fn tick(&self) -> bool {
assert!(self.is_current());
for _ in 0..MAX_TASKS_PER_TICK {
let tick = self.tick.get().wrapping_add(1);
@@ -400,7 +407,10 @@ impl Scheduler {
self.queues.next_task(tick)
} {
Some(task) => task,
- None => return,
+ // We have fully drained the queue of notified tasks, so the
+ // local future doesn't need to be notified again — it can wait
+ // until something else wakes a task in the local set.
+ None => return false,
};
if let Some(task) = task.run(&mut || Some(self.into())) {
@@ -411,6 +421,8 @@ impl Scheduler {
}
}
}
+
+ true
}
}
@@ -435,7 +447,11 @@ impl Drop for Scheduler {
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
- use crate::{runtime, task};
+ use crate::{
+ runtime,
+ sync::{mpsc, oneshot},
+ task, time,
+ };
use std::time::Duration;
#[test]
@@ -469,7 +485,6 @@ mod tests {
fn local_threadpool_timer() {
// This test ensures that runtime services like the timer are properly
// set for the local task set.
- use std::time::Duration;
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
@@ -659,8 +674,6 @@ mod tests {
#[test]
fn drop_cancels_tasks() {
// This test reproduces issue #1842
- use crate::sync::oneshot;
-
let mut rt = runtime::Builder::new()
.enable_time()
.basic_scheduler()
@@ -673,7 +686,7 @@ mod tests {
local.spawn_local(async move {
started_tx.send(()).unwrap();
loop {
- crate::time::delay_for(Duration::from_secs(3600)).await;
+ time::delay_for(Duration::from_secs(3600)).await;
}
});
@@ -741,4 +754,64 @@ mod tests {
thread.join().expect("test thread should not panic!")
}
+
+ #[test]
+ fn local_tasks_are_polled_after_tick() {
+ // Reproduces issues #1899 and #1900
+ use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
+
+ static RX1: AtomicUsize = AtomicUsize::new(0);
+ static RX2: AtomicUsize = AtomicUsize::new(0);
+ static EXPECTED: usize = 500;
+
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ let mut rt = runtime::Builder::new()
+ .basic_scheduler()
+ .enable_all()
+ .build()
+ .unwrap();
+
+ let local = LocalSet::new();
+
+ local.block_on(&mut rt, async {
+ let task2 = task::spawn(async move {
+ // Wait a bit
+ time::delay_for(Duration::from_millis(100)).await;
+
+ let mut oneshots = Vec::with_capacity(EXPECTED);
+
+ // Send values
+ for _ in 0..EXPECTED {
+ let (oneshot_tx, oneshot_rx) = oneshot::channel();
+ oneshots.push(oneshot_tx);
+ tx.send(oneshot_rx).unwrap();
+ }
+
+ time::delay_for(Duration::from_millis(100)).await;
+
+ for tx in oneshots.drain(..) {
+ tx.send(()).unwrap();
+ }
+
+ time::delay_for(Duration::from_millis(300)).await;
+ let rx1 = RX1.load(SeqCst);
+ let rx2 = RX2.load(SeqCst);
+ println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2);
+ assert_eq!(EXPECTED, rx1);
+ assert_eq!(EXPECTED, rx2);
+ });
+
+ while let Some(oneshot) = rx.recv().await {
+ RX1.fetch_add(1, SeqCst);
+
+ task::spawn_local(async move {
+ oneshot.await.unwrap();
+ RX2.fetch_add(1, SeqCst);
+ });
+ }
+
+ task2.await.unwrap();
+ });
+ }
}