diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-16 08:28:34 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-16 08:28:34 -0800 |
commit | 19f1fc36bd567377bde4a2c6818c6b606d89d488 (patch) | |
tree | 44ab1d6dceabbb8353ab6369779cce4d3333075f /tokio | |
parent | 3f0eabe7798de624f5ee9c7562803bfb97e6088f (diff) |
task: return `JoinHandle` from spawn (#1777)
`tokio::spawn` now returns a `JoinHandle` to obtain the result of the task:
Closes #887.
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/lib.rs | 6 | ||||
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 8 | ||||
-rw-r--r-- | tokio/src/runtime/global.rs | 52 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/slice.rs | 9 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/spawner.rs | 8 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/tests/loom_pool.rs | 20 | ||||
-rw-r--r-- | tokio/src/task/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/task/raw.rs | 1 | ||||
-rw-r--r-- | tokio/src/task/spawn.rs | 53 | ||||
-rw-r--r-- | tokio/src/task/state.rs | 1 | ||||
-rw-r--r-- | tokio/tests/rt_common.rs | 27 |
12 files changed, 112 insertions, 81 deletions
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 4be056ec..cfb90549 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -121,6 +121,8 @@ pub mod sync; #[cfg(feature = "rt-core")] pub mod task; +#[cfg(feature = "rt-core")] +pub use crate::task::spawn; #[cfg(feature = "time")] pub mod time; @@ -128,10 +130,6 @@ pub mod time; #[cfg(feature = "rt-full")] mod util; -#[doc(inline)] -#[cfg(feature = "rt-core")] -pub use crate::runtime::spawn; - #[cfg(not(test))] // Work around for rust-lang/rust#62127 #[cfg(feature = "macros")] #[doc(inline)] diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index a4991c1e..cb99cf18 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -225,12 +225,14 @@ impl SchedulerPriv { /// /// Must be called from the same thread that holds the `BasicScheduler` /// value. - pub(super) unsafe fn spawn_background<F>(&self, future: F) + pub(super) unsafe fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> where - F: Future<Output = ()> + Send + 'static, + F: Future + Send + 'static, + F::Output: Send + 'static, { - let task = task::background(future); + let (task, handle) = task::joinable(future); self.schedule_local(task); + handle } unsafe fn schedule_local(&self, task: Task<Self>) { diff --git a/tokio/src/runtime/global.rs b/tokio/src/runtime/global.rs index 8a2641b9..146b678c 100644 --- a/tokio/src/runtime/global.rs +++ b/tokio/src/runtime/global.rs @@ -1,4 +1,5 @@ use crate::runtime::basic_scheduler; +use crate::task::JoinHandle; #[cfg(feature = "rt-full")] use crate::runtime::thread_pool; @@ -27,64 +28,23 @@ thread_local! { // ===== global spawn fns ===== /// Spawns a future on the default executor. -/// -/// In order for a future to do work, it must be spawned on an executor. The -/// `spawn` function is the easiest way to do this. It spawns a future on the -/// [default executor] for the current execution context (tracked using a -/// thread-local variable). -/// -/// The default executor is **usually** a thread pool. -/// -/// # Examples -/// -/// In this example, a server is started and `spawn` is used to start a new task -/// that processes each received connection. -/// -/// ``` -/// use tokio::net::TcpListener; -/// -/// # async fn process<T>(_t: T) {} -/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { -/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; -/// -/// loop { -/// let (socket, _) = listener.accept().await?; -/// -/// tokio::spawn(async move { -/// // Process each socket concurrently. -/// process(socket).await -/// }); -/// } -/// # } -/// ``` -/// -/// [default executor]: struct.DefaultExecutor.html -/// -/// # Panics -/// -/// This function will panic if the default executor is not set or if spawning -/// onto the default executor returns an error. To avoid the panic, use -/// [`DefaultExecutor`]. -/// -/// [`DefaultExecutor`]: struct.DefaultExecutor.html -pub fn spawn<T>(future: T) +pub(crate) fn spawn<T>(future: T) -> JoinHandle<T::Output> where - T: Future<Output = ()> + Send + 'static, + T: Future + Send + 'static, + T::Output: Send + 'static, { EXECUTOR.with(|current_executor| match current_executor.get() { #[cfg(feature = "rt-full")] State::ThreadPool(thread_pool_ptr) => { let thread_pool = unsafe { &*thread_pool_ptr }; - thread_pool.spawn_background(future); + thread_pool.spawn(future) } State::Basic(basic_scheduler_ptr) => { let basic_scheduler = unsafe { &*basic_scheduler_ptr }; // Safety: The `BasicScheduler` value set the thread-local (same // thread). - unsafe { - basic_scheduler.spawn_background(future); - } + unsafe { basic_scheduler.spawn(future) } } State::Empty => { // Explicit drop of `future` silences the warning that `future` is diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index b2a5ba50..8f66b725 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -149,7 +149,7 @@ use self::enter::enter; #[cfg(feature = "rt-core")] mod global; #[cfg(feature = "rt-core")] -pub use self::global::spawn; +pub(crate) use self::global::spawn; mod handle; pub use self::handle::Handle; diff --git a/tokio/src/runtime/thread_pool/slice.rs b/tokio/src/runtime/thread_pool/slice.rs index 1a0bd381..4b3ef996 100644 --- a/tokio/src/runtime/thread_pool/slice.rs +++ b/tokio/src/runtime/thread_pool/slice.rs @@ -95,15 +95,6 @@ where } } - pub(crate) fn spawn_background<F>(&self, future: F) - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let task = task::background(future); - self.schedule(task); - } - pub(crate) fn schedule(&self, task: Task<Shared<P>>) { current::get(|current_worker| match current_worker.as_member(self) { Some(worker) => { diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs index b7031c43..e2975313 100644 --- a/tokio/src/runtime/thread_pool/spawner.rs +++ b/tokio/src/runtime/thread_pool/spawner.rs @@ -37,14 +37,6 @@ impl Spawner { self.workers.spawn_typed(future) } - /// Spawn a task in the background - pub(crate) fn spawn_background<F>(&self, future: F) - where - F: Future<Output = ()> + Send + 'static, - { - self.workers.spawn_background(future); - } - /// Reference to the worker set. Used by `ThreadPool` to initiate shutdown. pub(super) fn workers(&self) -> &slice::Set<Box<dyn Unpark>> { &*self.workers diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index 065d515e..7b8becf2 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -150,18 +150,22 @@ fn pool_shutdown() { #[test] fn complete_block_on_under_load() { + use futures::FutureExt; + loom::model(|| { let pool = mk_pool(2); - pool.block_on(async { - // Spin hard - crate::spawn(async { - for _ in 0..2 { - yield_once().await; - } - }); + pool.block_on({ + futures::future::lazy(|_| ()).then(|_| { + // Spin hard + crate::spawn(async { + for _ in 0..2 { + yield_once().await; + } + }); - gated2(true).await + gated2(true) + }) }); }); } diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index a0175456..709be16b 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -21,6 +21,11 @@ pub(crate) use self::list::OwnedList; mod raw; use self::raw::RawTask; +#[cfg(feature = "rt-core")] +mod spawn; +#[cfg(feature = "rt-core")] +pub use spawn::spawn; + mod stack; pub(crate) use self::stack::TransferStack; @@ -70,6 +75,7 @@ pub(crate) trait Schedule: Send + Sync + Sized + 'static { } /// Create a new task without an associated join handle +#[cfg(feature = "rt-full")] pub(crate) fn background<T, S>(task: T) -> Task<S> where T: Future + Send + 'static, diff --git a/tokio/src/task/raw.rs b/tokio/src/task/raw.rs index d6542a16..899e5aa3 100644 --- a/tokio/src/task/raw.rs +++ b/tokio/src/task/raw.rs @@ -55,6 +55,7 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable { } impl RawTask { + #[cfg(feature = "rt-full")] pub(super) fn new_background<T, S>(task: T) -> RawTask where T: Future + Send + 'static, diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs new file mode 100644 index 00000000..6fdff651 --- /dev/null +++ b/tokio/src/task/spawn.rs @@ -0,0 +1,53 @@ +use crate::runtime; +use crate::task::JoinHandle; + +use std::future::Future; + +/// Spawns a new asynchronous task, returning a +/// [`JoinHandle`](super::JoinHandle)] for it. +/// +/// Spawning a task enables the task to execute concurrently to other tasks. The +/// spawned task may execute on the current thread, or it may be sent to a +/// different thread to be executed. The specifics depend on the current +/// [`Runtime`](crate::runtime::Runtime) configuration. +/// +/// # Examples +/// +/// In this example, a server is started and `spawn` is used to start a new task +/// that processes each received connection. +/// +/// ```no_run +/// use tokio::net::{TcpListener, TcpStream}; +/// +/// use std::io; +/// +/// async fn process(socket: TcpStream) { +/// // ... +/// # drop(socket); +/// } +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +/// +/// loop { +/// let (socket, _) = listener.accept().await?; +/// +/// tokio::spawn(async move { +/// // Process each socket concurrently. +/// process(socket).await +/// }); +/// } +/// } +/// ``` +/// +/// # Panics +/// +/// Panics if called from **outside** of the Tokio runtime. +pub fn spawn<T>(task: T) -> JoinHandle<T::Output> +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + runtime::spawn(task) +} diff --git a/tokio/src/task/state.rs b/tokio/src/task/state.rs index 3adfea91..e10f0601 100644 --- a/tokio/src/task/state.rs +++ b/tokio/src/task/state.rs @@ -58,6 +58,7 @@ const INITIAL_STATE: usize = NOTIFIED; /// unambiguous modification order. impl State { /// Starts with a ref count of 1 + #[cfg(feature = "rt-full")] pub(super) fn new_background() -> State { State { val: AtomicUsize::new(INITIAL_STATE), diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 6e8e58fb..81fe6801 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -80,7 +80,7 @@ rt_test! { } #[test] - fn spawn_one() { + fn spawn_one_bg() { let mut rt = rt(); let out = rt.block_on(async { @@ -97,6 +97,29 @@ rt_test! { } #[test] + fn spawn_one_join() { + let mut rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + let handle = tokio::spawn(async move { + tx.send("ZOMG").unwrap(); + "DONE" + }); + + let msg = assert_ok!(rx.await); + + let out = assert_ok!(handle.await); + assert_eq!(out, "DONE"); + + msg + }); + + assert_eq!(out, "ZOMG"); + } + + #[test] fn spawn_two() { let mut rt = rt(); @@ -180,7 +203,7 @@ rt_test! { tokio::spawn(poll_fn(move |_| { assert_eq!(2, Arc::strong_count(&cnt)); - Poll::Pending + Poll::<()>::Pending })); }); |