diff options
Diffstat (limited to 'tokio-current-thread/src/scheduler.rs')
-rw-r--r-- | tokio-current-thread/src/scheduler.rs | 170 |
1 files changed, 102 insertions, 68 deletions
diff --git a/tokio-current-thread/src/scheduler.rs b/tokio-current-thread/src/scheduler.rs index decef395..709ac415 100644 --- a/tokio-current-thread/src/scheduler.rs +++ b/tokio-current-thread/src/scheduler.rs @@ -1,14 +1,14 @@ use crate::Borrow; -use futures::executor::{self, NotifyHandle, Spawn, UnsafeNotify}; -use futures::{Async, Future}; use std::cell::UnsafeCell; use std::fmt::{self, Debug}; -use std::marker::PhantomData; +use std::future::Future; use std::mem; +use std::pin::Pin; use std::ptr; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize}; use std::sync::{Arc, Weak}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::thread; use std::usize; use tokio_executor::park::Unpark; @@ -22,8 +22,6 @@ pub struct Scheduler<U> { nodes: List<U>, } -pub struct Notify<'a, U>(&'a Arc<Node<U>>); - // A linked-list of nodes struct List<U> { len: usize, @@ -78,12 +76,6 @@ struct Inner<U> { unsafe impl<U: Sync + Send> Send for Inner<U> {} unsafe impl<U: Sync + Send> Sync for Inner<U> {} -impl<U: Unpark> executor::Notify for Inner<U> { - fn notify(&self, _: usize) { - self.unpark.unpark(); - } -} - struct Node<U> { // The item item: UnsafeCell<Option<Task>>, @@ -123,12 +115,12 @@ enum Dequeue<U> { } /// Wraps a spawned boxed future -struct Task(Spawn<Box<dyn Future<Item = (), Error = ()>>>); +struct Task(Pin<Box<dyn Future<Output = ()>>>); /// A task that is scheduled. `turn` must be called pub struct Scheduled<'a, U> { task: &'a mut Task, - notify: &'a Notify<'a, U>, + node: &'a Arc<Node<U>>, done: &'a mut bool, } @@ -165,11 +157,11 @@ where } } - pub fn notify(&self) -> NotifyHandle { - self.inner.clone().into() + pub fn waker(&self) -> Waker { + waker_inner(self.inner.clone()) } - pub fn schedule(&mut self, item: Box<dyn Future<Item = (), Error = ()>>) { + pub fn schedule(&mut self, item: Pin<Box<dyn Future<Output = ()>>>) { // Get the current scheduler tick let tick_num = self.inner.tick_num.load(SeqCst); @@ -317,11 +309,10 @@ where // deallocating the node if need be. let borrow = &mut *bomb.borrow; let enter = &mut *bomb.enter; - let notify = Notify(bomb.node.as_ref().unwrap()); let mut scheduled = Scheduled { task: item, - notify: ¬ify, + node: bomb.node.as_ref().unwrap(), done: &mut done, }; @@ -345,10 +336,15 @@ where impl<'a, U: Unpark> Scheduled<'a, U> { /// Polls the task, returns `true` if the task has completed. pub fn tick(&mut self) -> bool { - // Tick the future - let ret = match self.task.0.poll_future_notify(self.notify, 0) { - Ok(Async::Ready(_)) | Err(_) => true, - Ok(Async::NotReady) => false, + let waker = unsafe { + // Safety: we don't hold this waker ref longer than + // this `tick` function + waker_ref(self.node) + }; + let mut cx = Context::from_waker(&waker); + let ret = match self.task.0.as_mut().poll(&mut cx) { + Poll::Ready(()) => true, + Poll::Pending => false, }; *self.done = ret; @@ -357,8 +353,8 @@ impl<'a, U: Unpark> Scheduled<'a, U> { } impl Task { - pub fn new(future: Box<dyn Future<Item = (), Error = ()> + 'static>) -> Self { - Task(executor::spawn(future)) + pub fn new(future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> Self { + Task(future) } } @@ -630,63 +626,101 @@ impl<U> List<U> { } } -impl<'a, U> Clone for Notify<'a, U> { - fn clone(&self) -> Self { - Notify(self.0) - } +unsafe fn noop(_: *const ()) {} + +// ===== Raw Waker Inner<U> ====== + +fn waker_inner<U: Unpark>(inner: Arc<Inner<U>>) -> Waker { + let ptr = Arc::into_raw(inner) as *const (); + let vtable = &RawWakerVTable::new( + clone_inner::<U>, + wake_inner::<U>, + wake_by_ref_inner::<U>, + drop_inner::<U>, + ); + + unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) } } -impl<'a, U> fmt::Debug for Notify<'a, U> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Notify").finish() - } +unsafe fn clone_inner<U: Unpark>(data: *const ()) -> RawWaker { + let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>); + let clone = arc.clone(); + // forget both Arcs so the refcounts don't get decremented + mem::forget(arc); + mem::forget(clone); + + let vtable = &RawWakerVTable::new( + clone_inner::<U>, + wake_inner::<U>, + wake_by_ref_inner::<U>, + drop_inner::<U>, + ); + RawWaker::new(data, vtable) } -impl<'a, U: Unpark> From<Notify<'a, U>> for NotifyHandle { - fn from(handle: Notify<'a, U>) -> NotifyHandle { - unsafe { - let ptr = handle.0.clone(); - let ptr = mem::transmute::<Arc<Node<U>>, *mut ArcNode<U>>(ptr); - NotifyHandle::new(hide_lt(ptr)) - } - } +unsafe fn wake_inner<U: Unpark>(data: *const ()) { + let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>); + arc.unpark.unpark(); } -struct ArcNode<U>(PhantomData<U>); +unsafe fn wake_by_ref_inner<U: Unpark>(data: *const ()) { + let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>); + arc.unpark.unpark(); + // by_ref means we don't own the Node, so forget the Arc + mem::forget(arc); +} -// We should never touch `Task` on any thread other than the one owning -// `Scheduler`, so this should be a safe operation. -unsafe impl<U: Sync + Send> Send for ArcNode<U> {} -unsafe impl<U: Sync + Send> Sync for ArcNode<U> {} +unsafe fn drop_inner<U>(data: *const ()) { + drop(Arc::<Inner<U>>::from_raw(data as *const Inner<U>)); +} +// ===== Raw Waker Node<U> ====== + +unsafe fn waker_ref<U: Unpark>(node: &Arc<Node<U>>) -> Waker { + let ptr = &*node as &Node<U> as *const Node<U> as *const (); + let vtable = &RawWakerVTable::new( + clone_node::<U>, + wake_unreachable, + wake_by_ref_node::<U>, + noop, + ); + + Waker::from_raw(RawWaker::new(ptr, vtable)) +} -impl<U: Unpark> executor::Notify for ArcNode<U> { - fn notify(&self, _id: usize) { - unsafe { - let me: *const ArcNode<U> = self; - let me: *const *const ArcNode<U> = &me; - let me = me as *const Arc<Node<U>>; - Node::notify(&*me) - } - } +unsafe fn wake_unreachable(_data: *const ()) { + unreachable!("waker_ref::wake()"); } -unsafe impl<U: Unpark> UnsafeNotify for ArcNode<U> { - unsafe fn clone_raw(&self) -> NotifyHandle { - let me: *const ArcNode<U> = self; - let me: *const *const ArcNode<U> = &me; - let me = &*(me as *const Arc<Node<U>>); - Notify(me).into() - } +unsafe fn clone_node<U: Unpark>(data: *const ()) -> RawWaker { + let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>); + let clone = arc.clone(); + // forget both Arcs so the refcounts don't get decremented + mem::forget(arc); + mem::forget(clone); + + let vtable = &RawWakerVTable::new( + clone_node::<U>, + wake_node::<U>, + wake_by_ref_node::<U>, + drop_node::<U>, + ); + RawWaker::new(data, vtable) +} - unsafe fn drop_raw(&self) { - let mut me: *const ArcNode<U> = self; - let me = &mut me as *mut *const ArcNode<U> as *mut Arc<Node<U>>; - ptr::drop_in_place(me); - } +unsafe fn wake_node<U: Unpark>(data: *const ()) { + let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>); + Node::<U>::notify(&arc); +} + +unsafe fn wake_by_ref_node<U: Unpark>(data: *const ()) { + let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>); + Node::<U>::notify(&arc); + // by_ref means we don't own the Node, so forget the Arc + mem::forget(arc); } -unsafe fn hide_lt<U: Unpark>(p: *mut ArcNode<U>) -> *mut dyn UnsafeNotify { - mem::transmute(p as *mut dyn UnsafeNotify) +unsafe fn drop_node<U>(data: *const ()) { + drop(Arc::<Node<U>>::from_raw(data as *const Node<U>)); } impl<U: Unpark> Node<U> { |