summaryrefslogtreecommitdiffstats
path: root/tokio-sync
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2019-02-20 12:50:29 -0800
committerCarl Lerche <me@carllerche.com>2019-02-20 12:50:29 -0800
commitf9345f99bb24c2c235ff564033c9d940a18ae0ac (patch)
tree25559556dffb5c1ee8ddc93319760be9ac28057f /tokio-sync
parentab206b976c76b628d6770af4b4d4a443aa6ef239 (diff)
sync: drop old tasks in oneshot (#911)
Diffstat (limited to 'tokio-sync')
-rw-r--r--tokio-sync/src/oneshot.rs43
-rw-r--r--tokio-sync/tests/fuzz_oneshot.rs73
-rw-r--r--tokio-sync/tests/oneshot.rs12
3 files changed, 113 insertions, 15 deletions
diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs
index 9c19febe..c304cccb 100644
--- a/tokio-sync/src/oneshot.rs
+++ b/tokio-sync/src/oneshot.rs
@@ -156,13 +156,17 @@ impl<T> Sender<T> {
}
if state.is_tx_task_set() {
- let tx_task = unsafe { inner.tx_task() };
+ let will_notify = inner.tx_task.with(|ptr| unsafe {
+ (&*ptr).will_notify_current()
+ });
- if !tx_task.will_notify_current() {
+ if !will_notify {
state = State::unset_tx_task(&inner.state);
if state.is_closed() {
return Ok(Async::Ready(()));
+ } else {
+ unsafe { inner.drop_tx_task() };
}
}
}
@@ -294,8 +298,9 @@ impl<T> Inner<T> {
}
if prev.is_rx_task_set() {
- let rx_task = unsafe { self.rx_task() };
- rx_task.notify();
+ self.rx_task.with(|ptr| unsafe {
+ (&*ptr).notify()
+ });
}
true
@@ -316,18 +321,21 @@ impl<T> Inner<T> {
Err(RecvError(()))
} else {
if state.is_rx_task_set() {
- let rx_task = unsafe { self.rx_task() };
+ let will_notify = self.rx_task.with(|ptr| unsafe {
+ (&*ptr).will_notify_current()
+ });
// Check if the task is still the same
- if !rx_task.will_notify_current() {
+ if !will_notify {
// Unset the task
state = State::unset_rx_task(&self.state);
-
if state.is_complete() {
return match unsafe { self.consume_value() } {
Some(value) => Ok(Ready(value)),
None => Err(RecvError(())),
};
+ } else {
+ unsafe { self.drop_rx_task() };
}
}
}
@@ -358,8 +366,9 @@ impl<T> Inner<T> {
let prev = State::set_closed(&self.state);
if prev.is_tx_task_set() && !prev.is_complete() {
- let tx_task = unsafe { self.tx_task() };
- tx_task.notify();
+ self.tx_task.with(|ptr| unsafe {
+ (&*ptr).notify()
+ });
}
}
@@ -370,8 +379,16 @@ impl<T> Inner<T> {
})
}
- unsafe fn rx_task(&self) -> &Task {
- &*self.rx_task.with(|ptr| ptr)
+ unsafe fn drop_rx_task(&self) {
+ self.rx_task.with_mut(|ptr| {
+ ManuallyDrop::drop(&mut *ptr)
+ })
+ }
+
+ unsafe fn drop_tx_task(&self) {
+ self.tx_task.with_mut(|ptr| {
+ ManuallyDrop::drop(&mut *ptr)
+ })
}
unsafe fn set_rx_task(&self) {
@@ -380,10 +397,6 @@ impl<T> Inner<T> {
});
}
- unsafe fn tx_task(&self) -> &Task {
- &*self.tx_task.with(|ptr| ptr)
- }
-
unsafe fn set_tx_task(&self) {
self.tx_task.with_mut(|ptr| {
*ptr = ManuallyDrop::new(task::current())
diff --git a/tokio-sync/tests/fuzz_oneshot.rs b/tokio-sync/tests/fuzz_oneshot.rs
index 088f9fc6..1b8cbe33 100644
--- a/tokio-sync/tests/fuzz_oneshot.rs
+++ b/tokio-sync/tests/fuzz_oneshot.rs
@@ -5,6 +5,7 @@ extern crate loom;
#[allow(warnings)]
mod oneshot;
+use futures::{Async, Future};
use loom::thread;
use loom::futures::block_on;
@@ -21,3 +22,75 @@ fn smoke() {
assert_eq!(1, value);
});
}
+
+#[test]
+fn changing_rx_task() {
+ loom::fuzz(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let rx = thread::spawn(move || {
+ let t1 = block_on(futures::future::poll_fn(|| {
+ Ok::<_, ()>(rx.poll().into())
+ })).unwrap();
+
+ match t1 {
+ Ok(Async::Ready(value)) => {
+ // ok
+ assert_eq!(1, value);
+ None
+ },
+ Ok(Async::NotReady) => {
+ Some(rx)
+ },
+ Err(_) => unreachable!(),
+ }
+ }).join().unwrap();
+
+
+ if let Some(rx) = rx {
+ // Previous task parked, use a new task...
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ }
+
+ });
+}
+
+#[test]
+fn changing_tx_task() {
+ loom::fuzz(|| {
+ let (mut tx, rx) = oneshot::channel::<i32>();
+
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ let tx = thread::spawn(move || {
+ let t1 = block_on(futures::future::poll_fn(|| {
+ Ok::<_, ()>(tx.poll_close().into())
+ })).unwrap();
+
+ match t1 {
+ Ok(Async::Ready(())) => {
+ None
+ },
+ Ok(Async::NotReady) => {
+ Some(tx)
+ },
+ Err(_) => unreachable!(),
+ }
+ }).join().unwrap();
+
+
+ if let Some(mut tx) = tx {
+ // Previous task parked, use a new task...
+ block_on(futures::future::poll_fn(move || {
+ tx.poll_close()
+ })).unwrap();
+ }
+ });
+}
diff --git a/tokio-sync/tests/oneshot.rs b/tokio-sync/tests/oneshot.rs
index 57de76fc..3919d858 100644
--- a/tokio-sync/tests/oneshot.rs
+++ b/tokio-sync/tests/oneshot.rs
@@ -208,10 +208,16 @@ fn receiver_changes_task() {
assert_not_ready!(rx.poll());
});
+ assert_eq!(2, task1.notifier_ref_count());
+ assert_eq!(1, task2.notifier_ref_count());
+
task2.enter(|| {
assert_not_ready!(rx.poll());
});
+ assert_eq!(1, task1.notifier_ref_count());
+ assert_eq!(2, task2.notifier_ref_count());
+
tx.send(1).unwrap();
assert!(!task1.is_notified());
@@ -231,10 +237,16 @@ fn sender_changes_task() {
assert_not_ready!(tx.poll_close());
});
+ assert_eq!(2, task1.notifier_ref_count());
+ assert_eq!(1, task2.notifier_ref_count());
+
task2.enter(|| {
assert_not_ready!(tx.poll_close());
});
+ assert_eq!(1, task1.notifier_ref_count());
+ assert_eq!(2, task2.notifier_ref_count());
+
drop(rx);
assert!(!task1.is_notified());