summaryrefslogtreecommitdiffstats
path: root/tokio/src/runtime/task/tests/task.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/runtime/task/tests/task.rs')
-rw-r--r--tokio/src/runtime/task/tests/task.rs643
1 files changed, 643 insertions, 0 deletions
diff --git a/tokio/src/runtime/task/tests/task.rs b/tokio/src/runtime/task/tests/task.rs
new file mode 100644
index 00000000..4b2dd2d8
--- /dev/null
+++ b/tokio/src/runtime/task/tests/task.rs
@@ -0,0 +1,643 @@
+use crate::runtime::task::{self, Header};
+use crate::runtime::tests::backoff::*;
+use crate::runtime::tests::mock_schedule::{mock, Mock};
+use crate::runtime::tests::track_drop::track_drop;
+use crate::sync::oneshot;
+
+use tokio_test::task::spawn;
+use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};
+
+use futures_util::future::poll_fn;
+use std::sync::mpsc;
+
+#[test]
+fn header_lte_cache_line() {
+ use std::mem::size_of;
+
+ assert!(size_of::<Header>() <= 8 * size_of::<*const ()>());
+}
+
+#[test]
+fn create_complete_drop() {
+ let (tx, rx) = mpsc::channel();
+
+ let (task, did_drop) = track_drop(async move {
+ tx.send(1).unwrap();
+ });
+
+ let task = task::background(task);
+
+ let mock = mock().bind(&task).release_local();
+ let mock = &mut || Some(From::from(&mock));
+
+ // Nothing is returned
+ assert!(task.run(mock).is_none());
+
+ // The message was sent
+ assert!(rx.try_recv().is_ok());
+
+ // The future & output were dropped.
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn create_yield_complete_drop() {
+ let (tx, rx) = mpsc::channel();
+
+ let (task, did_drop) = track_drop(async move {
+ backoff(1).await;
+ tx.send(1).unwrap();
+ });
+
+ let task = task::background(task);
+
+ let mock = mock().bind(&task).release_local();
+ let mock = || Some(From::from(&mock));
+
+ // Task is returned
+ let task = assert_some!(task.run(mock));
+
+ // The future was **not** dropped.
+ assert!(!did_drop.did_drop_future());
+
+ assert_none!(task.run(mock));
+
+ // The message was sent
+ assert!(rx.try_recv().is_ok());
+
+ // The future was dropped.
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn create_clone_yield_complete_drop() {
+ let (tx, rx) = mpsc::channel();
+
+ let (task, did_drop) = track_drop(async move {
+ backoff_clone(1).await;
+ tx.send(1).unwrap();
+ });
+
+ let task = task::background(task);
+
+ let mock = mock().bind(&task).release_local();
+ let mock = || Some(From::from(&mock));
+
+ // Task is returned
+ let task = assert_some!(task.run(mock));
+
+ // The future was **not** dropped.
+ assert!(!did_drop.did_drop_future());
+
+ assert_none!(task.run(mock));
+
+ // The message was sent
+ assert!(rx.try_recv().is_ok());
+
+ // The future was dropped.
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn create_wake_drop() {
+ let (tx, rx) = oneshot::channel();
+
+ let (task, did_drop) = track_drop(async move { rx.await });
+
+ let task = task::background(task);
+
+ let mock = mock().bind(&task).schedule().release_local();
+
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+ assert_none!(mock.next_pending_run());
+
+ // The future was **not** dropped.
+ assert!(!did_drop.did_drop_future());
+
+ tx.send("hello").unwrap();
+
+ let task = assert_some!(mock.next_pending_run());
+
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+
+ // The future was dropped.
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn notify_complete() {
+ use std::task::Poll::Ready;
+
+ let (task, did_drop) = track_drop(async move {
+ poll_fn(|cx| {
+ cx.waker().wake_by_ref();
+ Ready(())
+ })
+ .await;
+ });
+
+ let task = task::background(task);
+
+ let mock = mock().bind(&task).release_local();
+ let mock = &mut || Some(From::from(&mock));
+
+ assert_none!(task.run(mock));
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn complete_on_second_schedule_obj() {
+ let (tx, rx) = mpsc::channel();
+
+ let (task, did_drop) = track_drop(async move {
+ backoff(1).await;
+ tx.send(1).unwrap();
+ });
+
+ let task = task::background(task);
+
+ let mock1 = mock();
+ let mock2 = mock().bind(&task).release();
+
+ // Task is returned
+ let task = assert_some!(task.run(&mut || Some(From::from(&mock2))));
+
+ assert_none!(task.run(&mut || Some(From::from(&mock1))));
+
+ // The message was sent
+ assert!(rx.try_recv().is_ok());
+
+ // The future was dropped.
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+
+ let _ = assert_some!(mock2.next_pending_drop());
+}
+
+#[test]
+fn join_task_immediate_drop_handle() {
+ let (task, did_drop) = track_drop(async move { "hello".to_string() });
+
+ let (task, _) = task::joinable(task);
+
+ let mock = mock().bind(&task).release_local();
+
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
+
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn join_task_immediate_complete_1() {
+ let (task, did_drop) = track_drop(async move { "hello".to_string() });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ let mock = mock().bind(&task).release_local();
+
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
+
+ assert!(did_drop.did_drop_future());
+ assert!(!did_drop.did_drop_output());
+ assert!(!handle.is_woken());
+
+ let out = assert_ready_ok!(handle.poll());
+ assert_eq!(out.get_ref(), "hello");
+
+ drop(out);
+
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn join_task_immediate_complete_2() {
+ let (task, did_drop) = track_drop(async move { "hello".to_string() });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ let mock = mock().bind(&task).release_local();
+
+ assert_pending!(handle.poll());
+
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
+
+ assert!(did_drop.did_drop_future());
+ assert!(!did_drop.did_drop_output());
+ assert!(handle.is_woken());
+
+ let out = assert_ready_ok!(handle.poll());
+ assert_eq!(out.get_ref(), "hello");
+
+ drop(out);
+
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn join_task_complete_later() {
+ let (task, did_drop) = track_drop(async move {
+ backoff(1).await;
+ "hello".to_string()
+ });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(async { handle.await });
+
+ let mock = mock().bind(&task).release_local();
+
+ let task = assert_some!(task.run(&mut || Some(From::from(&mock))));
+
+ assert!(!did_drop.did_drop_future());
+ assert!(!did_drop.did_drop_output());
+
+ assert_pending!(handle.poll());
+
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+ assert!(handle.is_woken());
+
+ let out = assert_ready_ok!(handle.poll());
+ assert_eq!(out.get_ref(), "hello");
+
+ drop(out);
+
+ assert!(did_drop.did_drop_output());
+
+ assert_eq!(1, handle.waker_ref_count());
+}
+
+#[test]
+fn drop_join_after_poll() {
+ let (task, did_drop) = track_drop(async move {
+ backoff(1).await;
+ "hello".to_string()
+ });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(async { handle.await });
+
+ let mock = mock().bind(&task).release_local();
+
+ assert_pending!(handle.poll());
+ drop(handle);
+
+ let task = assert_some!(task.run(&mut || Some(From::from(&mock))));
+
+ assert!(!did_drop.did_drop_future());
+ assert!(!did_drop.did_drop_output());
+
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn join_handle_change_task_complete() {
+ use std::future::Future;
+ use std::pin::Pin;
+
+ let (task, did_drop) = track_drop(async move {
+ backoff(1).await;
+ "hello".to_string()
+ });
+
+ let (task, mut handle) = task::joinable(task);
+ let mut t1 = spawn(poll_fn(|cx| Pin::new(&mut handle).poll(cx)));
+
+ let mock = mock().bind(&task).release_local();
+
+ assert_pending!(t1.poll());
+ drop(t1);
+
+ let task = assert_some!(task.run(&mut || Some(From::from(&mock))));
+
+ let mut t2 = spawn(poll_fn(|cx| Pin::new(&mut handle).poll(cx)));
+ assert_pending!(t2.poll());
+
+ assert!(!did_drop.did_drop_future());
+ assert!(!did_drop.did_drop_output());
+
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+
+ assert!(t2.is_woken());
+
+ let out = assert_ready_ok!(t2.poll());
+ assert_eq!(out.get_ref(), "hello");
+
+ drop(out);
+
+ assert!(did_drop.did_drop_output());
+
+ assert_eq!(1, t2.waker_ref_count());
+}
+
+#[test]
+fn drop_handle_after_complete() {
+ let (task, did_drop) = track_drop(async move { "hello".to_string() });
+
+ let (task, handle) = task::joinable(task);
+
+ let mock = mock().bind(&task).release_local();
+
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
+
+ assert!(did_drop.did_drop_future());
+ assert!(!did_drop.did_drop_output());
+
+ drop(handle);
+
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+fn non_initial_task_state_drop_join_handle_without_polling() {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ let (task, did_drop) = track_drop(async move {
+ rx.await.unwrap();
+ "hello".to_string()
+ });
+
+ let (task, handle) = task::joinable(task);
+
+ let mock = mock().bind(&task).schedule().release_local();
+
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+
+ drop(handle);
+
+ assert!(!did_drop.did_drop_future());
+ assert!(!did_drop.did_drop_output());
+
+ tx.send(()).unwrap();
+ let task = assert_some!(mock.next_pending_run());
+
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
+
+ assert!(did_drop.did_drop_future());
+ assert!(did_drop.did_drop_output());
+}
+
+#[test]
+#[cfg(not(miri))]
+fn task_panic_background() {
+ let (task, did_drop) = track_drop(async move {
+ if true {
+ panic!()
+ }
+ "hello"
+ });
+
+ let task = task::background(task);
+
+ let mock = mock().bind(&task).release_local();
+
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
+
+ assert!(did_drop.did_drop_future());
+}
+
+#[test]
+#[cfg(not(miri))]
+fn task_panic_join() {
+ let (task, did_drop) = track_drop(async move {
+ if true {
+ panic!()
+ }
+ "hello"
+ });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ let mock = mock().bind(&task).release_local();
+
+ assert_pending!(handle.poll());
+
+ assert!(task.run(&mut || Some(From::from(&mock))).is_none());
+ assert!(did_drop.did_drop_future());
+ assert!(handle.is_woken());
+
+ assert_ready_err!(handle.poll());
+}
+
+#[test]
+fn complete_second_schedule_obj_before_join() {
+ let (tx, rx) = oneshot::channel();
+
+ let (task, did_drop) = track_drop(async move { rx.await.unwrap() });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ let mock1 = mock();
+ let mock2 = mock().bind(&task).schedule().release();
+
+ assert_pending!(handle.poll());
+
+ assert_none!(task.run(&mut || Some(From::from(&mock2))));
+
+ tx.send("hello").unwrap();
+
+ let task = assert_some!(mock2.next_pending_run());
+ assert_none!(task.run(&mut || Some(From::from(&mock1))));
+ assert!(did_drop.did_drop_future());
+
+ // The join handle was notified
+ assert!(handle.is_woken());
+
+ // Drop the task
+ let _ = assert_some!(mock2.next_pending_drop());
+
+ // Get the output
+ let out = assert_ready_ok!(handle.poll());
+ assert_eq!(*out.get_ref(), "hello");
+}
+
+#[test]
+fn complete_second_schedule_obj_after_join() {
+ let (tx, rx) = oneshot::channel();
+
+ let (task, did_drop) = track_drop(async move { rx.await.unwrap() });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ let mock1 = mock();
+ let mock2 = mock().bind(&task).schedule().release();
+
+ assert_pending!(handle.poll());
+
+ assert_none!(task.run(&mut || Some(From::from(&mock2))));
+
+ tx.send("hello").unwrap();
+
+ let task = assert_some!(mock2.next_pending_run());
+ assert_none!(task.run(&mut || Some(From::from(&mock1))));
+ assert!(did_drop.did_drop_future());
+
+ // The join handle was notified
+ assert!(handle.is_woken());
+
+ // Get the output
+ let out = assert_ready_ok!(handle.poll());
+ assert_eq!(*out.get_ref(), "hello");
+
+ // Drop the task
+ let _ = assert_some!(mock2.next_pending_drop());
+
+ assert_eq!(1, handle.waker_ref_count());
+}
+
+#[test]
+fn shutdown_from_list_before_notified() {
+ let (tx, rx) = oneshot::channel::<()>();
+ let mut list = task::OwnedList::new();
+
+ let (task, did_drop) = track_drop(async move { rx.await });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ list.insert(&task);
+
+ let mock = mock().bind(&task).release();
+
+ assert_pending!(handle.poll());
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+
+ list.shutdown();
+ assert!(did_drop.did_drop_future());
+
+ assert!(handle.is_woken());
+
+ let task = assert_some!(mock.next_pending_drop());
+ drop(task);
+
+ assert_ready_err!(handle.poll());
+
+ drop(tx);
+}
+
+#[test]
+fn shutdown_from_list_after_notified() {
+ let (tx, rx) = oneshot::channel::<()>();
+ let mut list = task::OwnedList::new();
+
+ let (task, did_drop) = track_drop(async move { rx.await });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ list.insert(&task);
+
+ let mock = mock().bind(&task).schedule().release();
+
+ assert_pending!(handle.poll());
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+
+ tx.send(()).unwrap();
+
+ let task = assert_some!(mock.next_pending_run());
+
+ list.shutdown();
+
+ assert_none!(mock.next_pending_drop());
+
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+ assert!(did_drop.did_drop_future());
+ assert!(handle.is_woken());
+
+ let task = assert_some!(mock.next_pending_drop());
+ drop(task);
+
+ assert_ready_err!(handle.poll());
+}
+
+#[test]
+fn shutdown_from_list_after_complete() {
+ let mut list = task::OwnedList::new();
+
+ let (task, did_drop) = track_drop(async move {
+ backoff(1).await;
+ "hello"
+ });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ list.insert(&task);
+
+ let m1 = mock().bind(&task).release();
+ let m2 = mock();
+
+ assert_pending!(handle.poll());
+ let task = assert_some!(task.run(&mut || Some(From::from(&m1))));
+ assert_none!(task.run(&mut || Some(From::from(&m2))));
+ assert!(did_drop.did_drop_future());
+ assert!(handle.is_woken());
+
+ list.shutdown();
+
+ let task = assert_some!(m1.next_pending_drop());
+ drop(task);
+
+ let out = assert_ready_ok!(handle.poll());
+ assert_eq!(*out.get_ref(), "hello");
+}
+
+#[test]
+fn shutdown_from_task_before_notified() {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ let (task, did_drop) = track_drop(async move { rx.await });
+
+ let (task, handle) = task::joinable::<_, Mock>(task);
+ let mut handle = spawn(handle);
+
+ assert_pending!(handle.poll());
+
+ task.shutdown();
+ assert!(did_drop.did_drop_future());
+ assert!(handle.is_woken());
+
+ assert_ready_err!(handle.poll());
+
+ drop(tx);
+}
+
+#[test]
+fn shutdown_from_task_after_notified() {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ let (task, did_drop) = track_drop(async move { rx.await });
+
+ let (task, handle) = task::joinable(task);
+ let mut handle = spawn(handle);
+
+ let mock = mock().bind(&task).schedule().release();
+
+ assert_pending!(handle.poll());
+ assert_none!(task.run(&mut || Some(From::from(&mock))));
+
+ tx.send(()).unwrap();
+
+ let task = assert_some!(mock.next_pending_run());
+
+ task.shutdown();
+ assert!(did_drop.did_drop_future());
+ assert!(handle.is_woken());
+
+ let task = assert_some!(mock.next_pending_drop());
+ drop(task);
+
+ assert_ready_err!(handle.poll());
+}