diff options
author | Sean McArthur <sean@seanmonstar.com> | 2019-02-11 14:54:33 -0800 |
---|---|---|
committer | Sean McArthur <sean@seanmonstar.com> | 2019-02-14 13:26:53 -0800 |
commit | 860ca79d620102a9b00568a821ec8172f2e6af82 (patch) | |
tree | 86754dee947c9dbeef28cf331d28c61c60d2ed60 /tokio-sync | |
parent | 7b98bf7da3e75d57bd564852d3d410663d8d6b18 (diff) |
Check Task::will_notify_current before cloning in AtomicTask
Diffstat (limited to 'tokio-sync')
-rw-r--r-- | tokio-sync/benches/mpsc.rs | 32 | ||||
-rw-r--r-- | tokio-sync/src/task/atomic_task.rs | 50 | ||||
-rw-r--r-- | tokio-sync/tests/atomic_task.rs | 42 |
3 files changed, 120 insertions, 4 deletions
diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs index d182319b..d83ae475 100644 --- a/tokio-sync/benches/mpsc.rs +++ b/tokio-sync/benches/mpsc.rs @@ -87,6 +87,22 @@ mod tokio { } #[bench] + fn unbounded_rx_not_ready_x5(b: &mut Bencher) { + let (_tx, mut rx) = unbounded_channel::<i32>(); + b.iter(|| { + future::lazy(|| { + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }).wait().unwrap(); + }) + } + + #[bench] fn bounded_uncontended_1(b: &mut Bencher) { b.iter(|| { let (mut tx, mut rx) = channel(1_000); @@ -288,6 +304,22 @@ mod legacy { } #[bench] + fn unbounded_rx_not_ready_x5(b: &mut Bencher) { + let (_tx, mut rx) = unbounded::<i32>(); + b.iter(|| { + future::lazy(|| { + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }).wait().unwrap(); + }) + } + + #[bench] fn unbounded_uncontended_1(b: &mut Bencher) { b.iter(|| { let (tx, mut rx) = unbounded(); diff --git a/tokio-sync/src/task/atomic_task.rs b/tokio-sync/src/task/atomic_task.rs index 86ced5aa..c1124494 100644 --- a/tokio-sync/src/task/atomic_task.rs +++ b/tokio-sync/src/task/atomic_task.rs @@ -149,7 +149,7 @@ impl AtomicTask { /// /// This is the same as calling `register_task` with `task::current()`. pub fn register(&self) { - self.register_task(task::current()); + self.do_register(CurrentTask); } /// Registers the provided task to be notified on calls to `notify`. @@ -168,12 +168,19 @@ impl AtomicTask { /// tasks to be notified. One of the callers will win and have its task set, /// but there is no guarantee as to which caller will succeed. pub fn register_task(&self, task: Task) { + self.do_register(ExactTask(task)); + } + + fn do_register<R>(&self, reg: R) + where + R: Register, + { debug!(" + register_task"); match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { WAITING => { unsafe { // Locked acquired, update the waker cell - self.task.with_mut(|t| *t = Some(task)); + self.task.with_mut(|t| reg.register(&mut *t)); // Release the lock. If the state transitioned to include // the `NOTIFYING` bit, this means that a notify has been @@ -213,7 +220,7 @@ impl AtomicTask { // Currently in the process of notifying the task, i.e., // `notify` is currently being called on the old task handle. // So, we call notify on the new task handle - task.notify(); + reg.notify(); } state => { // In this case, a concurrent thread is holding the @@ -284,3 +291,40 @@ impl fmt::Debug for AtomicTask { unsafe impl Send for AtomicTask {} unsafe impl Sync for AtomicTask {} + +trait Register { + fn register(self, slot: &mut Option<Task>); + fn notify(self); +} + +struct CurrentTask; + +impl Register for CurrentTask { + fn register(self, slot: &mut Option<Task>) { + let should_update = (&*slot).as_ref() + .map(|prev| !prev.will_notify_current()) + .unwrap_or(true); + if should_update { + *slot = Some(task::current()); + } + } + + fn notify(self) { + task::current().notify(); + } +} + +struct ExactTask(Task); + +impl Register for ExactTask { + fn register(self, slot: &mut Option<Task>) { + // When calling register_task with an exact task, it doesn't matter + // if the previous task would have notified current. We *always* want + // to save that exact task. + *slot = Some(self.0); + } + + fn notify(self) { + self.0.notify(); + } +} diff --git a/tokio-sync/tests/atomic_task.rs b/tokio-sync/tests/atomic_task.rs index ccdcbbb4..d630e3ea 100644 --- a/tokio-sync/tests/atomic_task.rs +++ b/tokio-sync/tests/atomic_task.rs @@ -1,7 +1,9 @@ extern crate futures; +extern crate tokio_mock_task; extern crate tokio_sync; -use futures::task::Task; +use futures::task::{self, Task}; +use tokio_mock_task::*; use tokio_sync::task::AtomicTask; trait AssertSend: Send {} @@ -12,3 +14,41 @@ impl AssertSync for AtomicTask {} impl AssertSend for Task {} impl AssertSync for Task {} + +#[test] +fn register_task() { + // AtomicTask::register_task should *always* register the + // arbitrary task. + + let atomic = AtomicTask::new(); + + let mut mock1 = MockTask::new(); + let mut mock2 = MockTask::new(); + + // Register once... + mock1.enter(|| atomic.register()); + + // Grab the actual 2nd task from the mock... + let task2 = mock2.enter(task::current); + + // Now register the 2nd task, even though in the context where + // the first task would be considered 'current'... + { + // Need a block to grab a reference, so that we only move + // task2 into the closure, not the AtomicTask... + let atomic = &atomic; + mock1.enter(move || { + atomic.register_task(task2); + }); + } + + // Just proving that they haven't been notified yet... + assert!(!mock1.is_notified(), "mock1 shouldn't be notified yet"); + assert!(!mock2.is_notified(), "mock2 shouldn't be notified yet"); + + // Now trigger the notify, and ensure it was task2 + atomic.notify(); + + assert!(!mock1.is_notified(), "mock1 shouldn't be notified"); + assert!(mock2.is_notified(), "mock2 should be notified"); +} |