summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-12-22 13:03:44 -0800
committerGitHub <noreply@github.com>2019-12-22 13:03:44 -0800
commitadc5186ebd1290c2f144e153a87e147d257f8b0f (patch)
tree507624871353d166db8e822208d06e873ea8611f /tokio
parent7b53b7b659fe1feeb30e768cad8fdadf9531beb6 (diff)
rt: fix storing Runtime in thread-local (#2011)
Storing a `Runtime` value in a thread-local resulted in a panic due to the inability to access the parker. This fixes the bug by skipping parking if it fails. In general, there isn't much that we can do besides not parking. Fixes #593
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/park/mod.rs2
-rw-r--r--tokio/src/park/thread.rs15
-rw-r--r--tokio/src/runtime/blocking/shutdown.rs4
-rw-r--r--tokio/src/runtime/enter.rs11
-rw-r--r--tokio/src/runtime/thread_pool/mod.rs2
-rw-r--r--tokio/tests/rt_common.rs18
6 files changed, 41 insertions, 11 deletions
diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs
index 9c1958c3..13dfee2c 100644
--- a/tokio/src/park/mod.rs
+++ b/tokio/src/park/mod.rs
@@ -43,7 +43,7 @@ mod thread;
pub(crate) use self::thread::ParkThread;
cfg_blocking_impl! {
- pub(crate) use self::thread::CachedParkThread;
+ pub(crate) use self::thread::{CachedParkThread, ParkError};
}
use std::sync::Arc;
diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs
index 59513e1d..853d4d8a 100644
--- a/tokio/src/park/thread.rs
+++ b/tokio/src/park/thread.rs
@@ -230,12 +230,17 @@ cfg_blocking_impl! {
}
}
+ pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
+ self.with_current(|park_thread| park_thread.unpark())
+ }
+
/// Get a reference to the `ParkThread` handle for this thread.
- fn with_current<F, R>(&self, f: F) -> R
+ fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
where
F: FnOnce(&ParkThread) -> R,
{
- CURRENT_PARKER.with(|inner| f(inner))
+ CURRENT_PARKER.try_with(|inner| f(inner))
+ .map_err(|_| ParkError { _p: () })
}
}
@@ -244,16 +249,16 @@ cfg_blocking_impl! {
type Error = ParkError;
fn unpark(&self) -> Self::Unpark {
- self.with_current(|park_thread| park_thread.unpark())
+ self.get_unpark().unwrap()
}
fn park(&mut self) -> Result<(), Self::Error> {
- self.with_current(|park_thread| park_thread.inner.park());
+ self.with_current(|park_thread| park_thread.inner.park())?;
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.with_current(|park_thread| park_thread.inner.park_timeout(duration));
+ self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}
}
diff --git a/tokio/src/runtime/blocking/shutdown.rs b/tokio/src/runtime/blocking/shutdown.rs
index d9f5eb0f..8b34dbec 100644
--- a/tokio/src/runtime/blocking/shutdown.rs
+++ b/tokio/src/runtime/blocking/shutdown.rs
@@ -39,6 +39,10 @@ impl Receiver {
};
// The oneshot completes with an Err
+ //
+ // If blocking fails to wait, this indicates a problem parking the
+ // current thread (usually, shutting down a runtime stored in a
+ // thread-local).
let _ = e.block_on(&mut self.rx);
}
}
diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs
index b716c2a9..35bb3581 100644
--- a/tokio/src/runtime/enter.rs
+++ b/tokio/src/runtime/enter.rs
@@ -74,10 +74,12 @@ pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
}
cfg_blocking_impl! {
+ use crate::park::ParkError;
+
impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
- pub(crate) fn block_on<F>(&mut self, mut f: F) -> F::Output
+ pub(crate) fn block_on<F>(&mut self, mut f: F) -> Result<F::Output, ParkError>
where
F: std::future::Future,
{
@@ -87,7 +89,7 @@ cfg_blocking_impl! {
use std::task::Poll::Ready;
let mut park = CachedParkThread::new();
- let waker = park.unpark().into_waker();
+ let waker = park.get_unpark()?.into_waker();
let mut cx = Context::from_waker(&waker);
// `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can
@@ -96,9 +98,10 @@ cfg_blocking_impl! {
loop {
if let Ready(v) = f.as_mut().poll(&mut cx) {
- return v;
+ return Ok(v);
}
- park.park().unwrap();
+
+ park.park()?;
}
}
}
diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs
index 3d4c69c2..6a50fe9c 100644
--- a/tokio/src/runtime/thread_pool/mod.rs
+++ b/tokio/src/runtime/thread_pool/mod.rs
@@ -91,7 +91,7 @@ impl ThreadPool {
{
self.spawner.enter(|| {
let mut enter = crate::runtime::enter();
- enter.block_on(future)
+ enter.block_on(future).expect("failed to park thread")
})
}
}
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index 92687473..00256d05 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -617,6 +617,24 @@ rt_test! {
assert_ok!(drop_rx.recv());
}
+ #[test]
+ fn runtime_in_thread_local() {
+ use std::cell::RefCell;
+ use std::thread;
+
+ thread_local!(
+ static R: RefCell<Option<Runtime>> = RefCell::new(None);
+ );
+
+ thread::spawn(|| {
+ R.with(|cell| {
+ *cell.borrow_mut() = Some(rt());
+ });
+
+ let _rt = rt();
+ }).join().unwrap();
+ }
+
async fn client_server(tx: mpsc::Sender<()>) {
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);