summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLucio Franco <luciofranco14@gmail.com>2020-08-27 20:05:48 -0400
committerGitHub <noreply@github.com>2020-08-27 20:05:48 -0400
commitd600ab9a8f37e9eff3fa8587069a816b65b6da0b (patch)
tree06d14901604c5c7822b43d9f4973fdccd15509e7
parentd9d909cb4c6d326423ee02fbcf6bbfe5553d2c0a (diff)
rt: Refactor `Runtime::block_on` to take `&self` (#2782)
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
-rw-r--r--benches/mpsc.rs10
-rw-r--r--benches/scheduler.rs6
-rw-r--r--benches/spawn.rs12
-rw-r--r--benches/sync_rwlock.rs10
-rw-r--r--benches/sync_semaphore.rs10
-rw-r--r--tests-integration/tests/rt_shell.rs2
-rw-r--r--tokio-test/src/lib.rs2
-rw-r--r--tokio-util/src/context.rs23
-rw-r--r--tokio-util/tests/context.rs9
-rw-r--r--tokio/src/io/poll_evented.rs4
-rw-r--r--tokio/src/io/registration.rs4
-rw-r--r--tokio/src/net/tcp/listener.rs2
-rw-r--r--tokio/src/net/tcp/stream.rs2
-rw-r--r--tokio/src/net/udp/socket.rs2
-rw-r--r--tokio/src/net/unix/datagram/socket.rs2
-rw-r--r--tokio/src/net/unix/listener.rs4
-rw-r--r--tokio/src/net/unix/stream.rs2
-rw-r--r--tokio/src/park/mod.rs4
-rw-r--r--tokio/src/park/thread.rs170
-rw-r--r--tokio/src/runtime/basic_scheduler.rs1
-rw-r--r--tokio/src/runtime/blocking/mod.rs1
-rw-r--r--tokio/src/runtime/blocking/pool.rs5
-rw-r--r--tokio/src/runtime/builder.rs18
-rw-r--r--tokio/src/runtime/context.rs6
-rw-r--r--tokio/src/runtime/enter.rs46
-rw-r--r--tokio/src/runtime/handle.rs339
-rw-r--r--tokio/src/runtime/mod.rs94
-rw-r--r--tokio/src/runtime/tests/loom_pool.rs4
-rw-r--r--tokio/src/signal/registry.rs4
-rw-r--r--tokio/src/signal/windows.rs2
-rw-r--r--tokio/src/task/local.rs10
-rw-r--r--tokio/src/task/spawn.rs2
-rw-r--r--tokio/tests/io_driver.rs2
-rw-r--r--tokio/tests/rt_basic.rs6
-rw-r--r--tokio/tests/rt_common.rs140
-rw-r--r--tokio/tests/rt_threaded.rs20
-rw-r--r--tokio/tests/signal_drop_rt.rs4
-rw-r--r--tokio/tests/signal_multi_rt.rs2
-rw-r--r--tokio/tests/task_blocking.rs18
-rw-r--r--tokio/tests/task_local_set.rs18
-rw-r--r--tokio/tests/time_rt.rs2
41 files changed, 331 insertions, 693 deletions
diff --git a/benches/mpsc.rs b/benches/mpsc.rs
index 49bd3cc0..ec07ad8f 100644
--- a/benches/mpsc.rs
+++ b/benches/mpsc.rs
@@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) {
}
fn contention_bounded(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) {
}
fn contention_bounded_full(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) {
}
fn contention_unbounded(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) {
}
fn uncontented_bounded(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) {
}
fn uncontented_unbounded(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
diff --git a/benches/scheduler.rs b/benches/scheduler.rs
index 0562a120..801de72a 100644
--- a/benches/scheduler.rs
+++ b/benches/scheduler.rs
@@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc};
fn spawn_many(b: &mut Bencher) {
const NUM_SPAWN: usize = 10_000;
- let mut rt = rt();
+ let rt = rt();
let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
@@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) {
fn ping_pong(b: &mut Bencher) {
const NUM_PINGS: usize = 1_000;
- let mut rt = rt();
+ let rt = rt();
let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
@@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) {
fn chained_spawn(b: &mut Bencher) {
const ITER: usize = 1_000;
- let mut rt = rt();
+ let rt = rt();
fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {
diff --git a/benches/spawn.rs b/benches/spawn.rs
index 9122c7b1..f76daf3f 100644
--- a/benches/spawn.rs
+++ b/benches/spawn.rs
@@ -10,7 +10,7 @@ async fn work() -> usize {
}
fn basic_scheduler_local_spawn(bench: &mut Bencher) {
- let mut runtime = tokio::runtime::Builder::new()
+ let runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
@@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) {
}
fn threaded_scheduler_local_spawn(bench: &mut Bencher) {
- let mut runtime = tokio::runtime::Builder::new()
+ let runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
@@ -40,9 +40,9 @@ fn basic_scheduler_remote_spawn(bench: &mut Bencher) {
.basic_scheduler()
.build()
.unwrap();
- let handle = runtime.handle();
+
bench.iter(|| {
- let h = handle.spawn(work());
+ let h = runtime.spawn(work());
black_box(h);
});
}
@@ -52,9 +52,9 @@ fn threaded_scheduler_remote_spawn(bench: &mut Bencher) {
.threaded_scheduler()
.build()
.unwrap();
- let handle = runtime.handle();
+
bench.iter(|| {
- let h = handle.spawn(work());
+ let h = runtime.spawn(work());
black_box(h);
});
}
diff --git a/benches/sync_rwlock.rs b/benches/sync_rwlock.rs
index 4eca9807..30c66e49 100644
--- a/benches/sync_rwlock.rs
+++ b/benches/sync_rwlock.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::RwLock, task};
fn read_uncontended(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) {
}
fn read_concurrent_uncontended_multi(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) {
}
fn read_concurrent_uncontended(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
@@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) {
}
fn read_concurrent_contended_multi(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) {
}
fn read_concurrent_contended(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
diff --git a/benches/sync_semaphore.rs b/benches/sync_semaphore.rs
index c43311c0..32d4aa2b 100644
--- a/benches/sync_semaphore.rs
+++ b/benches/sync_semaphore.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::Semaphore, task};
fn uncontended(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -27,7 +27,7 @@ async fn task(s: Arc<Semaphore>) {
}
fn uncontended_concurrent_multi(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) {
}
fn uncontended_concurrent_single(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
@@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) {
}
fn contended_concurrent_multi(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) {
}
fn contended_concurrent_single(b: &mut Bencher) {
- let mut rt = tokio::runtime::Builder::new()
+ let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
diff --git a/tests-integration/tests/rt_shell.rs b/tests-integration/tests/rt_shell.rs
index 392c0519..012f44a7 100644
--- a/tests-integration/tests/rt_shell.rs
+++ b/tests-integration/tests/rt_shell.rs
@@ -18,7 +18,7 @@ fn basic_shell_rt() {
});
for _ in 0..1_000 {
- let mut rt = runtime::Builder::new().build().unwrap();
+ let rt = runtime::Builder::new().build().unwrap();
let (tx, rx) = oneshot::channel();
diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs
index d53a42ad..cfbf80ce 100644
--- a/tokio-test/src/lib.rs
+++ b/tokio-test/src/lib.rs
@@ -28,7 +28,7 @@ pub mod task;
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime;
- let mut rt = runtime::Builder::new()
+ let rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs
index f6289093..e07538d9 100644
--- a/tokio-util/src/context.rs
+++ b/tokio-util/src/context.rs
@@ -12,21 +12,21 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
-use tokio::runtime::Handle;
+use tokio::runtime::Runtime;
pin_project! {
/// `TokioContext` allows connecting a custom executor with the tokio runtime.
///
/// It contains a `Handle` to the runtime. A handle to the runtime can be
/// obtain by calling the `Runtime::handle()` method.
- pub struct TokioContext<F> {
+ pub struct TokioContext<'a, F> {
#[pin]
inner: F,
- handle: Handle,
+ handle: &'a Runtime,
}
}
-impl<F: Future> Future for TokioContext<F> {
+impl<F: Future> Future for TokioContext<'_, F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -39,16 +39,16 @@ impl<F: Future> Future for TokioContext<F> {
}
/// Trait extension that simplifies bundling a `Handle` with a `Future`.
-pub trait HandleExt {
+pub trait RuntimeExt {
/// Convenience method that takes a Future and returns a `TokioContext`.
///
/// # Example: calling Tokio Runtime from a custom ThreadPool
///
/// ```no_run
- /// use tokio_util::context::HandleExt;
+ /// use tokio_util::context::RuntimeExt;
/// use tokio::time::{delay_for, Duration};
///
- /// let mut rt = tokio::runtime::Builder::new()
+ /// let rt = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .enable_all()
/// .build().unwrap();
@@ -61,18 +61,17 @@ pub trait HandleExt {
///
/// rt.block_on(
/// rt2
- /// .handle()
/// .wrap(async { delay_for(Duration::from_millis(2)).await }),
/// );
///```
- fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
+ fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F>;
}
-impl HandleExt for Handle {
- fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
+impl RuntimeExt for Runtime {
+ fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F> {
TokioContext {
inner: fut,
- handle: self.clone(),
+ handle: self,
}
}
}
diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs
index 49038ddb..2e39b144 100644
--- a/tokio-util/tests/context.rs
+++ b/tokio-util/tests/context.rs
@@ -2,11 +2,11 @@
use tokio::runtime::Builder;
use tokio::time::*;
-use tokio_util::context::HandleExt;
+use tokio_util::context::RuntimeExt;
#[test]
fn tokio_context_with_another_runtime() {
- let mut rt1 = Builder::new()
+ let rt1 = Builder::new()
.threaded_scheduler()
.core_threads(1)
// no timer!
@@ -21,8 +21,5 @@ fn tokio_context_with_another_runtime() {
// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
- let _ = rt1.block_on(
- rt2.handle()
- .wrap(async move { delay_for(Duration::from_millis(2)).await }),
- );
+ let _ = rt1.block_on(rt2.wrap(async move { delay_for(Duration::from_millis(2)).await }));
}
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs
index 785968f4..9054c3b8 100644
--- a/tokio/src/io/poll_evented.rs
+++ b/tokio/src/io/poll_evented.rs
@@ -173,7 +173,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all())
}
@@ -201,7 +201,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
let registration = Registration::new_with_ready(&io, ready)?;
Ok(Self {
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index 63aaff56..82065072 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -67,7 +67,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
@@ -104,7 +104,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index fd79b259..44945e38 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -262,7 +262,7 @@ impl TcpListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener)?;
let io = PollEvented::new(io)?;
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index e624fb9d..e0348724 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -187,7 +187,7 @@ impl TcpStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_stream(stream)?;
let io = PollEvented::new(io)?;
diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs
index 16e53773..f9d88372 100644
--- a/tokio/src/net/udp/socket.rs
+++ b/tokio/src/net/udp/socket.rs
@@ -64,7 +64,7 @@ impl UdpSocket {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented::new(io)?;
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs
index 2282f96a..ba3a10c4 100644
--- a/tokio/src/net/unix/datagram/socket.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -164,7 +164,7 @@ impl UnixDatagram {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a Tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
/// # Examples
/// ```
/// # use std::error::Error;
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index 9b76cb01..119dc6fb 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -60,7 +60,7 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
@@ -82,7 +82,7 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
let listener = mio_uds::UnixListener::from_listener(listener)?;
let io = PollEvented::new(listener)?;
diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs
index 559fe02a..6f49849a 100644
--- a/tokio/src/net/unix/stream.rs
+++ b/tokio/src/net/unix/stream.rs
@@ -54,7 +54,7 @@ impl UnixStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
let stream = mio_uds::UnixStream::from_stream(stream)?;
let io = PollEvented::new(stream)?;
diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs
index 2cfef8c2..4085a99a 100644
--- a/tokio/src/park/mod.rs
+++ b/tokio/src/park/mod.rs
@@ -42,9 +42,7 @@ cfg_resource_drivers! {
mod thread;
pub(crate) use self::thread::ParkThread;
-cfg_block_on! {
- pub(crate) use self::thread::{CachedParkThread, ParkError};
-}
+pub(crate) use self::thread::{CachedParkThread, ParkError};
use std::sync::Arc;
use std::time::Duration;
diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs
index 44174d35..9ed41310 100644
--- a/tokio/src/park/thread.rs
+++ b/tokio/src/park/thread.rs
@@ -212,118 +212,114 @@ impl Unpark for UnparkThread {
}
}
-cfg_block_on! {
- use std::marker::PhantomData;
- use std::rc::Rc;
+use std::marker::PhantomData;
+use std::rc::Rc;
- use std::mem;
- use std::task::{RawWaker, RawWakerVTable, Waker};
+use std::mem;
+use std::task::{RawWaker, RawWakerVTable, Waker};
- /// Blocks the current thread using a condition variable.
- #[derive(Debug)]
- pub(crate) struct CachedParkThread {
- _anchor: PhantomData<Rc<()>>,
- }
-
- impl CachedParkThread {
- /// Create a new `ParkThread` handle for the current thread.
- ///
- /// This type cannot be moved to other threads, so it should be created on
- /// the thread that the caller intends to park.
- pub(crate) fn new() -> CachedParkThread {
- CachedParkThread {
- _anchor: PhantomData,
- }
- }
-
- pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
- self.with_current(|park_thread| park_thread.unpark())
- }
+/// Blocks the current thread using a condition variable.
+#[derive(Debug)]
+pub(crate) struct CachedParkThread {
+ _anchor: PhantomData<Rc<()>>,
+}
- /// Get a reference to the `ParkThread` handle for this thread.
- fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
- where
- F: FnOnce(&ParkThread) -> R,
- {
- CURRENT_PARKER.try_with(|inner| f(inner))
- .map_err(|_| ())
+impl CachedParkThread {
+ /// Create a new `ParkThread` handle for the current thread.
+ ///
+ /// This type cannot be moved to other threads, so it should be created on
+ /// the thread that the caller intends to park.
+ pub(crate) fn new() -> CachedParkThread {
+ CachedParkThread {
+ _anchor: PhantomData,
}
}
- impl Park for CachedParkThread {
- type Unpark = UnparkThread;
- type Error = ParkError;
+ pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
+ self.with_current(|park_thread| park_thread.unpark())
+ }
- fn unpark(&self) -> Self::Unpark {
- self.get_unpark().unwrap()
- }
+ /// Get a reference to the `ParkThread` handle for this thread.
+ fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
+ where
+ F: FnOnce(&ParkThread) -> R,
+ {
+ CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ())
+ }
+}
- fn park(&mut self) -> Result<(), Self::Error> {
- self.with_current(|park_thread| park_thread.inner.park())?;
- Ok(())
- }
+impl Park for CachedParkThread {
+ type Unpark = UnparkThread;
+ type Error = ParkError;
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
- Ok(())
- }
+ fn unpark(&self) -> Self::Unpark {
+ self.get_unpark().unwrap()
+ }
- fn shutdown(&mut self) {
- let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
- }
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.with_current(|park_thread| park_thread.inner.park())?;
+ Ok(())
}
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
+ Ok(())
+ }
- impl UnparkThread {
- pub(crate) fn into_waker(self) -> Waker {
- unsafe {
- let raw = unparker_to_raw_waker(self.inner);
- Waker::from_raw(raw)
- }
- }
+ fn shutdown(&mut self) {
+ let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
+}
- impl Inner {
- #[allow(clippy::wrong_self_convention)]
- fn into_raw(this: Arc<Inner>) -> *const () {
- Arc::into_raw(this) as *const ()
+impl UnparkThread {
+ pub(crate) fn into_waker(self) -> Waker {
+ unsafe {
+ let raw = unparker_to_raw_waker(self.inner);
+ Waker::from_raw(raw)
}
+ }
+}
- unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
- Arc::from_raw(ptr as *const Inner)
- }
+impl Inner {
+ #[allow(clippy::wrong_self_convention)]
+ fn into_raw(this: Arc<Inner>) -> *const () {
+ Arc::into_raw(this) as *const ()
}
- unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
- RawWaker::new(
- Inner::into_raw(unparker),
- &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
- )
+ unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
+ Arc::from_raw(ptr as *const Inner)
}
+}
- unsafe fn clone(raw: *const ()) -> RawWaker {
- let unparker = Inner::from_raw(raw);
+unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
+ RawWaker::new(
+ Inner::into_raw(unparker),
+ &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
+ )
+}
- // Increment the ref count
- mem::forget(unparker.clone());
+unsafe fn clone(raw: *const ()) -> RawWaker {
+ let unparker = Inner::from_raw(raw);
- unparker_to_raw_waker(unparker)
- }
+ // Increment the ref count
+ mem::forget(unparker.clone());
- unsafe fn drop_waker(raw: *const ()) {
- let _ = Inner::from_raw(raw);
- }
+ unparker_to_raw_waker(unparker)
+}
- unsafe fn wake(raw: *const ()) {
- let unparker = Inner::from_raw(raw);
- unparker.unpark();
- }
+unsafe fn drop_waker(raw: *const ()) {
+ let _ = Inner::from_raw(raw);
+}
- unsafe fn wake_by_ref(raw: *const ()) {
- let unparker = Inner::from_raw(raw);
- unparker.unpark();
+unsafe fn wake(raw: *const ()) {