diff options
-rw-r--r-- | tokio/Cargo.toml | 2 | ||||
-rw-r--r-- | tokio/src/macros/cfg.rs | 25 | ||||
-rw-r--r-- | tokio/src/runtime/basic_scheduler.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/blocking/schedule.rs | 4 | ||||
-rw-r--r-- | tokio/src/runtime/thread_pool/shared.rs | 6 | ||||
-rw-r--r-- | tokio/src/sync/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/task/local.rs | 727 | ||||
-rw-r--r-- | tokio/src/task/mod.rs | 38 | ||||
-rw-r--r-- | tokio/src/task/raw.rs | 18 | ||||
-rw-r--r-- | tokio/src/tests/loom_schedule.rs | 4 | ||||
-rw-r--r-- | tokio/src/tests/mock_schedule.rs | 7 | ||||
-rw-r--r-- | tokio/tests/rt_common.rs | 63 |
12 files changed, 886 insertions, 14 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index ba6d1d7d..eab9d8dd 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -39,6 +39,7 @@ full = [ "net", "process", "rt-core", + "rt-util", "rt-threaded", "signal", "stream", @@ -67,6 +68,7 @@ process = [ ] # Includes basic task execution capabilities rt-core = [] +rt-util = [] rt-threaded = [ "num_cpus", "rt-core", diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 959eed22..4aee3b7f 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -52,6 +52,21 @@ macro_rules! cfg_not_blocking_impl { } } +/// Enable internal `AtomicWaker` impl +macro_rules! cfg_atomic_waker_impl { + ($($item:item)*) => { + $( + #[cfg(any( + feature = "io-driver", + feature = "time", + all(feature = "rt-core", feature = "rt-util") + ))] + #[cfg(not(loom))] + $item + )* + } +} + macro_rules! cfg_dns { ($($item:item)*) => { $( @@ -220,6 +235,16 @@ macro_rules! cfg_rt_threaded { } } +macro_rules! cfg_rt_util { + ($($item:item)*) => { + $( + #[cfg(feature = "rt-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-util")))] + $item + )* + } +} + macro_rules! cfg_not_rt_threaded { ($($item:item)*) => { $( #[cfg(not(feature = "rt-threaded"))] $item )* diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index d5cf7c76..48c8ad9e 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,5 +1,5 @@ use crate::park::{Park, Unpark}; -use crate::task::{self, JoinHandle, Schedule, Task}; +use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task}; use std::cell::UnsafeCell; use std::collections::VecDeque; @@ -304,6 +304,8 @@ impl Schedule for SchedulerPriv { } } +impl ScheduleSendOnly for SchedulerPriv {} + impl<P> Drop for BasicScheduler<P> where P: Park, diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index 461b12c3..5d2cd5f5 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -1,4 +1,4 @@ -use crate::task::{Schedule, Task}; +use crate::task::{Schedule, ScheduleSendOnly, Task}; /// `task::Schedule` implementation that does nothing. This is unique to the /// blocking scheduler as tasks scheduled are not really futures but blocking @@ -16,3 +16,5 @@ impl Schedule for NoopSchedule { unreachable!(); } } + +impl ScheduleSendOnly for NoopSchedule {} diff --git a/tokio/src/runtime/thread_pool/shared.rs b/tokio/src/runtime/thread_pool/shared.rs index 86c784ad..f61bafab 100644 --- a/tokio/src/runtime/thread_pool/shared.rs +++ b/tokio/src/runtime/thread_pool/shared.rs @@ -1,7 +1,7 @@ use crate::park::Unpark; -use crate::runtime::Unparker; use crate::runtime::thread_pool::slice; -use crate::task::{self, Schedule, Task}; +use crate::runtime::Unparker; +use crate::task::{self, Schedule, ScheduleSendOnly, Task}; use std::ptr; @@ -90,3 +90,5 @@ impl Schedule for Shared { Self::schedule(self, task); } } + +impl ScheduleSendOnly for Shared {} diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 0db703bb..943f8f11 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -35,7 +35,7 @@ cfg_sync! { } cfg_not_sync! { - cfg_resource_drivers! { + cfg_atomic_waker_impl! { mod task; pub(crate) use task::AtomicWaker; } diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs new file mode 100644 index 00000000..cb62bd5e --- /dev/null +++ b/tokio/src/task/local.rs @@ -0,0 +1,727 @@ +//! Runs `!Send` futures on the current thread. +use crate::sync::AtomicWaker; +use crate::task::{self, JoinHandle, Schedule, Task, TransferStack}; + +use std::cell::{Cell, UnsafeCell}; +use std::collections::VecDeque; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::ptr::{self, NonNull}; +use std::rc::Rc; +use std::sync::Mutex; +use std::task::{Context, Poll}; + +use pin_project_lite::pin_project; +cfg_rt_util! { + /// A set of tasks which are executed on the same thread. + /// + /// In some cases, it is necessary to run one or more futures that do not + /// implement [`Send`] and thus are unsafe to send between threads. In these + /// cases, a [local task set] may be used to schedule one or more `!Send` + /// futures to run together on the same thread. + /// + /// For example, the following code will not compile: + /// + /// ```rust,compile_fail + /// # use tokio::runtime::Runtime; + /// use std::rc::Rc; + /// + /// // `Rc` does not implement `Send`, and thus may not be sent between + /// // threads safely. + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// let mut rt = Runtime::new().unwrap(); + /// + /// rt.block_on(async move { + /// let unsend_data = unsend_data.clone(); + /// // Because the `async` block here moves `unsend_data`, the future is `!Send`. + /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this + /// // will not compile. + /// tokio::spawn(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// }); + /// ``` + /// In order to spawn `!Send` futures, we can use a local task set to + /// schedule them on the thread calling [`Runtime::block_on`]. When running + /// inside of the local task set, we can use [`task::spawn_local`], which can + /// spawn `!Send` futures. For example: + /// + /// ```rust + /// # use tokio::runtime::Runtime; + /// use std::rc::Rc; + /// use tokio::task; + /// + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// let mut rt = Runtime::new().unwrap(); + /// // Construct a local task set that can run `!Send` futures. + /// let local = task::LocalSet::new(); + /// + /// // Run the local task group. + /// local.block_on(&mut rt, async move { + /// let unsend_data = unsend_data.clone(); + /// // `spawn_local` ensures that the future is spawned on the local + /// // task group. + /// task::spawn_local(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// }); + /// ``` + /// + /// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html + /// [local task set]: struct.LocalSet.html + /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on + /// [`task::spawn_local`]: fn.spawn.html + #[derive(Debug)] + pub struct LocalSet { + scheduler: Rc<Scheduler>, + } +} +struct Scheduler { + /// List of all active tasks spawned onto this executor. + /// + /// # Safety + /// + /// Must only be accessed from the primary thread + tasks: UnsafeCell<task::OwnedList<Scheduler>>, + + /// Local run local_queue. + /// + /// Tasks notified from the current thread are pushed into this queue. + /// + /// # Safety + /// + /// References should not be handed out. Only call `push` / `pop` functions. + /// Only call from the owning thread. + local_queue: UnsafeCell<VecDeque<Task<Scheduler>>>, + + tick: Cell<u8>, + + /// Remote run queue. + /// + /// Tasks notified from another thread are pushed into this queue. + remote_queue: Mutex<VecDeque<Task<Scheduler>>>, + + /// Tasks pending drop + pending_drop: TransferStack<Self>, + + /// Used to notify the `LocalFuture` when a task in the local task set is + /// notified. + waker: AtomicWaker, +} + +pin_project! { + struct LocalFuture<F> { + scheduler: Rc<Scheduler>, + #[pin] + future: F, + } +} + +thread_local! { + static CURRENT_TASK_SET: Cell<Option<NonNull<Scheduler>>> = Cell::new(None); +} + +cfg_rt_util! { + /// Spawns a `!Send` future on the local task set. + /// + /// The spawned future will be run on the same thread that called `spawn_local.` + /// This may only be called from the context of a local task set. + /// + /// # Panics + /// + /// - This function panics if called outside of a local task set. + /// + /// # Examples + /// + /// ```rust + /// # use tokio::runtime::Runtime; + /// use std::rc::Rc; + /// use tokio::task; + /// + /// let unsend_data = Rc::new("my unsend data..."); + /// + /// let mut rt = Runtime::new().unwrap(); + /// let local = task::LocalSet::new(); + /// + /// // Run the local task set. + /// local.block_on(&mut rt, async move { + /// let unsend_data = unsend_data.clone(); + /// task::spawn_local(async move { + /// println!("{}", unsend_data); + /// // ... + /// }).await.unwrap(); + /// }); + /// ``` + pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output> + where + F: Future + 'static, + F::Output: 'static, + { + CURRENT_TASK_SET.with(|current| { + let current = current + .get() + .expect("`spawn_local` called from outside of a local::LocalSet!"); + unsafe { + let (task, handle) = task::joinable_local(future); + current.as_ref().schedule_local(task); + handle + } + }) + } +} + +/// Max number of tasks to poll per tick. +const MAX_TASKS_PER_TICK: usize = 61; + +/// How often to check the remote queue first +const CHECK_REMOTE_INTERVAL: u8 = 13; + +impl LocalSet { + /// Returns a new local task set. + pub fn new() -> Self { + Self { + scheduler: Rc::new(Scheduler::new()), + } + } + + /// Spawns a `!Send` task onto the local task set. + /// + /// This task is guaranteed to be run on the current thread. + /// + /// Unlike the free function [`spawn_local`], this method may be used to + /// spawn_local local tasks when the task set is _not_ running. For example: + /// ```rust + /// # use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// let mut rt = Runtime::new().unwrap(); + /// let local = task::LocalSet::new(); + /// + /// // Spawn a future on the local set. This future will be run when + /// // we call `block_on` to drive the task set. + /// local.spawn_local(async { + /// // ... + /// }); + /// + /// // Run the local task set. + /// local.block_on(&mut rt, async move { + /// // ... + /// }); + /// + /// // When `block_on` finishes, we can spawn_local _more_ futures, which will + /// // run in subsequent calls to `block_on`. + /// local.spawn_local(async { + /// // ... + /// }); + /// + /// local.block_on(&mut rt, async move { + /// // ... + /// }); + /// ``` + /// [`spawn_local`]: fn.spawn_local.html + pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + 'static, + F::Output: 'static, + { + let (task, handle) = task::joinable_local(future); + unsafe { + // This is safe: since `LocalSet` is not Send or Sync, this is + // always being called from the local thread. + self.scheduler.schedule_local(task); + } + handle + } + + /// Run a future to completion on the provided runtime, driving any local + /// futures spawned on this task set on the current thread. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. The future + /// may also call [`spawn_local`] to spawn_local additional local futures on the + /// current thread. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + /// + /// # Notes + /// + /// Since this function internally calls [`Runtime::block_on`], and drives + /// futures in the local task set inside that call to `block_on`, the local + /// futures may not use [in-place blocking]. If a blocking call needs to be + /// issued from a local task, the [`spawn_blocking`] API may be used instead. + /// + /// For example, this will panic: + /// ```should_panic + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// let mut rt = Runtime::new().unwrap(); + /// let local = task::LocalSet::new(); + /// local.block_on(&mut rt, async { + /// let join = task::spawn_local(async { + /// let blocking_result = task::block_in_place(|| { + /// // ... + /// }); + /// // ... + /// }); + /// join.await.unwrap(); + /// }) + /// ``` + /// This, however, will not panic: + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// let mut rt = Runtime::new().unwrap(); + /// let local = task::LocalSet::new(); + /// local.block_on(&mut rt, async { + /// let join = task::spawn_local(async { + /// let blocking_result = task::spawn_blocking(|| { + /// // ... + /// }).await; + /// // ... + /// }); + /// join.await.unwrap(); + /// }) + /// ``` + /// + /// [`spawn_local`]: fn.spawn_local.html + /// [`Runtime::block_on`]: ../struct.Runtime.html#method.block_on + /// [in-place blocking]: ../blocking/fn.in_place.html + /// [`spawn_blocking`]: ../blocking/fn.spawn_blocking.html + pub fn block_on<F>(&self, rt: &mut crate::runtime::Runtime, future: F) -> F::Output + where + F: Future + 'static, + F::Output: 'static, + { + let scheduler = self.scheduler.clone(); + self.scheduler + .with(move || rt.block_on(LocalFuture { scheduler, future })) + } +} + +impl Default for LocalSet { + fn default() -> Self { + Self::new() + } +} + +impl<F: Future> Future for LocalFuture<F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.project(); + let scheduler = this.scheduler; + let mut future = this.future; + scheduler.waker.register_by_ref(cx.waker()); + + if let Poll::Ready(output) = future.as_mut().poll(cx) { + return Poll::Ready(output); + } + + scheduler.tick(); + Poll::Pending + } +} + +// === impl Scheduler === + +impl Schedule for Scheduler { + fn bind(&self, task: &Task<Self>) { + assert!(self.is_current()); + unsafe { + (*self.tasks.get()).insert(task); + } + } + + fn release(&self, _: Task<Self>) { + unreachable!("tasks should only be completed locally") + } + + fn release_local(&self, task: &Task<Self>) { + debug_assert!(self.is_current()); + unsafe { + (*self.tasks.get()).remove(task); + } + } + + fn schedule(&self, task: Task<Self>) { + if self.is_current() { + unsafe { + self.schedule_local(task); + } + } else { + self.remote_queue.lock().unwrap().push_back(task); + + self.waker.wake(); + } + } +} + +impl Scheduler { + fn new() -> Self { + Self { + tasks: UnsafeCell::new(task::OwnedList::new()), + local_queue: UnsafeCell::new(VecDeque::with_capacity(64)), + tick: Cell::new(0), + pending_drop: TransferStack::new(), + remote_queue: Mutex::new(VecDeque::with_capacity(64)), + waker: AtomicWaker::new(), + } + } + + fn with<F>(&self, f: impl FnOnce() -> F) -> F { + struct Entered<'a> { + current: &'a Cell<Option<NonNull<Scheduler>>>, + } + + impl<'a> Drop for Entered<'a> { + fn drop(&mut self) { + self.current.set(None); + } + } + + CURRENT_TASK_SET.with(|current| { + let prev = current.replace(Some(NonNull::from(self))); + assert!(prev.is_none(), "nested call to local::Scheduler::with"); + let _entered = Entered { current }; + f() + }) + } + + unsafe fn schedule_local(&self, task: Task<Self>) { + (*self.local_queue.get()).push_back(task); + } + + fn is_current(&self) -> bool { + CURRENT_TASK_SET + .try_with(|current| { + current + .get() + .iter() + .any(|current| ptr::eq(current.as_ptr(), self as *const _)) + }) + .unwrap_or(false) + } + + fn next_task(&self, tick: u8) -> Option<Task<Self>> { + if 0 == tick % CHECK_REMOTE_INTERVAL { + self.next_remote_task().or_else(|| self.next_local_task()) + } else { + self.next_local_task().or_else(|| self.next_remote_task()) + } + } + + fn next_local_task(&self) -> Option<Task<Self>> { + unsafe { (*self.local_queue.get()).pop_front() } + } + + fn next_remote_task(&self) -> Option<Task<Self>> { + // there is no semantic information in the `PoisonError`, and it + // doesn't implement `Debug`, but clippy thinks that it's bad to + // match all errors here... + #[allow(clippy::match_wild_err_arm)] + let mut lock = match self.remote_queue.lock() { + // If the lock is poisoned, but the thread is already panicking, + // avoid a double panic. This is necessary since `next_task` (which + // calls `next_remote_task`) can be called in the `Drop` impl. + Err(_) if std::thread::panicking() => return None, + Err(_) => panic!("mutex poisoned"), + Ok(lock) => lock, + }; + lock.pop_front() + } + + fn tick(&self) { + assert!(self.is_current()); + for _ in 0..MAX_TASKS_PER_TICK { + let tick = self.tick.get().wrapping_add(1); + self.tick.set(tick); + let task = match self.next_task(tick) { + Some(task) => task, + None => return, + }; + + if let Some(task) = task.run(&mut || Some(self.into())) { + unsafe { + // we are on the local thread, so this is okay. + self.schedule_local(task); + } + } + } + } + + fn drain_pending_drop(&self) { + for task in self.pending_drop.drain() { + unsafe { + (*self.tasks.get()).remove(&task); + } + drop(task); + } + } +} + +impl fmt::Debug for Scheduler { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Scheduler { .. }").finish() + } +} + +impl Drop for Scheduler { + fn drop(&mut self) { + // Drain all local tasks + while let Some(task) = self.next_local_task() { + task.shutdown(); + } + + // Release owned tasks + unsafe { + (*self.tasks.get()).shutdown(); + } + + self.drain_pending_drop(); + + // Wait until all tasks have been released. + // XXX: this is a busy loop, but we don't really have any way to park + // the thread here? + while unsafe { !(*self.tasks.get()).is_empty() } { + self.drain_pending_drop(); + } + } +} + +#[cfg(all(test, not(loom)))] +mod tests { + use super::*; + use crate::{runtime, task}; + + #[test] + fn local_current_thread() { + let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap(); + LocalSet::new().block_on(&mut rt, async { + spawn_local(async {}).await.unwrap(); + }); + } + + #[test] + fn local_threadpool() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Runtime::new().unwrap(); + LocalSet::new().block_on(&mut rt, async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }) + .await + .unwrap(); + }); + } + + #[test] + fn local_threadpool_timer() { + // This test ensures that runtime services like the timer are properly + // set for the local task set. + use std::time::Duration; + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + LocalSet::new().block_on(&mut rt, async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + let join = spawn_local(async move { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + crate::time::delay_for(Duration::from_millis(10)).await; + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }); + join.await.unwrap(); + }); + } + + #[test] + // This will panic, since the thread that calls `block_on` cannot use + // in-place blocking inside of `block_on`. + #[should_panic] + fn local_threadpool_blocking_in_place() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + LocalSet::new().block_on(&mut rt, async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + let join = spawn_local(async move { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::block_in_place(|| {}); + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }); + join.await.unwrap(); + }); + } + + #[test] + fn local_threadpool_blocking_run() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + LocalSet::new().block_on(&mut rt, async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + let join = spawn_local(async move { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::spawn_blocking(|| { + assert!( + !ON_RT_THREAD.with(|cell| cell.get()), + "blocking must not run on the local task set's thread" + ); + }) + .await + .unwrap(); + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }); + join.await.unwrap(); + }); + } + + #[test] + fn all_spawns_are_local() { + use futures::future; + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); + LocalSet::new().block_on(&mut rt, async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + let handles = (0..128) + .map(|_| { + spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }) + }) + .collect::<Vec<_>>(); + for joined in future::join_all(handles).await { + joined.unwrap(); + } + }) + } + + #[test] + fn nested_spawn_is_local() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); + LocalSet::new().block_on(&mut rt, async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }) + .await + .unwrap(); + }) + .await + .unwrap(); + }) + .await + .unwrap(); + }) + .await + .unwrap(); + }) + } + #[test] + fn join_local_future_elsewhere() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); + let local = LocalSet::new(); + local.block_on(&mut rt, async move { + let (tx, rx) = crate::sync::oneshot::channel(); + let join = spawn_local(async move { + println!("hello world running..."); + assert!( + ON_RT_THREAD.with(|cell| cell.get()), + "local task must run on local thread, no matter where it is awaited" + ); + rx.await.unwrap(); + + println!("hello world task done"); + "hello world" + }); + let join2 = task::spawn(async move { + assert!( + !ON_RT_THREAD.with(|cell| cell.get()), + "spawned task should be on a worker" + ); + + tx.send(()).expect("task shouldn't have ended yet"); + println!("waking up hello world..."); + + join.await.expect("task should complete successfully"); + + println!("hello world task joined"); + }); + join2.await.unwrap() + }); + } +} diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index dc98b6d4..f84659e0 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -232,6 +232,11 @@ cfg_rt_core! { pub use self::join::JoinHandle; } +cfg_rt_util! { + mod local; + pub use local::{spawn_local, LocalSet}; +} + mod list; pub(crate) use self::list::OwnedList; @@ -269,12 +274,12 @@ pub(crate) struct Task<S: 'static> { _p: PhantomData<S>, } -unsafe impl<S: Send + Sync + 'static> Send for Task<S> {} +unsafe impl<S: ScheduleSendOnly + 'static> Send for Task<S> {} /// Task result sent back pub(crate) type Result<T> = std::result::Result<T, JoinError>; -pub(crate) trait Schedule: Send + Sync + Sized + 'static { +pub(crate) trait Schedule: Sized + 'static { /// Bind a task to the executor. /// /// Guaranteed to be called from the thread that called `poll` on the task. @@ -291,11 +296,18 @@ pub(crate) trait Schedule: Send + Sync + Sized + 'static { fn schedule(&self, task: Task<Self>); } +/// Marker trait indicating that a scheduler can only schedule tasks which +/// implement `Send`. +/// +/// Schedulers that implement this trait may not schedule `!Send` futures. If +/// trait is implemented, the corresponding `Task` type will implement `Send`. +pub(crate) trait ScheduleSendOnly: Schedule + Send + Sync {} + /// Create a new task with an associated join handle pub(crate) fn joinable<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>) where T: Future + Send + 'static, - S: Schedule, + S: ScheduleSendOnly, { let raw = RawTask::new_joinable::<_, S>(task); @@ -309,6 +321,26 @@ where (task, join) } +cfg_rt_util! { + /// Create a new `!Send` task with an associated join handle + pub(crate) fn joinable_local<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>) + where + T: Future + 'static, + S: Schedule, + { + let raw = RawTask::new_joinable_local::<_, S>(task); + + let task = Task { + raw, + _p: PhantomData, + }; + + let join = JoinHandle::new(raw); + + (task, join) + } +} + impl<S: 'static> Task<S> { pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> { Task { diff --git a/tokio/src/task/raw.rs b/tokio/src/task/raw.rs index 9d439f11..93bb79c8 100644 --- a/tokio/src/task/raw.rs +++ b/tokio/src/task/raw.rs @@ -1,7 +1,7 @@ use crate::loom::alloc::Track; use crate::task::Cell; use crate::task::Harness; -use crate::task::{Header, Schedule}; +use crate::task::{Header, Schedule, ScheduleSendOnly}; use crate::task::{Snapshot, State}; use std::future::Future; @@ -54,18 +54,30 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable { } } +cfg_rt_util! { + impl RawTask { + pub(super) fn new_joinable_local<T, S>(task: T) -> RawTask + where + T: Future + 'static, + S: Schedule, + { + RawTask::new::<_, S>(task, State::new_joinable()) + } + } +} + impl RawTask { pub(super) fn new_joinable<T, S>(task: T) -> RawTask where T: Future + Send + 'static, - S: Schedule, + S: ScheduleSendOnly, { RawTask::new::<_, S>(task, State::new_joinable()) } fn new<T, S>(task: T, state: State) -> RawTask where - T: Future + Send + 'static, + T: Future + 'static, S: Schedule, { let ptr = Box::into_raw(Cell::new::<S>(task, state)); diff --git a/tokio/src/tests/loom_schedule.rs b/tokio/src/tests/loom_schedule.rs index d3f93228..15ec903b 100644 --- a/tokio/src/tests/loom_schedule.rs +++ b/tokio/src/tests/loom_schedule.rs @@ -1,4 +1,4 @@ -use crate::task::{Schedule, Task}; +use crate::task::{Schedule, ScheduleSendOnly, Task}; use loom::sync::Notify; use std::collections::VecDeque; @@ -49,3 +49,5 @@ impl Schedule for LoomSchedule { self.notify.notify(); } } + +impl ScheduleSendOnly for LoomSchedule {} diff --git a/tokio/src/tests/mock_schedule.rs b/tokio/src/tests/mock_schedule.rs index 8c20d361..a9941564 100644 --- a/tokio/src/tests/mock_schedule.rs +++ b/tokio/src/tests/mock_schedule.rs @@ -1,6 +1,5 @@ #![allow(warnings)] - -use crate::task::{Header, Schedule, Task}; +use crate::task::{Header, Schedule, ScheduleSendOnly, Task}; use std::collections::VecDeque; use std::sync::Mutex; @@ -112,6 +111,8 @@ impl Schedule for Mock { } |