summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/poll_evented.rs4
-rw-r--r--tokio/src/io/registration.rs4
-rw-r--r--tokio/src/net/tcp/listener.rs2
-rw-r--r--tokio/src/net/tcp/stream.rs2
-rw-r--r--tokio/src/net/udp/socket.rs2
-rw-r--r--tokio/src/net/unix/datagram/socket.rs2
-rw-r--r--tokio/src/net/unix/listener.rs4
-rw-r--r--tokio/src/net/unix/stream.rs2
-rw-r--r--tokio/src/park/mod.rs4
-rw-r--r--tokio/src/park/thread.rs170
-rw-r--r--tokio/src/runtime/basic_scheduler.rs1
-rw-r--r--tokio/src/runtime/blocking/mod.rs1
-rw-r--r--tokio/src/runtime/blocking/pool.rs5
-rw-r--r--tokio/src/runtime/builder.rs18
-rw-r--r--tokio/src/runtime/context.rs6
-rw-r--r--tokio/src/runtime/enter.rs46
-rw-r--r--tokio/src/runtime/handle.rs339
-rw-r--r--tokio/src/runtime/mod.rs94
-rw-r--r--tokio/src/runtime/tests/loom_pool.rs4
-rw-r--r--tokio/src/signal/registry.rs4
-rw-r--r--tokio/src/signal/windows.rs2
-rw-r--r--tokio/src/task/local.rs10
-rw-r--r--tokio/src/task/spawn.rs2
-rw-r--r--tokio/tests/io_driver.rs2
-rw-r--r--tokio/tests/rt_basic.rs6
-rw-r--r--tokio/tests/rt_common.rs140
-rw-r--r--tokio/tests/rt_threaded.rs20
-rw-r--r--tokio/tests/signal_drop_rt.rs4
-rw-r--r--tokio/tests/signal_multi_rt.rs2
-rw-r--r--tokio/tests/task_blocking.rs18
-rw-r--r--tokio/tests/task_local_set.rs18
-rw-r--r--tokio/tests/time_rt.rs2
32 files changed, 291 insertions, 649 deletions
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs
index 785968f4..9054c3b8 100644
--- a/tokio/src/io/poll_evented.rs
+++ b/tokio/src/io/poll_evented.rs
@@ -173,7 +173,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all())
}
@@ -201,7 +201,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
let registration = Registration::new_with_ready(&io, ready)?;
Ok(Self {
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index 63aaff56..82065072 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -67,7 +67,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
@@ -104,7 +104,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index fd79b259..44945e38 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -262,7 +262,7 @@ impl TcpListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener)?;
let io = PollEvented::new(io)?;
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index e624fb9d..e0348724 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -187,7 +187,7 @@ impl TcpStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_stream(stream)?;
let io = PollEvented::new(io)?;
diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs
index 16e53773..f9d88372 100644
--- a/tokio/src/net/udp/socket.rs
+++ b/tokio/src/net/udp/socket.rs
@@ -64,7 +64,7 @@ impl UdpSocket {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented::new(io)?;
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs
index 2282f96a..ba3a10c4 100644
--- a/tokio/src/net/unix/datagram/socket.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -164,7 +164,7 @@ impl UnixDatagram {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a Tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
/// # Examples
/// ```
/// # use std::error::Error;
diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs
index 9b76cb01..119dc6fb 100644
--- a/tokio/src/net/unix/listener.rs
+++ b/tokio/src/net/unix/listener.rs
@@ -60,7 +60,7 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
@@ -82,7 +82,7 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
let listener = mio_uds::UnixListener::from_listener(listener)?;
let io = PollEvented::new(listener)?;
diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs
index 559fe02a..6f49849a 100644
--- a/tokio/src/net/unix/stream.rs
+++ b/tokio/src/net/unix/stream.rs
@@ -54,7 +54,7 @@ impl UnixStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
let stream = mio_uds::UnixStream::from_stream(stream)?;
let io = PollEvented::new(stream)?;
diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs
index 2cfef8c2..4085a99a 100644
--- a/tokio/src/park/mod.rs
+++ b/tokio/src/park/mod.rs
@@ -42,9 +42,7 @@ cfg_resource_drivers! {
mod thread;
pub(crate) use self::thread::ParkThread;
-cfg_block_on! {
- pub(crate) use self::thread::{CachedParkThread, ParkError};
-}
+pub(crate) use self::thread::{CachedParkThread, ParkError};
use std::sync::Arc;
use std::time::Duration;
diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs
index 44174d35..9ed41310 100644
--- a/tokio/src/park/thread.rs
+++ b/tokio/src/park/thread.rs
@@ -212,118 +212,114 @@ impl Unpark for UnparkThread {
}
}
-cfg_block_on! {
- use std::marker::PhantomData;
- use std::rc::Rc;
+use std::marker::PhantomData;
+use std::rc::Rc;
- use std::mem;
- use std::task::{RawWaker, RawWakerVTable, Waker};
+use std::mem;
+use std::task::{RawWaker, RawWakerVTable, Waker};
- /// Blocks the current thread using a condition variable.
- #[derive(Debug)]
- pub(crate) struct CachedParkThread {
- _anchor: PhantomData<Rc<()>>,
- }
-
- impl CachedParkThread {
- /// Create a new `ParkThread` handle for the current thread.
- ///
- /// This type cannot be moved to other threads, so it should be created on
- /// the thread that the caller intends to park.
- pub(crate) fn new() -> CachedParkThread {
- CachedParkThread {
- _anchor: PhantomData,
- }
- }
-
- pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
- self.with_current(|park_thread| park_thread.unpark())
- }
+/// Blocks the current thread using a condition variable.
+#[derive(Debug)]
+pub(crate) struct CachedParkThread {
+ _anchor: PhantomData<Rc<()>>,
+}
- /// Get a reference to the `ParkThread` handle for this thread.
- fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
- where
- F: FnOnce(&ParkThread) -> R,
- {
- CURRENT_PARKER.try_with(|inner| f(inner))
- .map_err(|_| ())
+impl CachedParkThread {
+ /// Create a new `ParkThread` handle for the current thread.
+ ///
+ /// This type cannot be moved to other threads, so it should be created on
+ /// the thread that the caller intends to park.
+ pub(crate) fn new() -> CachedParkThread {
+ CachedParkThread {
+ _anchor: PhantomData,
}
}
- impl Park for CachedParkThread {
- type Unpark = UnparkThread;
- type Error = ParkError;
+ pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
+ self.with_current(|park_thread| park_thread.unpark())
+ }
- fn unpark(&self) -> Self::Unpark {
- self.get_unpark().unwrap()
- }
+ /// Get a reference to the `ParkThread` handle for this thread.
+ fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
+ where
+ F: FnOnce(&ParkThread) -> R,
+ {
+ CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ())
+ }
+}
- fn park(&mut self) -> Result<(), Self::Error> {
- self.with_current(|park_thread| park_thread.inner.park())?;
- Ok(())
- }
+impl Park for CachedParkThread {
+ type Unpark = UnparkThread;
+ type Error = ParkError;
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
- Ok(())
- }
+ fn unpark(&self) -> Self::Unpark {
+ self.get_unpark().unwrap()
+ }
- fn shutdown(&mut self) {
- let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
- }
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.with_current(|park_thread| park_thread.inner.park())?;
+ Ok(())
}
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
+ Ok(())
+ }
- impl UnparkThread {
- pub(crate) fn into_waker(self) -> Waker {
- unsafe {
- let raw = unparker_to_raw_waker(self.inner);
- Waker::from_raw(raw)
- }
- }
+ fn shutdown(&mut self) {
+ let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
+}
- impl Inner {
- #[allow(clippy::wrong_self_convention)]
- fn into_raw(this: Arc<Inner>) -> *const () {
- Arc::into_raw(this) as *const ()
+impl UnparkThread {
+ pub(crate) fn into_waker(self) -> Waker {
+ unsafe {
+ let raw = unparker_to_raw_waker(self.inner);
+ Waker::from_raw(raw)
}
+ }
+}
- unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
- Arc::from_raw(ptr as *const Inner)
- }
+impl Inner {
+ #[allow(clippy::wrong_self_convention)]
+ fn into_raw(this: Arc<Inner>) -> *const () {
+ Arc::into_raw(this) as *const ()
}
- unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
- RawWaker::new(
- Inner::into_raw(unparker),
- &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
- )
+ unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
+ Arc::from_raw(ptr as *const Inner)
}
+}
- unsafe fn clone(raw: *const ()) -> RawWaker {
- let unparker = Inner::from_raw(raw);
+unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
+ RawWaker::new(
+ Inner::into_raw(unparker),
+ &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
+ )
+}
- // Increment the ref count
- mem::forget(unparker.clone());
+unsafe fn clone(raw: *const ()) -> RawWaker {
+ let unparker = Inner::from_raw(raw);
- unparker_to_raw_waker(unparker)
- }
+ // Increment the ref count
+ mem::forget(unparker.clone());
- unsafe fn drop_waker(raw: *const ()) {
- let _ = Inner::from_raw(raw);
- }
+ unparker_to_raw_waker(unparker)
+}
- unsafe fn wake(raw: *const ()) {
- let unparker = Inner::from_raw(raw);
- unparker.unpark();
- }
+unsafe fn drop_waker(raw: *const ()) {
+ let _ = Inner::from_raw(raw);
+}
- unsafe fn wake_by_ref(raw: *const ()) {
- let unparker = Inner::from_raw(raw);
- unparker.unpark();
+unsafe fn wake(raw: *const ()) {
+ let unparker = Inner::from_raw(raw);
+ unparker.unpark();
+}
- // We don't actually own a reference to the unparker
- mem::forget(unparker);
- }
+unsafe fn wake_by_ref(raw: *const ()) {
+ let unparker = Inner::from_raw(raw);
+ unparker.unpark();
+
+ // We don't actually own a reference to the unparker
+ mem::forget(unparker);
}
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 7e1c257c..48cff709 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -108,6 +108,7 @@ where
}
/// Spawns a future onto the thread pool
+ #[allow(dead_code)]
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index 0b36a75f..a819e9e9 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -15,7 +15,6 @@ cfg_blocking_impl! {
pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
BlockingPool::new(builder, thread_cap)
-
}
}
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 5ad5f5f8..47895fcf 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -6,6 +6,7 @@ use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::builder::ThreadNameFn;
+use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
@@ -67,7 +68,7 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
- let rt = Handle::current();
+ let rt = context::current().expect("not currently running on the Tokio runtime.");
let (task, handle) = task::joinable(BlockingTask::new(func));
let _ = rt.blocking_spawner.spawn(task, &rt);
@@ -79,7 +80,7 @@ pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
where
F: FnOnce() -> R + Send + 'static,
{
- let rt = Handle::current();
+ let rt = context::current().expect("not currently running on the Tokio runtime.");
let (task, _handle) = task::joinable(BlockingTask::new(func));
rt.blocking_spawner.spawn(task, &rt)
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index d1498750..db01cf58 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -1,9 +1,9 @@
+use crate::loom::sync::{Arc, Mutex};
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
use std::fmt;
-use std::sync::Arc;
/// Builds Tokio Runtime with custom configuration values.
///
@@ -67,7 +67,7 @@ pub struct Builder {
pub(super) before_stop: Option<Callback>,
}
-pub(crate) type ThreadNameFn = Arc<dyn Fn() -> String + Send + Sync + 'static>;
+pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
#[derive(Debug, Clone, Copy)]
enum Kind {
@@ -100,7 +100,7 @@ impl Builder {
max_threads: 512,
// Default thread name
- thread_name: Arc::new(|| "tokio-runtime-worker".into()),
+ thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
// Do not set a stack size by default
thread_stack_size: None,
@@ -212,7 +212,7 @@ impl Builder {
/// ```
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
let val = val.into();
- self.thread_name = Arc::new(move || val.clone());
+ self.thread_name = std::sync::Arc::new(move || val.clone());
self
}
@@ -240,7 +240,7 @@ impl Builder {
where
F: Fn() -> String + Send + Sync + 'static,
{
- self.thread_name = Arc::new(f);
+ self.thread_name = std::sync::Arc::new(f);
self
}
@@ -293,7 +293,7 @@ impl Builder {
where
F: Fn() + Send + Sync + 'static,
{
- self.after_start = Some(Arc::new(f));
+ self.after_start = Some(std::sync::Arc::new(f));
self
}
@@ -333,7 +333,7 @@ impl Builder {
/// ```
/// use tokio::runtime::Builder;
///
- /// let mut rt = Builder::new().build().unwrap();
+ /// let rt = Builder::new().build().unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
@@ -364,7 +364,7 @@ impl Builder {
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
- kind: Kind::Shell(Shell::new(driver)),
+ kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
handle: Handle {
spawner,
io_handle,
@@ -463,7 +463,7 @@ cfg_rt_core! {
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
- kind: Kind::Basic(scheduler),
+ kind: Kind::Basic(Mutex::new(Some(scheduler))),
handle: Handle {
spawner,
io_handle,
diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs
index 1b267f48..c42b3432 100644
--- a/tokio/src/runtime/context.rs
+++ b/tokio/src/runtime/context.rs
@@ -7,8 +7,10 @@ thread_local! {
static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None)
}
-pub(crate) fn current() -> Option<Handle> {
- CONTEXT.with(|ctx| ctx.borrow().clone())
+cfg_blocking_impl! {
+ pub(crate) fn current() -> Option<Handle> {
+ CONTEXT.with(|ctx| ctx.borrow().clone())
+ }
}
cfg_io_driver! {
diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs
index 56a7c57b..bb6e6be0 100644
--- a/tokio/src/runtime/enter.rs
+++ b/tokio/src/runtime/enter.rs
@@ -138,31 +138,29 @@ cfg_rt_threaded! {
}
}
-cfg_block_on! {
- impl Enter {
- /// Blocks the thread on the specified future, returning the value with
- /// which that future completes.
- pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError>
- where
- F: std::future::Future,
- {
- use crate::park::{CachedParkThread, Park};
- use std::task::Context;
- use std::task::Poll::Ready;
-
- let mut park = CachedParkThread::new();
- let waker = park.get_unpark()?.into_waker();
- let mut cx = Context::from_waker(&waker);
-
- pin!(f);
-
- loop {
- if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
- return Ok(v);
- }
-
- park.park()?;
+impl Enter {
+ /// Blocks the thread on the specified future, returning the value with
+ /// which that future completes.
+ pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError>
+ where
+ F: std::future::Future,
+ {
+ use crate::park::{CachedParkThread, Park};
+ use std::task::Context;
+ use std::task::Poll::Ready;
+
+ let mut park = CachedParkThread::new();
+ let waker = park.get_unpark()?.into_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ pin!(f);
+
+ loop {
+ if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
+ return Ok(v);
}
+
+ park.park()?;
}
}
}
diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs
index 0716a7fa..516ad4b3 100644
--- a/tokio/src/runtime/handle.rs
+++ b/tokio/src/runtime/handle.rs
@@ -1,16 +1,4 @@
use crate::runtime::{blocking, context, io, time, Spawner};
-use std::{error, fmt};
-
-cfg_blocking! {
- use crate::runtime::task;
- use crate::runtime::blocking::task::BlockingTask;
-}
-
-cfg_rt_core! {
- use crate::task::JoinHandle;
-
- use std::future::Future;
-}
/// Handle to the runtime.
///
@@ -19,7 +7,7 @@ cfg_rt_core! {
///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
-pub struct Handle {
+pub(crate) struct Handle {
pub(super) spawner: Spawner,
/// Handles to the I/O drivers
@@ -39,333 +27,10 @@ impl Handle {
/// Enter the runtime context. This allows you to construct types that must
/// have an executor available on creation such as [`Delay`] or [`TcpStream`].
/// It will also allow you to call methods such as [`tokio::spawn`].
- ///
- /// This function is also available as [`Runtime::enter`].
- ///
- /// [`Delay`]: struct@crate::time::Delay
- /// [`TcpStream`]: struct@crate::net::TcpStream
- /// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
- /// [`tokio::spawn`]: fn@crate::spawn
- ///
- /// # Example
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// fn function_that_spawns(msg: String) {
- /// // Had we not used `handle.enter` below, this would panic.
- /// tokio::spawn(async move {
- /// println!("{}", msg);
- /// });
- /// }
- ///
- /// fn main() {
- /// let rt = Runtime::new().unwrap();
- /// let handle = rt.handle().clone();
- ///
- /// let s = "Hello World!".to_string();
- ///
- /// // By entering the context, we tie `tokio::spawn` to this executor.
- /// handle.enter(|| function_that_spawns(s));
- /// }
- /// ```
- pub fn enter<F, R>(&self, f: F) -> R
+ pub(crate) fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
context::enter(self.clone(), f)
}
-
- /// Returns a `Handle` view over the currently running `Runtime`
- ///
- /// # Panic
- ///
- /// This will panic if called outside the context of a Tokio runtime. That means that you must
- /// call this on one of the threads **being run by the runtime**. Calling this from within a
- /// thread created by `std::thread::spawn` (for example) will cause a panic.
- ///
- /// # Examples
- ///
- /// This can be used to obtain the handle of the surrounding runtime from an async
- /// block or function running on that runtime.
- ///
- /// ```
- /// # use std::thread;
- /// # use tokio::runtime::Runtime;
- /// # fn dox() {
- /// # let rt = Runtime::new().unwrap();
- /// # rt.spawn(async {
- /// use tokio::runtime::Handle;
- ///
- /// // Inside an async block or function.
- /// let handle = Handle::current();
- /// handle.spawn(async {
- /// println!("now running in the existing Runtime");
- /// });
- ///
- /// # let handle =
- /// thread::spawn(move || {
- /// // Notice that the handle is created outside of this thread and then moved in
- /// handle.block_on(async { /* ... */ })
- /// // This next line would cause a panic
- /// // let handle2 = Handle::current();
- /// });
- /// # handle.join().unwrap();
- /// # });
- /// # }
- /// ```
- pub fn current() -> Self {
- context::current().expect("not currently running on the Tokio runtime.")
- }
-
- /// Returns a Handle view over the currently running Runtime
- ///
- /// Returns an error if no Runtime has been started
- ///
- /// Contrary to `current`, this never panics
- pub fn try_current() -> Result<Self, TryCurrentError> {
- context::current().ok_or(TryCurrentError(()))
- }
-}
-
-cfg_rt_core! {
- impl Handle {
- /// Spawns a future onto the Tokio runtime.
- ///
- /// This spawns the given future onto the runtime's executor, usually a
- /// thread pool. The thread pool is then responsible for polling the future
- /// until it completes.
- ///
- /// See [module level][mod] documentation for more details.
- ///
- /// [mod]: index.html
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// # fn dox() {
- /// // Create the runtime
- /// let rt = Runtime::new().unwrap();
- /// let handle = rt.handle();
- ///
- /// // Spawn a future onto the runtime
- /// handle.spawn(async {
- /// println!("now running on a worker thread");
- /// });
- /// # }
- /// ```
- ///
- /// # Panics
- ///
- /// This function will not panic unless task execution is disabled on the
- /// executor. This can only happen if the runtime was built using
- /// [`Builder`] without picking either [`basic_scheduler`] or
- /// [`threaded_scheduler`].
- ///
- /// [`Builder`]: struct@crate::runtime::Builder
- /// [`threaded_scheduler`]: fn@crate::runtime::Builder::thr