summaryrefslogtreecommitdiffstats
path: root/tokio-sync
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2019-02-11 14:54:33 -0800
committerSean McArthur <sean@seanmonstar.com>2019-02-14 13:26:53 -0800
commit860ca79d620102a9b00568a821ec8172f2e6af82 (patch)
tree86754dee947c9dbeef28cf331d28c61c60d2ed60 /tokio-sync
parent7b98bf7da3e75d57bd564852d3d410663d8d6b18 (diff)
Check Task::will_notify_current before cloning in AtomicTask
Diffstat (limited to 'tokio-sync')
-rw-r--r--tokio-sync/benches/mpsc.rs32
-rw-r--r--tokio-sync/src/task/atomic_task.rs50
-rw-r--r--tokio-sync/tests/atomic_task.rs42
3 files changed, 120 insertions, 4 deletions
diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs
index d182319b..d83ae475 100644
--- a/tokio-sync/benches/mpsc.rs
+++ b/tokio-sync/benches/mpsc.rs
@@ -87,6 +87,22 @@ mod tokio {
}
#[bench]
+ fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
+ let (_tx, mut rx) = unbounded_channel::<i32>();
+ b.iter(|| {
+ future::lazy(|| {
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+
+ Ok::<_, ()>(())
+ }).wait().unwrap();
+ })
+ }
+
+ #[bench]
fn bounded_uncontended_1(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
@@ -288,6 +304,22 @@ mod legacy {
}
#[bench]
+ fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
+ let (_tx, mut rx) = unbounded::<i32>();
+ b.iter(|| {
+ future::lazy(|| {
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+ assert!(rx.poll().unwrap().is_not_ready());
+
+ Ok::<_, ()>(())
+ }).wait().unwrap();
+ })
+ }
+
+ #[bench]
fn unbounded_uncontended_1(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded();
diff --git a/tokio-sync/src/task/atomic_task.rs b/tokio-sync/src/task/atomic_task.rs
index 86ced5aa..c1124494 100644
--- a/tokio-sync/src/task/atomic_task.rs
+++ b/tokio-sync/src/task/atomic_task.rs
@@ -149,7 +149,7 @@ impl AtomicTask {
///
/// This is the same as calling `register_task` with `task::current()`.
pub fn register(&self) {
- self.register_task(task::current());
+ self.do_register(CurrentTask);
}
/// Registers the provided task to be notified on calls to `notify`.
@@ -168,12 +168,19 @@ impl AtomicTask {
/// tasks to be notified. One of the callers will win and have its task set,
/// but there is no guarantee as to which caller will succeed.
pub fn register_task(&self, task: Task) {
+ self.do_register(ExactTask(task));
+ }
+
+ fn do_register<R>(&self, reg: R)
+ where
+ R: Register,
+ {
debug!(" + register_task");
match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
WAITING => {
unsafe {
// Locked acquired, update the waker cell
- self.task.with_mut(|t| *t = Some(task));
+ self.task.with_mut(|t| reg.register(&mut *t));
// Release the lock. If the state transitioned to include
// the `NOTIFYING` bit, this means that a notify has been
@@ -213,7 +220,7 @@ impl AtomicTask {
// Currently in the process of notifying the task, i.e.,
// `notify` is currently being called on the old task handle.
// So, we call notify on the new task handle
- task.notify();
+ reg.notify();
}
state => {
// In this case, a concurrent thread is holding the
@@ -284,3 +291,40 @@ impl fmt::Debug for AtomicTask {
unsafe impl Send for AtomicTask {}
unsafe impl Sync for AtomicTask {}
+
+trait Register {
+ fn register(self, slot: &mut Option<Task>);
+ fn notify(self);
+}
+
+struct CurrentTask;
+
+impl Register for CurrentTask {
+ fn register(self, slot: &mut Option<Task>) {
+ let should_update = (&*slot).as_ref()
+ .map(|prev| !prev.will_notify_current())
+ .unwrap_or(true);
+ if should_update {
+ *slot = Some(task::current());
+ }
+ }
+
+ fn notify(self) {
+ task::current().notify();
+ }
+}
+
+struct ExactTask(Task);
+
+impl Register for ExactTask {
+ fn register(self, slot: &mut Option<Task>) {
+ // When calling register_task with an exact task, it doesn't matter
+ // if the previous task would have notified current. We *always* want
+ // to save that exact task.
+ *slot = Some(self.0);
+ }
+
+ fn notify(self) {
+ self.0.notify();
+ }
+}
diff --git a/tokio-sync/tests/atomic_task.rs b/tokio-sync/tests/atomic_task.rs
index ccdcbbb4..d630e3ea 100644
--- a/tokio-sync/tests/atomic_task.rs
+++ b/tokio-sync/tests/atomic_task.rs
@@ -1,7 +1,9 @@
extern crate futures;
+extern crate tokio_mock_task;
extern crate tokio_sync;
-use futures::task::Task;
+use futures::task::{self, Task};
+use tokio_mock_task::*;
use tokio_sync::task::AtomicTask;
trait AssertSend: Send {}
@@ -12,3 +14,41 @@ impl AssertSync for AtomicTask {}
impl AssertSend for Task {}
impl AssertSync for Task {}
+
+#[test]
+fn register_task() {
+ // AtomicTask::register_task should *always* register the
+ // arbitrary task.
+
+ let atomic = AtomicTask::new();
+
+ let mut mock1 = MockTask::new();
+ let mut mock2 = MockTask::new();
+
+ // Register once...
+ mock1.enter(|| atomic.register());
+
+ // Grab the actual 2nd task from the mock...
+ let task2 = mock2.enter(task::current);
+
+ // Now register the 2nd task, even though in the context where
+ // the first task would be considered 'current'...
+ {
+ // Need a block to grab a reference, so that we only move
+ // task2 into the closure, not the AtomicTask...
+ let atomic = &atomic;
+ mock1.enter(move || {
+ atomic.register_task(task2);
+ });
+ }
+
+ // Just proving that they haven't been notified yet...
+ assert!(!mock1.is_notified(), "mock1 shouldn't be notified yet");
+ assert!(!mock2.is_notified(), "mock2 shouldn't be notified yet");
+
+ // Now trigger the notify, and ensure it was task2
+ atomic.notify();
+
+ assert!(!mock1.is_notified(), "mock1 shouldn't be notified");
+ assert!(mock2.is_notified(), "mock2 should be notified");
+}