summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-16 08:28:34 -0800
committerGitHub <noreply@github.com>2019-11-16 08:28:34 -0800
commit19f1fc36bd567377bde4a2c6818c6b606d89d488 (patch)
tree44ab1d6dceabbb8353ab6369779cce4d3333075f /tokio
parent3f0eabe7798de624f5ee9c7562803bfb97e6088f (diff)
task: return `JoinHandle` from spawn (#1777)
`tokio::spawn` now returns a `JoinHandle` to obtain the result of the task: Closes #887.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/lib.rs6
-rw-r--r--tokio/src/runtime/basic_scheduler.rs8
-rw-r--r--tokio/src/runtime/global.rs52
-rw-r--r--tokio/src/runtime/mod.rs2
-rw-r--r--tokio/src/runtime/thread_pool/slice.rs9
-rw-r--r--tokio/src/runtime/thread_pool/spawner.rs8
-rw-r--r--tokio/src/runtime/thread_pool/tests/loom_pool.rs20
-rw-r--r--tokio/src/task/mod.rs6
-rw-r--r--tokio/src/task/raw.rs1
-rw-r--r--tokio/src/task/spawn.rs53
-rw-r--r--tokio/src/task/state.rs1
-rw-r--r--tokio/tests/rt_common.rs27
12 files changed, 112 insertions, 81 deletions
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 4be056ec..cfb90549 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -121,6 +121,8 @@ pub mod sync;
#[cfg(feature = "rt-core")]
pub mod task;
+#[cfg(feature = "rt-core")]
+pub use crate::task::spawn;
#[cfg(feature = "time")]
pub mod time;
@@ -128,10 +130,6 @@ pub mod time;
#[cfg(feature = "rt-full")]
mod util;
-#[doc(inline)]
-#[cfg(feature = "rt-core")]
-pub use crate::runtime::spawn;
-
#[cfg(not(test))] // Work around for rust-lang/rust#62127
#[cfg(feature = "macros")]
#[doc(inline)]
diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index a4991c1e..cb99cf18 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -225,12 +225,14 @@ impl SchedulerPriv {
///
/// Must be called from the same thread that holds the `BasicScheduler`
/// value.
- pub(super) unsafe fn spawn_background<F>(&self, future: F)
+ pub(super) unsafe fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
- F: Future<Output = ()> + Send + 'static,
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
{
- let task = task::background(future);
+ let (task, handle) = task::joinable(future);
self.schedule_local(task);
+ handle
}
unsafe fn schedule_local(&self, task: Task<Self>) {
diff --git a/tokio/src/runtime/global.rs b/tokio/src/runtime/global.rs
index 8a2641b9..146b678c 100644
--- a/tokio/src/runtime/global.rs
+++ b/tokio/src/runtime/global.rs
@@ -1,4 +1,5 @@
use crate::runtime::basic_scheduler;
+use crate::task::JoinHandle;
#[cfg(feature = "rt-full")]
use crate::runtime::thread_pool;
@@ -27,64 +28,23 @@ thread_local! {
// ===== global spawn fns =====
/// Spawns a future on the default executor.
-///
-/// In order for a future to do work, it must be spawned on an executor. The
-/// `spawn` function is the easiest way to do this. It spawns a future on the
-/// [default executor] for the current execution context (tracked using a
-/// thread-local variable).
-///
-/// The default executor is **usually** a thread pool.
-///
-/// # Examples
-///
-/// In this example, a server is started and `spawn` is used to start a new task
-/// that processes each received connection.
-///
-/// ```
-/// use tokio::net::TcpListener;
-///
-/// # async fn process<T>(_t: T) {}
-/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
-/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
-///
-/// loop {
-/// let (socket, _) = listener.accept().await?;
-///
-/// tokio::spawn(async move {
-/// // Process each socket concurrently.
-/// process(socket).await
-/// });
-/// }
-/// # }
-/// ```
-///
-/// [default executor]: struct.DefaultExecutor.html
-///
-/// # Panics
-///
-/// This function will panic if the default executor is not set or if spawning
-/// onto the default executor returns an error. To avoid the panic, use
-/// [`DefaultExecutor`].
-///
-/// [`DefaultExecutor`]: struct.DefaultExecutor.html
-pub fn spawn<T>(future: T)
+pub(crate) fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
- T: Future<Output = ()> + Send + 'static,
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
{
EXECUTOR.with(|current_executor| match current_executor.get() {
#[cfg(feature = "rt-full")]
State::ThreadPool(thread_pool_ptr) => {
let thread_pool = unsafe { &*thread_pool_ptr };
- thread_pool.spawn_background(future);
+ thread_pool.spawn(future)
}
State::Basic(basic_scheduler_ptr) => {
let basic_scheduler = unsafe { &*basic_scheduler_ptr };
// Safety: The `BasicScheduler` value set the thread-local (same
// thread).
- unsafe {
- basic_scheduler.spawn_background(future);
- }
+ unsafe { basic_scheduler.spawn(future) }
}
State::Empty => {
// Explicit drop of `future` silences the warning that `future` is
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index b2a5ba50..8f66b725 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -149,7 +149,7 @@ use self::enter::enter;
#[cfg(feature = "rt-core")]
mod global;
#[cfg(feature = "rt-core")]
-pub use self::global::spawn;
+pub(crate) use self::global::spawn;
mod handle;
pub use self::handle::Handle;
diff --git a/tokio/src/runtime/thread_pool/slice.rs b/tokio/src/runtime/thread_pool/slice.rs
index 1a0bd381..4b3ef996 100644
--- a/tokio/src/runtime/thread_pool/slice.rs
+++ b/tokio/src/runtime/thread_pool/slice.rs
@@ -95,15 +95,6 @@ where
}
}
- pub(crate) fn spawn_background<F>(&self, future: F)
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- let task = task::background(future);
- self.schedule(task);
- }
-
pub(crate) fn schedule(&self, task: Task<Shared<P>>) {
current::get(|current_worker| match current_worker.as_member(self) {
Some(worker) => {
diff --git a/tokio/src/runtime/thread_pool/spawner.rs b/tokio/src/runtime/thread_pool/spawner.rs
index b7031c43..e2975313 100644
--- a/tokio/src/runtime/thread_pool/spawner.rs
+++ b/tokio/src/runtime/thread_pool/spawner.rs
@@ -37,14 +37,6 @@ impl Spawner {
self.workers.spawn_typed(future)
}
- /// Spawn a task in the background
- pub(crate) fn spawn_background<F>(&self, future: F)
- where
- F: Future<Output = ()> + Send + 'static,
- {
- self.workers.spawn_background(future);
- }
-
/// Reference to the worker set. Used by `ThreadPool` to initiate shutdown.
pub(super) fn workers(&self) -> &slice::Set<Box<dyn Unpark>> {
&*self.workers
diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
index 065d515e..7b8becf2 100644
--- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs
+++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs
@@ -150,18 +150,22 @@ fn pool_shutdown() {
#[test]
fn complete_block_on_under_load() {
+ use futures::FutureExt;
+
loom::model(|| {
let pool = mk_pool(2);
- pool.block_on(async {
- // Spin hard
- crate::spawn(async {
- for _ in 0..2 {
- yield_once().await;
- }
- });
+ pool.block_on({
+ futures::future::lazy(|_| ()).then(|_| {
+ // Spin hard
+ crate::spawn(async {
+ for _ in 0..2 {
+ yield_once().await;
+ }
+ });
- gated2(true).await
+ gated2(true)
+ })
});
});
}
diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs
index a0175456..709be16b 100644
--- a/tokio/src/task/mod.rs
+++ b/tokio/src/task/mod.rs
@@ -21,6 +21,11 @@ pub(crate) use self::list::OwnedList;
mod raw;
use self::raw::RawTask;
+#[cfg(feature = "rt-core")]
+mod spawn;
+#[cfg(feature = "rt-core")]
+pub use spawn::spawn;
+
mod stack;
pub(crate) use self::stack::TransferStack;
@@ -70,6 +75,7 @@ pub(crate) trait Schedule: Send + Sync + Sized + 'static {
}
/// Create a new task without an associated join handle
+#[cfg(feature = "rt-full")]
pub(crate) fn background<T, S>(task: T) -> Task<S>
where
T: Future + Send + 'static,
diff --git a/tokio/src/task/raw.rs b/tokio/src/task/raw.rs
index d6542a16..899e5aa3 100644
--- a/tokio/src/task/raw.rs
+++ b/tokio/src/task/raw.rs
@@ -55,6 +55,7 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
}
impl RawTask {
+ #[cfg(feature = "rt-full")]
pub(super) fn new_background<T, S>(task: T) -> RawTask
where
T: Future + Send + 'static,
diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs
new file mode 100644
index 00000000..6fdff651
--- /dev/null
+++ b/tokio/src/task/spawn.rs
@@ -0,0 +1,53 @@
+use crate::runtime;
+use crate::task::JoinHandle;
+
+use std::future::Future;
+
+/// Spawns a new asynchronous task, returning a
+/// [`JoinHandle`](super::JoinHandle)] for it.
+///
+/// Spawning a task enables the task to execute concurrently to other tasks. The
+/// spawned task may execute on the current thread, or it may be sent to a
+/// different thread to be executed. The specifics depend on the current
+/// [`Runtime`](crate::runtime::Runtime) configuration.
+///
+/// # Examples
+///
+/// In this example, a server is started and `spawn` is used to start a new task
+/// that processes each received connection.
+///
+/// ```no_run
+/// use tokio::net::{TcpListener, TcpStream};
+///
+/// use std::io;
+///
+/// async fn process(socket: TcpStream) {
+/// // ...
+/// # drop(socket);
+/// }
+///
+/// #[tokio::main]
+/// async fn main() -> io::Result<()> {
+/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+///
+/// loop {
+/// let (socket, _) = listener.accept().await?;
+///
+/// tokio::spawn(async move {
+/// // Process each socket concurrently.
+/// process(socket).await
+/// });
+/// }
+/// }
+/// ```
+///
+/// # Panics
+///
+/// Panics if called from **outside** of the Tokio runtime.
+pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
+where
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+{
+ runtime::spawn(task)
+}
diff --git a/tokio/src/task/state.rs b/tokio/src/task/state.rs
index 3adfea91..e10f0601 100644
--- a/tokio/src/task/state.rs
+++ b/tokio/src/task/state.rs
@@ -58,6 +58,7 @@ const INITIAL_STATE: usize = NOTIFIED;
/// unambiguous modification order.
impl State {
/// Starts with a ref count of 1
+ #[cfg(feature = "rt-full")]
pub(super) fn new_background() -> State {
State {
val: AtomicUsize::new(INITIAL_STATE),
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index 6e8e58fb..81fe6801 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -80,7 +80,7 @@ rt_test! {
}
#[test]
- fn spawn_one() {
+ fn spawn_one_bg() {
let mut rt = rt();
let out = rt.block_on(async {
@@ -97,6 +97,29 @@ rt_test! {
}
#[test]
+ fn spawn_one_join() {
+ let mut rt = rt();
+
+ let out = rt.block_on(async {
+ let (tx, rx) = oneshot::channel();
+
+ let handle = tokio::spawn(async move {
+ tx.send("ZOMG").unwrap();
+ "DONE"
+ });
+
+ let msg = assert_ok!(rx.await);
+
+ let out = assert_ok!(handle.await);
+ assert_eq!(out, "DONE");
+
+ msg
+ });
+
+ assert_eq!(out, "ZOMG");
+ }
+
+ #[test]
fn spawn_two() {
let mut rt = rt();
@@ -180,7 +203,7 @@ rt_test! {
tokio::spawn(poll_fn(move |_| {
assert_eq!(2, Arc::strong_count(&cnt));
- Poll::Pending
+ Poll::<()>::Pending
}));
});