summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-04-02 07:52:02 -0700
committerGitHub <noreply@github.com>2020-04-02 07:52:02 -0700
commitfa4fe9ef6feea7c8c88c81559797e57da7368b36 (patch)
treeda38800980cbb49554f83c449535afa8952dacb4 /tokio
parentf01136b5c0bbbe72fa674df4924cc53a872cffff (diff)
rt: fix queue regression (#2362)
The new queue uses `u8` to track offsets. Cursors are expected to wrap. An operation was performed with `+` instead of `wrapping_add`. This was not _obviously_ an issue before, as it is difficult to wrap a `usize` on 64-bit platforms, but wrapping a `u8` is trivial. The fix is to use `wrapping_add` instead of `+`. A new test is added that catches the issue. Fixes #2361
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/runtime/queue.rs8
-rw-r--r--tokio/tests/rt_common.rs80
2 files changed, 79 insertions, 9 deletions
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index 81408135..dc78dbd0 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -209,8 +209,8 @@ impl<T> Local<T> {
for i in 0..n {
let j = i + 1;
- let i_idx = (i + head) as usize & MASK;
- let j_idx = (j + head) as usize & MASK;
+ let i_idx = i.wrapping_add(head) as usize & MASK;
+ let j_idx = j.wrapping_add(head) as usize & MASK;
// Get the next pointer
let next = if j == n {
@@ -319,10 +319,6 @@ impl<T> Steal<T> {
return Some(ret);
}
- // Synchronize with stealers
- let (dst_steal, dst_real) = unpack(dst.inner.head.load(Acquire));
- assert_eq!(dst_steal, dst_real);
-
// Make the stolen items available to consumers
dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index ae16721e..8dc0da3c 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -18,12 +18,26 @@ macro_rules! rt_test {
}
}
- mod threaded_scheduler {
+ mod threaded_scheduler_4_threads {
$($t)*
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.threaded_scheduler()
+ .core_threads(4)
+ .enable_all()
+ .build()
+ .unwrap()
+ }
+ }
+
+ mod threaded_scheduler_1_thread {
+ $($t)*
+
+ fn rt() -> Runtime {
+ tokio::runtime::Builder::new()
+ .threaded_scheduler()
+ .core_threads(1)
.enable_all()
.build()
.unwrap()
@@ -150,10 +164,10 @@ rt_test! {
}
#[test]
- fn spawn_many() {
+ fn spawn_many_from_block_on() {
use tokio::sync::mpsc;
- const ITER: usize = 20;
+ const ITER: usize = 200;
let mut rt = rt();
@@ -200,6 +214,66 @@ rt_test! {
}
#[test]
+ fn spawn_many_from_task() {
+ use tokio::sync::mpsc;
+
+ const ITER: usize = 500;
+
+ let mut rt = rt();
+
+ let out = rt.block_on(async {
+ tokio::spawn(async move {
+ let (done_tx, mut done_rx) = mpsc::unbounded_channel();
+
+ /*
+ for _ in 0..100 {
+ tokio::spawn(async move { });
+ }
+
+ tokio::task::yield_now().await;
+ */
+
+ let mut txs = (0..ITER)
+ .map(|i| {
+ let (tx, rx) = oneshot::channel();
+ let done_tx = done_tx.clone();
+
+ tokio::spawn(async move {
+ let msg = assert_ok!(rx.await);
+ assert_eq!(i, msg);
+ assert_ok!(done_tx.send(msg));
+ });
+
+ tx
+ })
+ .collect::<Vec<_>>();
+
+ drop(done_tx);
+
+ thread::spawn(move || {
+ for (i, tx) in txs.drain(..).enumerate() {
+ assert_ok!(tx.send(i));
+ }
+ });
+
+ let mut out = vec![];
+ while let Some(i) = done_rx.recv().await {
+ out.push(i);
+ }
+
+ out.sort();
+ out
+ }).await.unwrap()
+ });
+
+ assert_eq!(ITER, out.len());
+
+ for i in 0..ITER {
+ assert_eq!(i, out[i]);
+ }
+ }
+
+ #[test]
fn spawn_await_chain() {
let mut rt = rt();