summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/tests
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-04-09 11:35:16 -0700
committerGitHub <noreply@github.com>2020-04-09 11:35:16 -0700
commit58ba45a38cc101e695895cc39d8ee4ce74176397 (patch)
tree18dd0eef6611562bd517f49f2c298714c53f5d2d /tokio/src/runtime/tests
parentde8326a5a490aafdf12848820d1496b758f1ec57 (diff)
rt: fix bug in work-stealing queue (#2387)
Fixes a couple bugs in the work-stealing queue introduced as part of #2315. First, the cursor needs to be able to represent more values than the size of the buffer. This is to be able to track if `tail` is ahead of `head` or if they are identical. This bug resulted in the "overflow" path being taken before the buffer was full. The second bug can happen when a queue is being stolen from concurrently with stealing into. In this case, it is possible for buffer slots to be overwritten before they are released by the stealer. This is harder to happen in practice due to the first bug preventing the queue from filling up 100%, but could still happen. It triggered an assertion in `steal_into`. This bug slipped through due to a bug in loom not correctly catching the case. The loom bug is fixed as part of tokio-rs/loom#119. Fixes: #2382
Diffstat (limited to 'tokio/src/runtime/tests')
-rw-r--r--tokio/src/runtime/tests/loom_queue.rs141
-rw-r--r--tokio/src/runtime/tests/queue.rs157
2 files changed, 298 insertions, 0 deletions
diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs
index 4e0b6454..de02610d 100644
--- a/tokio/src/runtime/tests/loom_queue.rs
+++ b/tokio/src/runtime/tests/loom_queue.rs
@@ -4,6 +4,110 @@ use crate::runtime::task::{self, Schedule, Task};
use loom::thread;
#[test]
+fn basic() {
+ loom::model(|| {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..3 {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ for _ in 0..2 {
+ for _ in 0..2 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ if local.pop().is_some() {
+ n += 1;
+ }
+
+ // Push another task
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ n += th.join().unwrap();
+
+ assert_eq!(6, n);
+ });
+}
+
+#[test]
+fn steal_overflow() {
+ loom::model(|| {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ // push a task, pop a task
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ if local.pop().is_some() {
+ n += 1;
+ }
+
+ for _ in 0..6 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ n += th.join().unwrap();
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ assert_eq!(7, n);
+ });
+}
+
+#[test]
fn multi_stealer() {
const NUM_TASKS: usize = 5;
@@ -57,6 +161,43 @@ fn multi_stealer() {
});
}
+#[test]
+fn chained_steal() {
+ loom::model(|| {
+ let (s1, mut l1) = queue::local();
+ let (s2, mut l2) = queue::local();
+ let inject = queue::Inject::new();
+
+ // Load up some tasks
+ for _ in 0..4 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ l1.push_back(task, &inject);
+
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ l2.push_back(task, &inject);
+ }
+
+ // Spawn a task to steal from **our** queue
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ s1.steal_into(&mut local);
+
+ while local.pop().is_some() {}
+ });
+
+ // Drain our tasks, then attempt to steal
+ while l1.pop().is_some() {}
+
+ s2.steal_into(&mut l1);
+
+ th.join().unwrap();
+
+ while l1.pop().is_some() {}
+ while l2.pop().is_some() {}
+ while inject.pop().is_some() {}
+ });
+}
+
struct Runtime;
impl Schedule for Runtime {
diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs
index 912fb56a..d228d5dc 100644
--- a/tokio/src/runtime/tests/queue.rs
+++ b/tokio/src/runtime/tests/queue.rs
@@ -1,6 +1,47 @@
use crate::runtime::queue;
use crate::runtime::task::{self, Schedule, Task};
+use std::thread;
+use std::time::Duration;
+
+#[test]
+fn fits_256() {
+ let (_, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ for _ in 0..256 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ assert!(inject.pop().is_none());
+
+ while local.pop().is_some() {}
+}
+
+#[test]
+fn overflow() {
+ let (_, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ for _ in 0..257 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ let mut n = 0;
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ assert_eq!(n, 257);
+}
+
#[test]
fn steal_batch() {
let (steal1, mut local1) = queue::local();
@@ -27,6 +68,122 @@ fn steal_batch() {
assert!(local1.pop().is_none());
}
+#[test]
+fn stress1() {
+ const NUM_ITER: usize = 1;
+ const NUM_STEAL: usize = 1_000;
+ const NUM_LOCAL: usize = 1_000;
+ const NUM_PUSH: usize = 500;
+ const NUM_POP: usize = 250;
+
+ for _ in 0..NUM_ITER {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..NUM_STEAL {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ thread::yield_now();
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ for _ in 0..NUM_LOCAL {
+ for _ in 0..NUM_PUSH {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ for _ in 0..NUM_POP {
+ if local.pop().is_some() {
+ n += 1;
+ } else {
+ break;
+ }
+ }
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ n += th.join().unwrap();
+
+ assert_eq!(n, NUM_LOCAL * NUM_PUSH);
+ }
+}
+
+#[test]
+fn stress2() {
+ const NUM_ITER: usize = 1;
+ const NUM_TASKS: usize = 1_000_000;
+ const NUM_STEAL: usize = 1_000;
+
+ for _ in 0..NUM_ITER {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..NUM_STEAL {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ thread::sleep(Duration::from_micros(10));
+ }
+
+ n
+ });
+
+ let mut num_pop = 0;
+
+ for i in 0..NUM_TASKS {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ if i % 128 == 0 && local.pop().is_some() {
+ num_pop += 1;
+ }
+
+ while inject.pop().is_some() {
+ num_pop += 1;
+ }
+ }
+
+ num_pop += th.join().unwrap();
+
+ while local.pop().is_some() {
+ num_pop += 1;
+ }
+
+ while inject.pop().is_some() {
+ num_pop += 1;
+ }
+
+ assert_eq!(num_pop, NUM_TASKS);
+ }
+}
+
struct Runtime;
impl Schedule for Runtime {