diff options
Diffstat (limited to 'tokio/src/executor/task/harness.rs')
-rw-r--r-- | tokio/src/executor/task/harness.rs | 61 |
1 files changed, 36 insertions, 25 deletions
diff --git a/tokio/src/executor/task/harness.rs b/tokio/src/executor/task/harness.rs index e5355e4f..e71e9a64 100644 --- a/tokio/src/executor/task/harness.rs +++ b/tokio/src/executor/task/harness.rs @@ -2,16 +2,18 @@ use crate::executor::loom::alloc::Track; use crate::executor::loom::cell::CausalCheck; use crate::executor::task::core::{Cell, Core, Header, Trailer}; use crate::executor::task::state::Snapshot; -use crate::executor::task::{Error, Schedule, Task}; +use crate::executor::task::{JoinError, Schedule, Task}; use std::future::Future; +use std::marker::PhantomData; use std::mem::{ManuallyDrop, MaybeUninit}; use std::ptr::NonNull; use std::task::{Poll, Waker}; /// Typed raw task handle pub(super) struct Harness<T: Future, S: 'static> { - cell: NonNull<Cell<T, S>>, + cell: NonNull<Cell<T>>, + _p: PhantomData<S>, } impl<T, S> Harness<T, S> @@ -22,11 +24,13 @@ where pub(super) unsafe fn from_raw(ptr: *mut ()) -> Harness<T, S> { debug_assert!(!ptr.is_null()); - let cell = NonNull::new_unchecked(ptr as *mut Cell<T, S>); - Harness { cell } + Harness { + cell: NonNull::new_unchecked(ptr as *mut Cell<T>), + _p: PhantomData, + } } - fn header(&self) -> &Header<S> { + fn header(&self) -> &Header { unsafe { &self.cell.as_ref().header } } @@ -51,7 +55,11 @@ where /// Panics raised while polling the future are handled. /// /// Returns `true` if the task needs to be scheduled again - pub(super) fn poll(mut self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> bool { + /// + /// # Safety + /// + /// The pointer returned by the `executor` fn must be castable to `*mut S` + pub(super) unsafe fn poll(mut self, executor: &mut dyn FnMut() -> Option<NonNull<()>>) -> bool { use std::panic; // Transition the task to the running state. @@ -67,7 +75,7 @@ where debug_assert!(join_interest || !res.has_join_waker()); // Get the cell components - let cell = unsafe { &mut self.cell.as_mut() }; + let cell = &mut self.cell.as_mut(); let header = &cell.header; let core = &mut cell.core; @@ -76,15 +84,13 @@ where // point, there are no outstanding wakers which might access the // field concurrently. if header.executor().is_none() { - unsafe { - // We don't want the destructor to run because we don't really - // own the task here. - let task = ManuallyDrop::new(Task::from_raw(header.into())); - // Call the scheduler's bind callback - let executor = executor().expect("first poll must happen from an executor"); - executor.as_ref().bind(&task); - header.executor.with_mut(|ptr| *ptr = Some(executor)); - } + // We don't want the destructor to run because we don't really + // own the task here. + let task = ManuallyDrop::new(Task::from_raw(header.into())); + // Call the scheduler's bind callback + let executor = executor().expect("first poll must happen from an executor"); + executor.cast::<S>().as_ref().bind(&task); + header.executor.with_mut(|ptr| *ptr = Some(executor.cast())); } // The transition to `Running` done above ensures that a lock on the @@ -111,7 +117,7 @@ where polled: false, }; - let res = guard.core.poll(header); + let res = guard.core.poll::<S>(header); // prevent the guard from dropping the future guard.polled = true; @@ -136,7 +142,7 @@ where } } Err(err) => { - self.complete(executor, join_interest, Err(Error::panic(err))); + self.complete(executor, join_interest, Err(JoinError::panic(err))); false } } @@ -186,7 +192,7 @@ where state: Snapshot, ) { if state.is_canceled() { - dst.write(Track::new(Err(Error::cancelled()))); + dst.write(Track::new(Err(JoinError::cancelled()))); } else { self.core().read_output(dst); } @@ -306,7 +312,7 @@ where None => panic!("executor should be set"), }; - S::schedule(executor.as_ref(), self.to_task()); + S::schedule(executor.cast().as_ref(), self.to_task()); } } } @@ -382,7 +388,7 @@ where let task = self.to_task(); if let Some(executor) = bound_executor { - executor.as_ref().release(task); + executor.cast::<S>().as_ref().release(task); } else { // Just drop the task. This will release / deallocate memory. drop(task); @@ -394,7 +400,7 @@ where fn complete( mut self, - executor: &mut dyn FnMut() -> Option<NonNull<S>>, + executor: &mut dyn FnMut() -> Option<NonNull<()>>, join_interest: bool, output: super::Result<T::Output>, ) { @@ -412,7 +418,12 @@ where unsafe { // perform a local release let task = ManuallyDrop::new(self.to_task()); - executor.as_ref().unwrap().as_ref().release_local(&task); + executor + .as_ref() + .unwrap() + .cast::<S>() + .as_ref() + .release_local(&task); if self.transition_to_released(join_interest).is_final_ref() { self.dealloc(); @@ -438,7 +449,7 @@ where None => panic!("executor should be set"), }; - executor.as_ref().release(task); + executor.cast::<S>().as_ref().release(task); } } } @@ -542,7 +553,7 @@ where } unsafe fn to_task(&self) -> Task<S> { - let ptr = self.cell.as_ptr() as *mut Header<S>; + let ptr = self.cell.as_ptr() as *mut Header; Task::from_raw(NonNull::new_unchecked(ptr)) } } |