diff options
author | Lucio Franco <luciofranco14@gmail.com> | 2020-08-27 20:05:48 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-27 20:05:48 -0400 |
commit | d600ab9a8f37e9eff3fa8587069a816b65b6da0b (patch) | |
tree | 06d14901604c5c7822b43d9f4973fdccd15509e7 | |
parent | d9d909cb4c6d326423ee02fbcf6bbfe5553d2c0a (diff) |
rt: Refactor `Runtime::block_on` to take `&self` (#2782)
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
41 files changed, 331 insertions, 693 deletions
diff --git a/benches/mpsc.rs b/benches/mpsc.rs index 49bd3cc0..ec07ad8f 100644 --- a/benches/mpsc.rs +++ b/benches/mpsc.rs @@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) { } fn contention_bounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) { } fn contention_bounded_full(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) { } fn contention_unbounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) { } fn uncontented_bounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) { } fn uncontented_unbounded(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() diff --git a/benches/scheduler.rs b/benches/scheduler.rs index 0562a120..801de72a 100644 --- a/benches/scheduler.rs +++ b/benches/scheduler.rs @@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc}; fn spawn_many(b: &mut Bencher) { const NUM_SPAWN: usize = 10_000; - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::sync_channel(1000); let rem = Arc::new(AtomicUsize::new(0)); @@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) { fn ping_pong(b: &mut Bencher) { const NUM_PINGS: usize = 1_000; - let mut rt = rt(); + let rt = rt(); let (done_tx, done_rx) = mpsc::sync_channel(1000); let rem = Arc::new(AtomicUsize::new(0)); @@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) { fn chained_spawn(b: &mut Bencher) { const ITER: usize = 1_000; - let mut rt = rt(); + let rt = rt(); fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { if n == 0 { diff --git a/benches/spawn.rs b/benches/spawn.rs index 9122c7b1..f76daf3f 100644 --- a/benches/spawn.rs +++ b/benches/spawn.rs @@ -10,7 +10,7 @@ async fn work() -> usize { } fn basic_scheduler_local_spawn(bench: &mut Bencher) { - let mut runtime = tokio::runtime::Builder::new() + let runtime = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) { } fn threaded_scheduler_local_spawn(bench: &mut Bencher) { - let mut runtime = tokio::runtime::Builder::new() + let runtime = tokio::runtime::Builder::new() .threaded_scheduler() .build() .unwrap(); @@ -40,9 +40,9 @@ fn basic_scheduler_remote_spawn(bench: &mut Bencher) { .basic_scheduler() .build() .unwrap(); - let handle = runtime.handle(); + bench.iter(|| { - let h = handle.spawn(work()); + let h = runtime.spawn(work()); black_box(h); }); } @@ -52,9 +52,9 @@ fn threaded_scheduler_remote_spawn(bench: &mut Bencher) { .threaded_scheduler() .build() .unwrap(); - let handle = runtime.handle(); + bench.iter(|| { - let h = handle.spawn(work()); + let h = runtime.spawn(work()); black_box(h); }); } diff --git a/benches/sync_rwlock.rs b/benches/sync_rwlock.rs index 4eca9807..30c66e49 100644 --- a/benches/sync_rwlock.rs +++ b/benches/sync_rwlock.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::{sync::RwLock, task}; fn read_uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) { } fn read_concurrent_uncontended_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) { } fn read_concurrent_uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) { } fn read_concurrent_contended_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) { } fn read_concurrent_contended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); diff --git a/benches/sync_semaphore.rs b/benches/sync_semaphore.rs index c43311c0..32d4aa2b 100644 --- a/benches/sync_semaphore.rs +++ b/benches/sync_semaphore.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::{sync::Semaphore, task}; fn uncontended(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -27,7 +27,7 @@ async fn task(s: Arc<Semaphore>) { } fn uncontended_concurrent_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) { } fn uncontended_concurrent_single(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); @@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) { } fn contended_concurrent_multi(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(6) .threaded_scheduler() .build() @@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) { } fn contended_concurrent_single(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .basic_scheduler() .build() .unwrap(); diff --git a/tests-integration/tests/rt_shell.rs b/tests-integration/tests/rt_shell.rs index 392c0519..012f44a7 100644 --- a/tests-integration/tests/rt_shell.rs +++ b/tests-integration/tests/rt_shell.rs @@ -18,7 +18,7 @@ fn basic_shell_rt() { }); for _ in 0..1_000 { - let mut rt = runtime::Builder::new().build().unwrap(); + let rt = runtime::Builder::new().build().unwrap(); let (tx, rx) = oneshot::channel(); diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index d53a42ad..cfbf80ce 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -28,7 +28,7 @@ pub mod task; pub fn block_on<F: std::future::Future>(future: F) -> F::Output { use tokio::runtime; - let mut rt = runtime::Builder::new() + let rt = runtime::Builder::new() .basic_scheduler() .enable_all() .build() diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index f6289093..e07538d9 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -12,21 +12,21 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::runtime::Handle; +use tokio::runtime::Runtime; pin_project! { /// `TokioContext` allows connecting a custom executor with the tokio runtime. /// /// It contains a `Handle` to the runtime. A handle to the runtime can be /// obtain by calling the `Runtime::handle()` method. - pub struct TokioContext<F> { + pub struct TokioContext<'a, F> { #[pin] inner: F, - handle: Handle, + handle: &'a Runtime, } } -impl<F: Future> Future for TokioContext<F> { +impl<F: Future> Future for TokioContext<'_, F> { type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { @@ -39,16 +39,16 @@ impl<F: Future> Future for TokioContext<F> { } /// Trait extension that simplifies bundling a `Handle` with a `Future`. -pub trait HandleExt { +pub trait RuntimeExt { /// Convenience method that takes a Future and returns a `TokioContext`. /// /// # Example: calling Tokio Runtime from a custom ThreadPool /// /// ```no_run - /// use tokio_util::context::HandleExt; + /// use tokio_util::context::RuntimeExt; /// use tokio::time::{delay_for, Duration}; /// - /// let mut rt = tokio::runtime::Builder::new() + /// let rt = tokio::runtime::Builder::new() /// .threaded_scheduler() /// .enable_all() /// .build().unwrap(); @@ -61,18 +61,17 @@ pub trait HandleExt { /// /// rt.block_on( /// rt2 - /// .handle() /// .wrap(async { delay_for(Duration::from_millis(2)).await }), /// ); ///``` - fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>; + fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F>; } -impl HandleExt for Handle { - fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> { +impl RuntimeExt for Runtime { + fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F> { TokioContext { inner: fut, - handle: self.clone(), + handle: self, } } } diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index 49038ddb..2e39b144 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -2,11 +2,11 @@ use tokio::runtime::Builder; use tokio::time::*; -use tokio_util::context::HandleExt; +use tokio_util::context::RuntimeExt; #[test] fn tokio_context_with_another_runtime() { - let mut rt1 = Builder::new() + let rt1 = Builder::new() .threaded_scheduler() .core_threads(1) // no timer! @@ -21,8 +21,5 @@ fn tokio_context_with_another_runtime() { // Without the `HandleExt.wrap()` there would be a panic because there is // no timer running, since it would be referencing runtime r1. - let _ = rt1.block_on( - rt2.handle() - .wrap(async move { delay_for(Duration::from_millis(2)).await }), - ); + let _ = rt1.block_on(rt2.wrap(async move { delay_for(Duration::from_millis(2)).await })); } diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 785968f4..9054c3b8 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -173,7 +173,7 @@ where /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn new(io: E) -> io::Result<Self> { PollEvented::new_with_ready(io, mio::Ready::all()) } @@ -201,7 +201,7 @@ where /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> { let registration = Registration::new_with_ready(&io, ready)?; Ok(Self { diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 63aaff56..82065072 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -67,7 +67,7 @@ impl Registration { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn new<T>(io: &T) -> io::Result<Registration> where T: Evented, @@ -104,7 +104,7 @@ impl Registration { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration> where T: Evented, diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index fd79b259..44945e38 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -262,7 +262,7 @@ impl TcpListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> { let io = mio::net::TcpListener::from_std(listener)?; let io = PollEvented::new(io)?; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index e624fb9d..e0348724 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -187,7 +187,7 @@ impl TcpStream { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> { let io = mio::net::TcpStream::from_stream(stream)?; let io = PollEvented::new(io)?; diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index 16e53773..f9d88372 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -64,7 +64,7 @@ impl UdpSocket { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { let io = mio::net::UdpSocket::from_socket(socket)?; let io = PollEvented::new(io)?; diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 2282f96a..ba3a10c4 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -164,7 +164,7 @@ impl UnixDatagram { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a Tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. /// # Examples /// ``` /// # use std::error::Error; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 9b76cb01..119dc6fb 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -60,7 +60,7 @@ impl UnixListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn bind<P>(path: P) -> io::Result<UnixListener> where P: AsRef<Path>, @@ -82,7 +82,7 @@ impl UnixListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> { let listener = mio_uds::UnixListener::from_listener(listener)?; let io = PollEvented::new(listener)?; diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 559fe02a..6f49849a 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -54,7 +54,7 @@ impl UnixStream { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> { let stream = mio_uds::UnixStream::from_stream(stream)?; let io = PollEvented::new(stream)?; diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs index 2cfef8c2..4085a99a 100644 --- a/tokio/src/park/mod.rs +++ b/tokio/src/park/mod.rs @@ -42,9 +42,7 @@ cfg_resource_drivers! { mod thread; pub(crate) use self::thread::ParkThread; -cfg_block_on! { - pub(crate) use self::thread::{CachedParkThread, ParkError}; -} +pub(crate) use self::thread::{CachedParkThread, ParkError}; use std::sync::Arc; use std::time::Duration; diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index 44174d35..9ed41310 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -212,118 +212,114 @@ impl Unpark for UnparkThread { } } -cfg_block_on! { - use std::marker::PhantomData; - use std::rc::Rc; +use std::marker::PhantomData; +use std::rc::Rc; - use std::mem; - use std::task::{RawWaker, RawWakerVTable, Waker}; +use std::mem; +use std::task::{RawWaker, RawWakerVTable, Waker}; - /// Blocks the current thread using a condition variable. - #[derive(Debug)] - pub(crate) struct CachedParkThread { - _anchor: PhantomData<Rc<()>>, - } - - impl CachedParkThread { - /// Create a new `ParkThread` handle for the current thread. - /// - /// This type cannot be moved to other threads, so it should be created on - /// the thread that the caller intends to park. - pub(crate) fn new() -> CachedParkThread { - CachedParkThread { - _anchor: PhantomData, - } - } - - pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> { - self.with_current(|park_thread| park_thread.unpark()) - } +/// Blocks the current thread using a condition variable. +#[derive(Debug)] +pub(crate) struct CachedParkThread { + _anchor: PhantomData<Rc<()>>, +} - /// Get a reference to the `ParkThread` handle for this thread. - fn with_current<F, R>(&self, f: F) -> Result<R, ParkError> - where - F: FnOnce(&ParkThread) -> R, - { - CURRENT_PARKER.try_with(|inner| f(inner)) - .map_err(|_| ()) +impl CachedParkThread { + /// Create a new `ParkThread` handle for the current thread. + /// + /// This type cannot be moved to other threads, so it should be created on + /// the thread that the caller intends to park. + pub(crate) fn new() -> CachedParkThread { + CachedParkThread { + _anchor: PhantomData, } } - impl Park for CachedParkThread { - type Unpark = UnparkThread; - type Error = ParkError; + pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> { + self.with_current(|park_thread| park_thread.unpark()) + } - fn unpark(&self) -> Self::Unpark { - self.get_unpark().unwrap() - } + /// Get a reference to the `ParkThread` handle for this thread. + fn with_current<F, R>(&self, f: F) -> Result<R, ParkError> + where + F: FnOnce(&ParkThread) -> R, + { + CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ()) + } +} - fn park(&mut self) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park())?; - Ok(()) - } +impl Park for CachedParkThread { + type Unpark = UnparkThread; + type Error = ParkError; - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; - Ok(()) - } + fn unpark(&self) -> Self::Unpark { + self.get_unpark().unwrap() + } - fn shutdown(&mut self) { - let _ = self.with_current(|park_thread| park_thread.inner.shutdown()); - } + fn park(&mut self) -> Result<(), Self::Error> { + self.with_current(|park_thread| park_thread.inner.park())?; + Ok(()) } + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; + Ok(()) + } - impl UnparkThread { - pub(crate) fn into_waker(self) -> Waker { - unsafe { - let raw = unparker_to_raw_waker(self.inner); - Waker::from_raw(raw) - } - } + fn shutdown(&mut self) { + let _ = self.with_current(|park_thread| park_thread.inner.shutdown()); } +} - impl Inner { - #[allow(clippy::wrong_self_convention)] - fn into_raw(this: Arc<Inner>) -> *const () { - Arc::into_raw(this) as *const () +impl UnparkThread { + pub(crate) fn into_waker(self) -> Waker { + unsafe { + let raw = unparker_to_raw_waker(self.inner); + Waker::from_raw(raw) } + } +} - unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> { - Arc::from_raw(ptr as *const Inner) - } +impl Inner { + #[allow(clippy::wrong_self_convention)] + fn into_raw(this: Arc<Inner>) -> *const () { + Arc::into_raw(this) as *const () } - unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker { - RawWaker::new( - Inner::into_raw(unparker), - &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), - ) + unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> { + Arc::from_raw(ptr as *const Inner) } +} - unsafe fn clone(raw: *const ()) -> RawWaker { - let unparker = Inner::from_raw(raw); +unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker { + RawWaker::new( + Inner::into_raw(unparker), + &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), + ) +} - // Increment the ref count - mem::forget(unparker.clone()); +unsafe fn clone(raw: *const ()) -> RawWaker { + let unparker = Inner::from_raw(raw); - unparker_to_raw_waker(unparker) - } + // Increment the ref count + mem::forget(unparker.clone()); - unsafe fn drop_waker(raw: *const ()) { - let _ = Inner::from_raw(raw); - } + unparker_to_raw_waker(unparker) +} - unsafe fn wake(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); - } +unsafe fn drop_waker(raw: *const ()) { + let _ = Inner::from_raw(raw); +} - unsafe fn wake_by_ref(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); +unsafe fn wake(raw: *const ()) { |