diff options
author | Carl Lerche <me@carllerche.com> | 2020-04-02 07:52:02 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-02 07:52:02 -0700 |
commit | fa4fe9ef6feea7c8c88c81559797e57da7368b36 (patch) | |
tree | da38800980cbb49554f83c449535afa8952dacb4 /tokio | |
parent | f01136b5c0bbbe72fa674df4924cc53a872cffff (diff) |
rt: fix queue regression (#2362)
The new queue uses `u8` to track offsets. Cursors are expected to wrap.
An operation was performed with `+` instead of `wrapping_add`. This was
not _obviously_ issue before as it is difficult to wrap a `usize` on
64bit platforms, but wrapping a `u8` is trivial.
The fix is to use `wrapping_add` instead of `+`. A new test is added
that catches the issue.
Fixes #2361
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/runtime/queue.rs | 8 | ||||
-rw-r--r-- | tokio/tests/rt_common.rs | 80 |
2 files changed, 79 insertions, 9 deletions
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index 81408135..dc78dbd0 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -209,8 +209,8 @@ impl<T> Local<T> { for i in 0..n { let j = i + 1; - let i_idx = (i + head) as usize & MASK; - let j_idx = (j + head) as usize & MASK; + let i_idx = i.wrapping_add(head) as usize & MASK; + let j_idx = j.wrapping_add(head) as usize & MASK; // Get the next pointer let next = if j == n { @@ -319,10 +319,6 @@ impl<T> Steal<T> { return Some(ret); } - // Synchronize with stealers - let (dst_steal, dst_real) = unpack(dst.inner.head.load(Acquire)); - assert_eq!(dst_steal, dst_real); - // Make the stolen items available to consumers dst.inner.tail.store(dst_tail.wrapping_add(n), Release); diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index ae16721e..8dc0da3c 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -18,12 +18,26 @@ macro_rules! rt_test { } } - mod threaded_scheduler { + mod threaded_scheduler_4_threads { $($t)* fn rt() -> Runtime { tokio::runtime::Builder::new() .threaded_scheduler() + .core_threads(4) + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_1_thread { + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new() + .threaded_scheduler() + .core_threads(1) .enable_all() .build() .unwrap() @@ -150,10 +164,10 @@ rt_test! { } #[test] - fn spawn_many() { + fn spawn_many_from_block_on() { use tokio::sync::mpsc; - const ITER: usize = 20; + const ITER: usize = 200; let mut rt = rt(); @@ -200,6 +214,66 @@ rt_test! { } #[test] + fn spawn_many_from_task() { + use tokio::sync::mpsc; + + const ITER: usize = 500; + + let mut rt = rt(); + + let out = rt.block_on(async { + tokio::spawn(async move { + let (done_tx, mut done_rx) = mpsc::unbounded_channel(); + + /* + for _ in 0..100 { + tokio::spawn(async move { }); + } + + tokio::task::yield_now().await; + */ + + let mut txs = (0..ITER) + .map(|i| { + let (tx, rx) = oneshot::channel(); + let done_tx = done_tx.clone(); + + tokio::spawn(async move { + let msg = assert_ok!(rx.await); + assert_eq!(i, msg); + assert_ok!(done_tx.send(msg)); + }); + + tx + }) + .collect::<Vec<_>>(); + + drop(done_tx); + + thread::spawn(move || { + for (i, tx) in txs.drain(..).enumerate() { + assert_ok!(tx.send(i)); + } + }); + + let mut out = vec![]; + while let Some(i) = done_rx.recv().await { + out.push(i); + } + + out.sort(); + out + }).await.unwrap() + }); + + assert_eq!(ITER, out.len()); + + for i in 0..ITER { + assert_eq!(i, out[i]); + } + } + + #[test] fn spawn_await_chain() { let mut rt = rt(); |