summaryrefslogtreecommitdiffstats
path: root/tokio/src/executor/task/harness.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/executor/task/harness.rs')
-rw-r--r--tokio/src/executor/task/harness.rs61
1 files changed, 36 insertions, 25 deletions
diff --git a/tokio/src/executor/task/harness.rs b/tokio/src/executor/task/harness.rs
index e5355e4f..e71e9a64 100644
--- a/tokio/src/executor/task/harness.rs
+++ b/tokio/src/executor/task/harness.rs
@@ -2,16 +2,18 @@ use crate::executor::loom::alloc::Track;
use crate::executor::loom::cell::CausalCheck;
use crate::executor::task::core::{Cell, Core, Header, Trailer};
use crate::executor::task::state::Snapshot;
-use crate::executor::task::{Error, Schedule, Task};
+use crate::executor::task::{JoinError, Schedule, Task};
use std::future::Future;
+use std::marker::PhantomData;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::ptr::NonNull;
use std::task::{Poll, Waker};
/// Typed raw task handle
pub(super) struct Harness<T: Future, S: 'static> {
- cell: NonNull<Cell<T, S>>,
+ cell: NonNull<Cell<T>>,
+ _p: PhantomData<S>,
}
impl<T, S> Harness<T, S>
@@ -22,11 +24,13 @@ where
pub(super) unsafe fn from_raw(ptr: *mut ()) -> Harness<T, S> {
debug_assert!(!ptr.is_null());
- let cell = NonNull::new_unchecked(ptr as *mut Cell<T, S>);
- Harness { cell }
+ Harness {
+ cell: NonNull::new_unchecked(ptr as *mut Cell<T>),
+ _p: PhantomData,
+ }
}
- fn header(&self) -> &Header<S> {
+ fn header(&self) -> &Header {
unsafe { &self.cell.as_ref().header }
}
@@ -51,7 +55,11 @@ where
/// Panics raised while polling the future are handled.
///
/// Returns `true` if the task needs to be scheduled again
- pub(super) fn poll(mut self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> bool {
+ ///
+ /// # Safety
+ ///
+ /// The pointer returned by the `executor` fn must be castable to `*mut S`
+ pub(super) unsafe fn poll(mut self, executor: &mut dyn FnMut() -> Option<NonNull<()>>) -> bool {
use std::panic;
// Transition the task to the running state.
@@ -67,7 +75,7 @@ where
debug_assert!(join_interest || !res.has_join_waker());
// Get the cell components
- let cell = unsafe { &mut self.cell.as_mut() };
+ let cell = &mut self.cell.as_mut();
let header = &cell.header;
let core = &mut cell.core;
@@ -76,15 +84,13 @@ where
// point, there are no outstanding wakers which might access the
// field concurrently.
if header.executor().is_none() {
- unsafe {
- // We don't want the destructor to run because we don't really
- // own the task here.
- let task = ManuallyDrop::new(Task::from_raw(header.into()));
- // Call the scheduler's bind callback
- let executor = executor().expect("first poll must happen from an executor");
- executor.as_ref().bind(&task);
- header.executor.with_mut(|ptr| *ptr = Some(executor));
- }
+ // We don't want the destructor to run because we don't really
+ // own the task here.
+ let task = ManuallyDrop::new(Task::from_raw(header.into()));
+ // Call the scheduler's bind callback
+ let executor = executor().expect("first poll must happen from an executor");
+ executor.cast::<S>().as_ref().bind(&task);
+ header.executor.with_mut(|ptr| *ptr = Some(executor.cast()));
}
// The transition to `Running` done above ensures that a lock on the
@@ -111,7 +117,7 @@ where
polled: false,
};
- let res = guard.core.poll(header);
+ let res = guard.core.poll::<S>(header);
// prevent the guard from dropping the future
guard.polled = true;
@@ -136,7 +142,7 @@ where
}
}
Err(err) => {
- self.complete(executor, join_interest, Err(Error::panic(err)));
+ self.complete(executor, join_interest, Err(JoinError::panic(err)));
false
}
}
@@ -186,7 +192,7 @@ where
state: Snapshot,
) {
if state.is_canceled() {
- dst.write(Track::new(Err(Error::cancelled())));
+ dst.write(Track::new(Err(JoinError::cancelled())));
} else {
self.core().read_output(dst);
}
@@ -306,7 +312,7 @@ where
None => panic!("executor should be set"),
};
- S::schedule(executor.as_ref(), self.to_task());
+ S::schedule(executor.cast().as_ref(), self.to_task());
}
}
}
@@ -382,7 +388,7 @@ where
let task = self.to_task();
if let Some(executor) = bound_executor {
- executor.as_ref().release(task);
+ executor.cast::<S>().as_ref().release(task);
} else {
// Just drop the task. This will release / deallocate memory.
drop(task);
@@ -394,7 +400,7 @@ where
fn complete(
mut self,
- executor: &mut dyn FnMut() -> Option<NonNull<S>>,
+ executor: &mut dyn FnMut() -> Option<NonNull<()>>,
join_interest: bool,
output: super::Result<T::Output>,
) {
@@ -412,7 +418,12 @@ where
unsafe {
// perform a local release
let task = ManuallyDrop::new(self.to_task());
- executor.as_ref().unwrap().as_ref().release_local(&task);
+ executor
+ .as_ref()
+ .unwrap()
+ .cast::<S>()
+ .as_ref()
+ .release_local(&task);
if self.transition_to_released(join_interest).is_final_ref() {
self.dealloc();
@@ -438,7 +449,7 @@ where
None => panic!("executor should be set"),
};
- executor.as_ref().release(task);
+ executor.cast::<S>().as_ref().release(task);
}
}
}
@@ -542,7 +553,7 @@ where
}
unsafe fn to_task(&self) -> Task<S> {
- let ptr = self.cell.as_ptr() as *mut Header<S>;
+ let ptr = self.cell.as_ptr() as *mut Header;
Task::from_raw(NonNull::new_unchecked(ptr))
}
}