diff options
author | Sean McArthur <sean@seanmonstar.com> | 2019-02-20 12:50:29 -0800 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-02-20 12:50:29 -0800 |
commit | f9345f99bb24c2c235ff564033c9d940a18ae0ac (patch) | |
tree | 25559556dffb5c1ee8ddc93319760be9ac28057f /tokio-sync | |
parent | ab206b976c76b628d6770af4b4d4a443aa6ef239 (diff) |
sync: drop old tasks in oneshot (#911)
Diffstat (limited to 'tokio-sync')
-rw-r--r-- | tokio-sync/src/oneshot.rs | 43 | ||||
-rw-r--r-- | tokio-sync/tests/fuzz_oneshot.rs | 73 | ||||
-rw-r--r-- | tokio-sync/tests/oneshot.rs | 12 |
3 files changed, 113 insertions, 15 deletions
diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs index 9c19febe..c304cccb 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio-sync/src/oneshot.rs @@ -156,13 +156,17 @@ impl<T> Sender<T> { } if state.is_tx_task_set() { - let tx_task = unsafe { inner.tx_task() }; + let will_notify = inner.tx_task.with(|ptr| unsafe { + (&*ptr).will_notify_current() + }); - if !tx_task.will_notify_current() { + if !will_notify { state = State::unset_tx_task(&inner.state); if state.is_closed() { return Ok(Async::Ready(())); + } else { + unsafe { inner.drop_tx_task() }; } } } @@ -294,8 +298,9 @@ impl<T> Inner<T> { } if prev.is_rx_task_set() { - let rx_task = unsafe { self.rx_task() }; - rx_task.notify(); + self.rx_task.with(|ptr| unsafe { + (&*ptr).notify() + }); } true @@ -316,18 +321,21 @@ impl<T> Inner<T> { Err(RecvError(())) } else { if state.is_rx_task_set() { - let rx_task = unsafe { self.rx_task() }; + let will_notify = self.rx_task.with(|ptr| unsafe { + (&*ptr).will_notify_current() + }); // Check if the task is still the same - if !rx_task.will_notify_current() { + if !will_notify { // Unset the task state = State::unset_rx_task(&self.state); - if state.is_complete() { return match unsafe { self.consume_value() } { Some(value) => Ok(Ready(value)), None => Err(RecvError(())), }; + } else { + unsafe { self.drop_rx_task() }; } } } @@ -358,8 +366,9 @@ impl<T> Inner<T> { let prev = State::set_closed(&self.state); if prev.is_tx_task_set() && !prev.is_complete() { - let tx_task = unsafe { self.tx_task() }; - tx_task.notify(); + self.tx_task.with(|ptr| unsafe { + (&*ptr).notify() + }); } } @@ -370,8 +379,16 @@ impl<T> Inner<T> { }) } - unsafe fn rx_task(&self) -> &Task { - &*self.rx_task.with(|ptr| ptr) + unsafe fn drop_rx_task(&self) { + self.rx_task.with_mut(|ptr| { + ManuallyDrop::drop(&mut *ptr) + }) + } + + unsafe fn drop_tx_task(&self) { + self.tx_task.with_mut(|ptr| { + ManuallyDrop::drop(&mut *ptr) + }) } unsafe fn set_rx_task(&self) { @@ -380,10 +397,6 @@ impl<T> Inner<T> { }); } - unsafe fn tx_task(&self) -> &Task { - &*self.tx_task.with(|ptr| ptr) - } - unsafe fn set_tx_task(&self) { self.tx_task.with_mut(|ptr| { *ptr = ManuallyDrop::new(task::current()) diff --git a/tokio-sync/tests/fuzz_oneshot.rs b/tokio-sync/tests/fuzz_oneshot.rs index 088f9fc6..1b8cbe33 100644 --- a/tokio-sync/tests/fuzz_oneshot.rs +++ b/tokio-sync/tests/fuzz_oneshot.rs @@ -5,6 +5,7 @@ extern crate loom; #[allow(warnings)] mod oneshot; +use futures::{Async, Future}; use loom::thread; use loom::futures::block_on; @@ -21,3 +22,75 @@ fn smoke() { assert_eq!(1, value); }); } + +#[test] +fn changing_rx_task() { + loom::fuzz(|| { + let (tx, mut rx) = oneshot::channel(); + + thread::spawn(move || { + tx.send(1).unwrap(); + }); + + let rx = thread::spawn(move || { + let t1 = block_on(futures::future::poll_fn(|| { + Ok::<_, ()>(rx.poll().into()) + })).unwrap(); + + match t1 { + Ok(Async::Ready(value)) => { + // ok + assert_eq!(1, value); + None + }, + Ok(Async::NotReady) => { + Some(rx) + }, + Err(_) => unreachable!(), + } + }).join().unwrap(); + + + if let Some(rx) = rx { + // Previous task parked, use a new task... + let value = block_on(rx).unwrap(); + assert_eq!(1, value); + } + + }); +} + +#[test] +fn changing_tx_task() { + loom::fuzz(|| { + let (mut tx, rx) = oneshot::channel::<i32>(); + + thread::spawn(move || { + drop(rx); + }); + + let tx = thread::spawn(move || { + let t1 = block_on(futures::future::poll_fn(|| { + Ok::<_, ()>(tx.poll_close().into()) + })).unwrap(); + + match t1 { + Ok(Async::Ready(())) => { + None + }, + Ok(Async::NotReady) => { + Some(tx) + }, + Err(_) => unreachable!(), + } + }).join().unwrap(); + + + if let Some(mut tx) = tx { + // Previous task parked, use a new task... + block_on(futures::future::poll_fn(move || { + tx.poll_close() + })).unwrap(); + } + }); +} diff --git a/tokio-sync/tests/oneshot.rs b/tokio-sync/tests/oneshot.rs index 57de76fc..3919d858 100644 --- a/tokio-sync/tests/oneshot.rs +++ b/tokio-sync/tests/oneshot.rs @@ -208,10 +208,16 @@ fn receiver_changes_task() { assert_not_ready!(rx.poll()); }); + assert_eq!(2, task1.notifier_ref_count()); + assert_eq!(1, task2.notifier_ref_count()); + task2.enter(|| { assert_not_ready!(rx.poll()); }); + assert_eq!(1, task1.notifier_ref_count()); + assert_eq!(2, task2.notifier_ref_count()); + tx.send(1).unwrap(); assert!(!task1.is_notified()); @@ -231,10 +237,16 @@ fn sender_changes_task() { assert_not_ready!(tx.poll_close()); }); + assert_eq!(2, task1.notifier_ref_count()); + assert_eq!(1, task2.notifier_ref_count()); + task2.enter(|| { assert_not_ready!(tx.poll_close()); }); + assert_eq!(1, task1.notifier_ref_count()); + assert_eq!(2, task2.notifier_ref_count()); + drop(rx); assert!(!task1.is_notified()); |