summaryrefslogtreecommitdiffstats
path: root/tokio-current-thread/src/scheduler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-current-thread/src/scheduler.rs')
-rw-r--r--tokio-current-thread/src/scheduler.rs170
1 files changed, 102 insertions, 68 deletions
diff --git a/tokio-current-thread/src/scheduler.rs b/tokio-current-thread/src/scheduler.rs
index decef395..709ac415 100644
--- a/tokio-current-thread/src/scheduler.rs
+++ b/tokio-current-thread/src/scheduler.rs
@@ -1,14 +1,14 @@
use crate::Borrow;
-use futures::executor::{self, NotifyHandle, Spawn, UnsafeNotify};
-use futures::{Async, Future};
use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
-use std::marker::PhantomData;
+use std::future::Future;
use std::mem;
+use std::pin::Pin;
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
use std::sync::{Arc, Weak};
+use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::thread;
use std::usize;
use tokio_executor::park::Unpark;
@@ -22,8 +22,6 @@ pub struct Scheduler<U> {
nodes: List<U>,
}
-pub struct Notify<'a, U>(&'a Arc<Node<U>>);
-
// A linked-list of nodes
struct List<U> {
len: usize,
@@ -78,12 +76,6 @@ struct Inner<U> {
unsafe impl<U: Sync + Send> Send for Inner<U> {}
unsafe impl<U: Sync + Send> Sync for Inner<U> {}
-impl<U: Unpark> executor::Notify for Inner<U> {
- fn notify(&self, _: usize) {
- self.unpark.unpark();
- }
-}
-
struct Node<U> {
// The item
item: UnsafeCell<Option<Task>>,
@@ -123,12 +115,12 @@ enum Dequeue<U> {
}
/// Wraps a spawned boxed future
-struct Task(Spawn<Box<dyn Future<Item = (), Error = ()>>>);
+struct Task(Pin<Box<dyn Future<Output = ()>>>);
/// A task that is scheduled. `turn` must be called
pub struct Scheduled<'a, U> {
task: &'a mut Task,
- notify: &'a Notify<'a, U>,
+ node: &'a Arc<Node<U>>,
done: &'a mut bool,
}
@@ -165,11 +157,11 @@ where
}
}
- pub fn notify(&self) -> NotifyHandle {
- self.inner.clone().into()
+ pub fn waker(&self) -> Waker {
+ waker_inner(self.inner.clone())
}
- pub fn schedule(&mut self, item: Box<dyn Future<Item = (), Error = ()>>) {
+ pub fn schedule(&mut self, item: Pin<Box<dyn Future<Output = ()>>>) {
// Get the current scheduler tick
let tick_num = self.inner.tick_num.load(SeqCst);
@@ -317,11 +309,10 @@ where
// deallocating the node if need be.
let borrow = &mut *bomb.borrow;
let enter = &mut *bomb.enter;
- let notify = Notify(bomb.node.as_ref().unwrap());
let mut scheduled = Scheduled {
task: item,
- notify: &notify,
+ node: bomb.node.as_ref().unwrap(),
done: &mut done,
};
@@ -345,10 +336,15 @@ where
impl<'a, U: Unpark> Scheduled<'a, U> {
/// Polls the task, returns `true` if the task has completed.
pub fn tick(&mut self) -> bool {
- // Tick the future
- let ret = match self.task.0.poll_future_notify(self.notify, 0) {
- Ok(Async::Ready(_)) | Err(_) => true,
- Ok(Async::NotReady) => false,
+ let waker = unsafe {
+ // Safety: we don't hold this waker ref longer than
+ // this `tick` function
+ waker_ref(self.node)
+ };
+ let mut cx = Context::from_waker(&waker);
+ let ret = match self.task.0.as_mut().poll(&mut cx) {
+ Poll::Ready(()) => true,
+ Poll::Pending => false,
};
*self.done = ret;
@@ -357,8 +353,8 @@ impl<'a, U: Unpark> Scheduled<'a, U> {
}
impl Task {
- pub fn new(future: Box<dyn Future<Item = (), Error = ()> + 'static>) -> Self {
- Task(executor::spawn(future))
+ pub fn new(future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> Self {
+ Task(future)
}
}
@@ -630,63 +626,101 @@ impl<U> List<U> {
}
}
-impl<'a, U> Clone for Notify<'a, U> {
- fn clone(&self) -> Self {
- Notify(self.0)
- }
+unsafe fn noop(_: *const ()) {}
+
+// ===== Raw Waker Inner<U> ======
+
+fn waker_inner<U: Unpark>(inner: Arc<Inner<U>>) -> Waker {
+ let ptr = Arc::into_raw(inner) as *const ();
+ let vtable = &RawWakerVTable::new(
+ clone_inner::<U>,
+ wake_inner::<U>,
+ wake_by_ref_inner::<U>,
+ drop_inner::<U>,
+ );
+
+ unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) }
}
-impl<'a, U> fmt::Debug for Notify<'a, U> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Notify").finish()
- }
+unsafe fn clone_inner<U: Unpark>(data: *const ()) -> RawWaker {
+ let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>);
+ let clone = arc.clone();
+ // forget both Arcs so the refcounts don't get decremented
+ mem::forget(arc);
+ mem::forget(clone);
+
+ let vtable = &RawWakerVTable::new(
+ clone_inner::<U>,
+ wake_inner::<U>,
+ wake_by_ref_inner::<U>,
+ drop_inner::<U>,
+ );
+ RawWaker::new(data, vtable)
}
-impl<'a, U: Unpark> From<Notify<'a, U>> for NotifyHandle {
- fn from(handle: Notify<'a, U>) -> NotifyHandle {
- unsafe {
- let ptr = handle.0.clone();
- let ptr = mem::transmute::<Arc<Node<U>>, *mut ArcNode<U>>(ptr);
- NotifyHandle::new(hide_lt(ptr))
- }
- }
+unsafe fn wake_inner<U: Unpark>(data: *const ()) {
+ let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>);
+ arc.unpark.unpark();
}
-struct ArcNode<U>(PhantomData<U>);
+unsafe fn wake_by_ref_inner<U: Unpark>(data: *const ()) {
+ let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>);
+ arc.unpark.unpark();
+ // by_ref means we don't own the Node, so forget the Arc
+ mem::forget(arc);
+}
-// We should never touch `Task` on any thread other than the one owning
-// `Scheduler`, so this should be a safe operation.
-unsafe impl<U: Sync + Send> Send for ArcNode<U> {}
-unsafe impl<U: Sync + Send> Sync for ArcNode<U> {}
+unsafe fn drop_inner<U>(data: *const ()) {
+ drop(Arc::<Inner<U>>::from_raw(data as *const Inner<U>));
+}
+// ===== Raw Waker Node<U> ======
+
+unsafe fn waker_ref<U: Unpark>(node: &Arc<Node<U>>) -> Waker {
+ let ptr = &*node as &Node<U> as *const Node<U> as *const ();
+ let vtable = &RawWakerVTable::new(
+ clone_node::<U>,
+ wake_unreachable,
+ wake_by_ref_node::<U>,
+ noop,
+ );
+
+ Waker::from_raw(RawWaker::new(ptr, vtable))
+}
-impl<U: Unpark> executor::Notify for ArcNode<U> {
- fn notify(&self, _id: usize) {
- unsafe {
- let me: *const ArcNode<U> = self;
- let me: *const *const ArcNode<U> = &me;
- let me = me as *const Arc<Node<U>>;
- Node::notify(&*me)
- }
- }
+unsafe fn wake_unreachable(_data: *const ()) {
+ unreachable!("waker_ref::wake()");
}
-unsafe impl<U: Unpark> UnsafeNotify for ArcNode<U> {
- unsafe fn clone_raw(&self) -> NotifyHandle {
- let me: *const ArcNode<U> = self;
- let me: *const *const ArcNode<U> = &me;
- let me = &*(me as *const Arc<Node<U>>);
- Notify(me).into()
- }
+unsafe fn clone_node<U: Unpark>(data: *const ()) -> RawWaker {
+ let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>);
+ let clone = arc.clone();
+ // forget both Arcs so the refcounts don't get decremented
+ mem::forget(arc);
+ mem::forget(clone);
+
+ let vtable = &RawWakerVTable::new(
+ clone_node::<U>,
+ wake_node::<U>,
+ wake_by_ref_node::<U>,
+ drop_node::<U>,
+ );
+ RawWaker::new(data, vtable)
+}
- unsafe fn drop_raw(&self) {
- let mut me: *const ArcNode<U> = self;
- let me = &mut me as *mut *const ArcNode<U> as *mut Arc<Node<U>>;
- ptr::drop_in_place(me);
- }
+unsafe fn wake_node<U: Unpark>(data: *const ()) {
+ let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>);
+ Node::<U>::notify(&arc);
+}
+
+unsafe fn wake_by_ref_node<U: Unpark>(data: *const ()) {
+ let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>);
+ Node::<U>::notify(&arc);
+ // by_ref means we don't own the Node, so forget the Arc
+ mem::forget(arc);
}
-unsafe fn hide_lt<U: Unpark>(p: *mut ArcNode<U>) -> *mut dyn UnsafeNotify {
- mem::transmute(p as *mut dyn UnsafeNotify)
+unsafe fn drop_node<U>(data: *const ()) {
+ drop(Arc::<Node<U>>::from_raw(data as *const Node<U>));
}
impl<U: Unpark> Node<U> {