diff options
author | Carl Lerche <me@carllerche.com> | 2020-03-20 21:06:50 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-20 21:06:50 -0700 |
commit | dd27f1a259033cc6328ccad1f73f753e52976a65 (patch) | |
tree | 266ab5d9e0a0e035da6f97e064c2c6295d8c2c98 | |
parent | 5fd1b8f67c4a9976ed76f304484a4213283a3d6b (diff) |
rt: remove `unsafe` from shell runtime. (#2333)
Since the original shell runtime was implemented, utilities have been
added to encapsulate `unsafe`. The shell runtime is now able to use
those utilities and not include its own `unsafe` code.
-rw-r--r-- | tests-integration/Cargo.toml | 1 | ||||
-rw-r--r-- | tests-integration/tests/rt_shell.rs | 32 | ||||
-rw-r--r-- | tokio/src/runtime/shell.rs | 65 | ||||
-rw-r--r-- | tokio/src/util/mod.rs | 6 |
4 files changed, 57 insertions, 47 deletions
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 6f84afd9..c6dd8450 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -15,6 +15,7 @@ full = [ "tokio-test" ] macros = ["tokio/macros"] +sync = ["tokio/sync"] rt-core = ["tokio/rt-core"] rt-threaded = ["rt-core", "tokio/rt-threaded"] diff --git a/tests-integration/tests/rt_shell.rs b/tests-integration/tests/rt_shell.rs new file mode 100644 index 00000000..392c0519 --- /dev/null +++ b/tests-integration/tests/rt_shell.rs @@ -0,0 +1,32 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "sync")] + +use tokio::runtime; +use tokio::sync::oneshot; + +use std::sync::mpsc; +use std::thread; + +#[test] +fn basic_shell_rt() { + let (feed_tx, feed_rx) = mpsc::channel::<oneshot::Sender<()>>(); + + let th = thread::spawn(move || { + for tx in feed_rx.iter() { + tx.send(()).unwrap(); + } + }); + + for _ in 0..1_000 { + let mut rt = runtime::Builder::new().build().unwrap(); + + let (tx, rx) = oneshot::channel(); + + feed_tx.send(tx).unwrap(); + + rt.block_on(rx).unwrap(); + } + + drop(feed_tx); + th.join().unwrap(); +} diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index efe77f20..294f2a16 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -1,49 +1,43 @@ #![allow(clippy::redundant_clone)] -use crate::park::Park; +use crate::park::{Park, Unpark}; use crate::runtime::enter; use crate::runtime::time; +use crate::util::{waker_ref, Wake}; use std::future::Future; -use std::mem::ManuallyDrop; -use std::pin::Pin; use std::sync::Arc; +use std::task::Context; use std::task::Poll::Ready; -use std::task::{Context, RawWaker, RawWakerVTable, Waker}; #[derive(Debug)] pub(super) struct Shell { driver: time::Driver, /// TODO: don't store this - waker: Waker, + unpark: Arc<Handle>, } -type Handle = <time::Driver as Park>::Unpark; +#[derive(Debug)] +struct Handle(<time::Driver as Park>::Unpark); impl Shell { pub(super) fn new(driver: time::Driver) -> Shell { - // Make sure we don't mess up types (as we do casts later) - let unpark: Arc<Handle> = Arc::new(driver.unpark()); - - let raw_waker = RawWaker::new( - Arc::into_raw(unpark) as *const Handle as *const (), - &RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker), - ); + let unpark = Arc::new(Handle(driver.unpark())); - let waker = unsafe { Waker::from_raw(raw_waker) }; - - Shell { driver, waker } + Shell { driver, unpark } } - pub(super) fn block_on<F>(&mut self, mut f: F) -> F::Output + pub(super) fn block_on<F>(&mut self, f: F) -> F::Output where F: Future, { let _e = enter(); - let mut f = unsafe { Pin::new_unchecked(&mut f) }; - let mut cx = Context::from_waker(&self.waker); + pin!(f); + + let waker = waker_ref(&self.unpark); + let mut cx = Context::from_waker(&waker); loop { if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { @@ -55,29 +49,14 @@ impl Shell { } } -fn clone_waker(ptr: *const ()) -> RawWaker { - let w1 = unsafe { ManuallyDrop::new(Arc::from_raw(ptr as *const Handle)) }; - let _w2 = ManuallyDrop::new(w1.clone()); - - RawWaker::new( - ptr, - &RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker), - ) -} - -fn wake(ptr: *const ()) { - use crate::park::Unpark; - let unpark = unsafe { Arc::from_raw(ptr as *const Handle) }; - (unpark).unpark() -} - -fn wake_by_ref(ptr: *const ()) { - use crate::park::Unpark; - - let unpark = ptr as *const Handle; - unsafe { (*unpark).unpark() } -} +impl Wake for Handle { + /// Wake by value + fn wake(self: Arc<Self>) { + Wake::wake_by_ref(&self); + } -fn drop_waker(ptr: *const ()) { - let _ = unsafe { Arc::from_raw(ptr as *const Handle) }; + /// Wake by reference + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.0.unpark(); + } } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index c2f572f1..a093395c 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -9,10 +9,8 @@ pub(crate) mod linked_list; #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] mod rand; -cfg_rt_core! { - mod wake; - pub(crate) use wake::{waker_ref, Wake}; -} +mod wake; +pub(crate) use wake::{waker_ref, Wake}; cfg_rt_threaded! { pub(crate) use rand::FastRand; |