diff options
author | Jon Gjengset <jon@thesquareplanet.com> | 2019-10-30 11:58:49 -0400 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-10-30 08:58:49 -0700 |
commit | 109fd3086b97896c422d9565e7d1ddbe4b6f300b (patch) | |
tree | ae2b1cf8e5b2a90fdb677b25ca0b1294126bdc5e /tokio | |
parent | e3261440e56ebf576790e4c6f7f125a6c8759659 (diff) |
thread-pool: in-place blocking with new scheduler (#1681)
The initial new scheduler PR omitted in-place blocking
support. This patch brings it back.
Diffstat (limited to 'tokio')
-rw-r--r-- | tokio/src/executor/task/harness.rs | 10 | ||||
-rw-r--r-- | tokio/src/executor/task/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/executor/task/raw.rs | 9 | ||||
-rw-r--r-- | tokio/src/executor/task/tests/loom.rs | 16 | ||||
-rw-r--r-- | tokio/src/executor/task/tests/task.rs | 64 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/builder.rs | 15 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/current.rs | 4 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/queue/inject.rs | 5 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/set.rs | 15 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/tests/loom_pool.rs | 56 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/tests/loom_queue.rs | 4 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/tests/queue.rs | 2 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/tests/worker.rs | 14 | ||||
-rw-r--r-- | tokio/src/executor/thread_pool/worker.rs | 314 | ||||
-rw-r--r-- | tokio/tests/thread_pool.rs | 48 |
16 files changed, 495 insertions, 86 deletions
diff --git a/tokio/src/executor/task/harness.rs b/tokio/src/executor/task/harness.rs index ef2978dc..e5355e4f 100644 --- a/tokio/src/executor/task/harness.rs +++ b/tokio/src/executor/task/harness.rs @@ -51,7 +51,7 @@ where /// Panics raised while polling the future are handled. /// /// Returns `true` if the task needs to be scheduled again - pub(super) fn poll(mut self, executor: NonNull<S>) -> bool { + pub(super) fn poll(mut self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> bool { use std::panic; // Transition the task to the running state. @@ -81,6 +81,7 @@ where // own the task here. let task = ManuallyDrop::new(Task::from_raw(header.into())); // Call the scheduler's bind callback + let executor = executor().expect("first poll must happen from an executor"); executor.as_ref().bind(&task); header.executor.with_mut(|ptr| *ptr = Some(executor)); } @@ -393,7 +394,7 @@ where fn complete( mut self, - executor: NonNull<S>, + executor: &mut dyn FnMut() -> Option<NonNull<S>>, join_interest: bool, output: super::Result<T::Output>, ) { @@ -402,15 +403,16 @@ where self.core().store_output(output); } + let executor = executor(); let bound_executor = unsafe { self.header().executor.with(|ptr| *ptr) }; // Handle releasing the task. First, check if the current // executor is the one that is bound to the task: - if Some(executor) == bound_executor { + if executor.is_some() && executor == bound_executor { unsafe { // perform a local release let task = ManuallyDrop::new(self.to_task()); - executor.as_ref().release_local(&task); + executor.as_ref().unwrap().as_ref().release_local(&task); if self.transition_to_released(join_interest).is_final_ref() { self.dealloc(); diff --git a/tokio/src/executor/task/mod.rs b/tokio/src/executor/task/mod.rs index dfc6628e..3e4e500b 100644 --- a/tokio/src/executor/task/mod.rs +++ b/tokio/src/executor/task/mod.rs @@ -99,7 +99,7 @@ impl<S: 'static> Task<S> { impl<S: Schedule> Task<S> { /// Returns `self` when the task needs to be immediately re-scheduled - pub(crate) fn run(self, executor: NonNull<S>) -> Option<Self> { + pub(crate) fn run(self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> Option<Self> { if unsafe { self.raw.poll(executor) } { Some(self) } else { diff --git a/tokio/src/executor/task/raw.rs b/tokio/src/executor/task/raw.rs index a9b048fc..d0ffac3a 100644 --- a/tokio/src/executor/task/raw.rs +++ b/tokio/src/executor/task/raw.rs @@ -15,7 +15,7 @@ pub(super) struct RawTask<S: 'static> { pub(super) struct Vtable<S: 'static> { /// Poll the future - pub(super) poll: unsafe fn(*mut (), NonNull<S>) -> bool, + pub(super) poll: unsafe fn(*mut (), &mut dyn FnMut() -> Option<NonNull<S>>) -> bool, /// The task handle has been dropped and the join waker needs to be dropped /// or the task struct needs to be deallocated @@ -101,7 +101,7 @@ impl<S> RawTask<S> { /// Safety: mutual exclusion is required to call this function. /// /// Returns `true` if the task needs to be scheduled again. - pub(super) unsafe fn poll(self, executor: NonNull<S>) -> bool { + pub(super) unsafe fn poll(self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> bool { // Get the vtable without holding a ref to the meta struct. This is done // because a mutable reference to the task is passed into the poll fn. let vtable = self.header().vtable; @@ -150,7 +150,10 @@ impl<S: 'static> Clone for RawTask<S> { impl<S: 'static> Copy for RawTask<S> {} -unsafe fn poll<T: Future, S: Schedule>(ptr: *mut (), executor: NonNull<S>) -> bool { +unsafe fn poll<T: Future, S: Schedule>( + ptr: *mut (), + executor: &mut dyn FnMut() -> Option<NonNull<S>>, +) -> bool { let harness = Harness::<T, S>::from_raw(ptr); harness.poll(executor) } diff --git a/tokio/src/executor/task/tests/loom.rs b/tokio/src/executor/task/tests/loom.rs index 53539987..b377aeb9 100644 --- a/tokio/src/executor/task/tests/loom.rs +++ b/tokio/src/executor/task/tests/loom.rs @@ -15,7 +15,7 @@ fn create_drop_join_handle() { let (task, join_handle) = task::joinable(async { "hello" }); let schedule = LoomSchedule::new(); - let schedule = From::from(&schedule); + let schedule = &mut || Some(From::from(&schedule)); let th = thread::spawn(move || { drop(join_handle); @@ -37,7 +37,7 @@ fn poll_drop_handle_then_drop() { let (task, mut join_handle) = task::joinable(async { "hello" }); let schedule = LoomSchedule::new(); - let schedule = From::from(&schedule); + let schedule = &mut || Some(From::from(&schedule)); let th = thread::spawn(move || { block_on(poll_fn(|cx| { @@ -58,7 +58,7 @@ fn join_output() { let (task, join_handle) = task::joinable(async { "hello world" }); let schedule = LoomSchedule::new(); - let schedule = From::from(&schedule); + let schedule = &mut || Some(From::from(&schedule)); let th = thread::spawn(move || { let out = assert_ok!(block_on(join_handle)); @@ -115,12 +115,12 @@ fn release_remote() { // Join handle let th = join_one_task(join_handle); - let task = match task.run(From::from(&s1)) { + let task = match task.run(&mut || Some(From::from(&s1))) { Some(task) => task, None => s1.recv().expect("released!"), }; - assert_none!(task.run(From::from(&s2))); + assert_none!(task.run(&mut || Some(From::from(&s2)))); assert_none!(s1.recv()); assert_ok!(th.join().unwrap()); @@ -152,7 +152,7 @@ fn shutdown_from_list_after_poll() { // Join handle let th = join_two_tasks(join_handle); - match task.run(From::from(&s1)) { + match task.run(&mut || Some(From::from(&s1))) { Some(task) => { // always drain the list before calling shutdown on tasks list.shutdown(); @@ -184,7 +184,7 @@ fn shutdown_from_queue_after_poll() { // Join handle let th = join_two_tasks(join_handle); - let task = match task.run(From::from(&s1)) { + let task = match task.run(&mut || Some(From::from(&s1))) { Some(task) => task, None => assert_some!(s1.recv()), }; @@ -239,7 +239,7 @@ fn work(schedule: &LoomSchedule) { let mut task = Some(task); while let Some(t) = task.take() { - task = t.run(From::from(schedule)); + task = t.run(&mut || Some(From::from(schedule))); } } } diff --git a/tokio/src/executor/task/tests/task.rs b/tokio/src/executor/task/tests/task.rs index 76319b8d..66fbae59 100644 --- a/tokio/src/executor/task/tests/task.rs +++ b/tokio/src/executor/task/tests/task.rs @@ -28,7 +28,7 @@ fn create_complete_drop() { let task = task::background(task); let mock = mock().bind(&task).release_local(); - let mock = From::from(&mock); + let mock = &mut || Some(From::from(&mock)); // Nothing is returned assert!(task.run(mock).is_none()); @@ -53,7 +53,7 @@ fn create_yield_complete_drop() { let task = task::background(task); let mock = mock().bind(&task).release_local(); - let mock = From::from(&mock); + let mock = &mut || Some(From::from(&mock)); // Task is returned let task = assert_some!(task.run(mock)); @@ -83,7 +83,7 @@ fn create_clone_yield_complete_drop() { let task = task::background(task); let mock = mock().bind(&task).release_local(); - let mock = From::from(&mock); + let mock = &mut || Some(From::from(&mock)); // Task is returned let task = assert_some!(task.run(mock)); @@ -111,7 +111,7 @@ fn create_wake_drop() { let mock = mock().bind(&task).schedule().release_local(); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); assert_none!(mock.next_pending_run()); // The future was **not** dropped. @@ -121,7 +121,7 @@ fn create_wake_drop() { let task = assert_some!(mock.next_pending_run()); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); // The future was dropped. assert!(did_drop.did_drop_future()); @@ -143,7 +143,7 @@ fn notify_complete() { let task = task::background(task); let mock = mock().bind(&task).release_local(); - let mock = From::from(&mock); + let mock = &mut || Some(From::from(&mock)); assert_none!(task.run(mock)); assert!(did_drop.did_drop_future()); @@ -165,9 +165,9 @@ fn complete_on_second_schedule_obj() { let mock2 = mock().bind(&task).release(); // Task is returned - let task = assert_some!(task.run(From::from(&mock2))); + let task = assert_some!(task.run(&mut || Some(From::from(&mock2)))); - assert_none!(task.run(From::from(&mock1))); + assert_none!(task.run(&mut || Some(From::from(&mock1)))); // The message was sent assert!(rx.try_recv().is_ok()); @@ -187,7 +187,7 @@ fn join_task_immediate_drop_handle() { let mock = mock().bind(&task).release_local(); - assert!(task.run(From::from(&mock)).is_none()); + assert!(task.run(&mut || Some(From::from(&mock))).is_none()); assert!(did_drop.did_drop_future()); assert!(did_drop.did_drop_output()); @@ -202,7 +202,7 @@ fn join_task_immediate_complete_1() { let mock = mock().bind(&task).release_local(); - assert!(task.run(From::from(&mock)).is_none()); + assert!(task.run(&mut || Some(From::from(&mock))).is_none()); assert!(did_drop.did_drop_future()); assert!(!did_drop.did_drop_output()); @@ -227,7 +227,7 @@ fn join_task_immediate_complete_2() { assert_pending!(handle.poll()); - assert!(task.run(From::from(&mock)).is_none()); + assert!(task.run(&mut || Some(From::from(&mock))).is_none()); assert!(did_drop.did_drop_future()); assert!(!did_drop.did_drop_output()); @@ -253,14 +253,14 @@ fn join_task_complete_later() { let mock = mock().bind(&task).release_local(); - let task = assert_some!(task.run(From::from(&mock))); + let task = assert_some!(task.run(&mut || Some(From::from(&mock)))); assert!(!did_drop.did_drop_future()); assert!(!did_drop.did_drop_output()); assert_pending!(handle.poll()); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); assert!(handle.is_woken()); let out = assert_ready_ok!(handle.poll()); @@ -288,12 +288,12 @@ fn drop_join_after_poll() { assert_pending!(handle.poll()); drop(handle); - let task = assert_some!(task.run(From::from(&mock))); + let task = assert_some!(task.run(&mut || Some(From::from(&mock)))); assert!(!did_drop.did_drop_future()); assert!(!did_drop.did_drop_output()); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); assert!(did_drop.did_drop_future()); assert!(did_drop.did_drop_output()); @@ -317,7 +317,7 @@ fn join_handle_change_task_complete() { assert_pending!(t1.poll()); drop(t1); - let task = assert_some!(task.run(From::from(&mock))); + let task = assert_some!(task.run(&mut || Some(From::from(&mock)))); let mut t2 = spawn(poll_fn(|cx| Pin::new(&mut handle).poll(cx))); assert_pending!(t2.poll()); @@ -325,7 +325,7 @@ fn join_handle_change_task_complete() { assert!(!did_drop.did_drop_future()); assert!(!did_drop.did_drop_output()); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); assert!(t2.is_woken()); @@ -347,7 +347,7 @@ fn drop_handle_after_complete() { let mock = mock().bind(&task).release_local(); - assert!(task.run(From::from(&mock)).is_none()); + assert!(task.run(&mut || Some(From::from(&mock))).is_none()); assert!(did_drop.did_drop_future()); assert!(!did_drop.did_drop_output()); @@ -370,7 +370,7 @@ fn non_initial_task_state_drop_join_handle_without_polling() { let mock = mock().bind(&task).schedule().release_local(); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); drop(handle); @@ -380,7 +380,7 @@ fn non_initial_task_state_drop_join_handle_without_polling() { tx.send(()).unwrap(); let task = assert_some!(mock.next_pending_run()); - assert!(task.run(From::from(&mock)).is_none()); + assert!(task.run(&mut || Some(From::from(&mock))).is_none()); assert!(did_drop.did_drop_future()); assert!(did_drop.did_drop_output()); @@ -400,7 +400,7 @@ fn task_panic_background() { let mock = mock().bind(&task).release_local(); - assert!(task.run(From::from(&mock)).is_none()); + assert!(task.run(&mut || Some(From::from(&mock))).is_none()); assert!(did_drop.did_drop_future()); } @@ -422,7 +422,7 @@ fn task_panic_join() { assert_pending!(handle.poll()); - assert!(task.run(From::from(&mock)).is_none()); + assert!(task.run(&mut || Some(From::from(&mock))).is_none()); assert!(did_drop.did_drop_future()); assert!(handle.is_woken()); @@ -443,12 +443,12 @@ fn complete_second_schedule_obj_before_join() { assert_pending!(handle.poll()); - assert_none!(task.run(From::from(&mock2))); + assert_none!(task.run(&mut || Some(From::from(&mock2)))); tx.send("hello").unwrap(); let task = assert_some!(mock2.next_pending_run()); - assert_none!(task.run(From::from(&mock1))); + assert_none!(task.run(&mut || Some(From::from(&mock1)))); assert!(did_drop.did_drop_future()); // The join handle was notified @@ -476,12 +476,12 @@ fn complete_second_schedule_obj_after_join() { assert_pending!(handle.poll()); - assert_none!(task.run(From::from(&mock2))); + assert_none!(task.run(&mut || Some(From::from(&mock2)))); tx.send("hello").unwrap(); let task = assert_some!(mock2.next_pending_run()); - assert_none!(task.run(From::from(&mock1))); + assert_none!(task.run(&mut || Some(From::from(&mock1)))); assert!(did_drop.did_drop_future()); // The join handle was notified @@ -512,7 +512,7 @@ fn shutdown_from_list_before_notified() { let mock = mock().bind(&task).release(); assert_pending!(handle.poll()); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); list.shutdown(); assert!(did_drop.did_drop_future()); @@ -542,7 +542,7 @@ fn shutdown_from_list_after_notified() { let mock = mock().bind(&task).schedule().release(); assert_pending!(handle.poll()); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); tx.send(()).unwrap(); @@ -552,7 +552,7 @@ fn shutdown_from_list_after_notified() { assert_none!(mock.next_pending_drop()); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); assert!(did_drop.did_drop_future()); assert!(handle.is_woken()); @@ -580,8 +580,8 @@ fn shutdown_from_list_after_complete() { let m2 = mock(); assert_pending!(handle.poll()); - let task = assert_some!(task.run(From::from(&m1))); - assert_none!(task.run(From::from(&m2))); + let task = assert_some!(task.run(&mut || Some(From::from(&m1)))); + assert_none!(task.run(&mut || Some(From::from(&m2)))); assert!(did_drop.did_drop_future()); assert!(handle.is_woken()); @@ -626,7 +626,7 @@ fn shutdown_from_task_after_notified() { let mock = mock().bind(&task).schedule().release(); assert_pending!(handle.poll()); - assert_none!(task.run(From::from(&mock))); + assert_none!(task.run(&mut || Some(From::from(&mock)))); tx.send(()).unwrap(); diff --git a/tokio/src/executor/thread_pool/builder.rs b/tokio/src/executor/thread_pool/builder.rs index 7955e286..c1298a5a 100644 --- a/tokio/src/executor/thread_pool/builder.rs +++ b/tokio/src/executor/thread_pool/builder.rs @@ -155,7 +155,14 @@ impl Builder { let (shutdown_tx, shutdown_rx) = shutdown::channel(); let around_worker = self.around_worker.as_ref().map(Arc::clone); - let launch_worker = move |worker: Worker<BoxedPark<P>>| { + let launch_worker = Arc::new(Box::new(move |worker: Worker<BoxedPark<P>>| { + // NOTE: It might seem like the shutdown_tx that's moved into this Arc is never + // dropped, and that shutdown_rx will therefore never see EOF, but that is not actually + // the case. Only `build_with_park` and each worker hold onto a copy of this Arc. + // `build_with_park` drops it immediately, and the workers drop theirs when their `run` + // method returns (and their copy of the Arc are dropped). In fact, we don't actually + // _need_ a copy of `shutdown_tx` for each worker thread; having them all hold onto + // this Arc, which in turn holds the last `shutdown_tx` would have been sufficient. let shutdown_tx = shutdown_tx.clone(); let around_worker = around_worker.as_ref().map(Arc::clone); Box::new(move || { @@ -186,7 +193,8 @@ impl Builder { // Dropping the handle must happen __after__ the callback drop(shutdown_tx); }) as Box<dyn FnOnce() + Send + 'static> - }; + }) + as Box<dyn Fn(Worker<BoxedPark<P>>) -> Box<dyn FnOnce() + Send> + Send + Sync>); let mut blocking = crate::executor::blocking::Builder::default(); blocking.name(self.name.clone()); @@ -197,7 +205,8 @@ impl Builder { let (pool, workers) = worker::create_set::<_, BoxedPark<P>>( self.pool_size, - |i| BoxedPark::new(build_park(i)), + |i| Box::new(BoxedPark::new(build_park(i))), + Arc::clone(&launch_worker), blocking.clone(), ); diff --git a/tokio/src/executor/thread_pool/current.rs b/tokio/src/executor/thread_pool/current.rs index 6910dca1..f02be101 100644 --- a/tokio/src/executor/thread_pool/current.rs +++ b/tokio/src/executor/thread_pool/current.rs @@ -50,6 +50,10 @@ where }) } +pub(super) fn clear() { + CURRENT_WORKER.with(|cell| cell.set(Inner::new())) +} + pub(super) fn get<F, R>(f: F) -> R where F: FnOnce(&Current) -> R, diff --git a/tokio/src/executor/thread_pool/mod.rs b/tokio/src/executor/thread_pool/mod.rs index bab594ef..09179245 100644 --- a/tokio/src/executor/thread_pool/mod.rs +++ b/tokio/src/executor/thread_pool/mod.rs @@ -37,6 +37,9 @@ mod worker; #[cfg(test)] mod tests; +#[cfg(feature = "blocking")] +pub use worker::blocking; + // Re-export `task::Error` pub use crate::executor::task::Error; diff --git a/tokio/src/executor/thread_pool/queue/inject.rs b/tokio/src/executor/thread_pool/queue/inject.rs index dbad65fd..5f8caf1a 100644 --- a/tokio/src/executor/thread_pool/queue/inject.rs +++ b/tokio/src/executor/thread_pool/queue/inject.rs @@ -19,6 +19,11 @@ impl<T: 'static> Inject<T> { self.cluster.global.push(task, f) } + /// Check if the queue has been closed + pub(crate) fn is_closed(&self) -> bool { + self.cluster.global.is_closed() + } + /// Close the queue /// /// Returns `true` if the channel was closed. `false` indicates the pool was diff --git a/tokio/src/executor/thread_pool/set.rs b/tokio/src/executor/thread_pool/set.rs index dddd50e8..5ee2f544 100644 --- a/tokio/src/executor/thread_pool/set.rs +++ b/tokio/src/executor/thread_pool/set.rs @@ -150,6 +150,10 @@ where } } + pub(crate) fn is_closed(&self) -> bool { + self.inject.is_closed() + } + pub(crate) fn len(&self) -> usize { self.shared.len() } @@ -175,10 +179,19 @@ where } } +impl<P: 'static> Set<P> { + /// Wait for all locks on the injection queue to drop. + /// + /// This is done by locking w/o doing anything. + pub(super) fn wait_for_unlocked(&self) { + self.inject.wait_for_unlocked(); + } +} + impl<P: 'static> Drop for Set<P> { fn drop(&mut self) { // Before proceeding, wait for all concurrent wakers to exit - self.inject.wait_for_unlocked(); + self.wait_for_unlocked(); } } diff --git a/tokio/src/executor/thread_pool/tests/loom_pool.rs b/tokio/src/executor/thread_pool/tests/loom_pool.rs index 55da54d0..df754070 100644 --- a/tokio/src/executor/thread_pool/tests/loom_pool.rs +++ b/tokio/src/executor/thread_pool/tests/loom_pool.rs @@ -1,9 +1,9 @@ -use crate::spawn; use crate::executor::loom::sync::atomic::Ordering::{Acquire, Relaxed, Release}; use crate::executor::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::executor::loom::sync::{Arc, Mutex}; use crate::executor::tests::loom_oneshot as oneshot; -use crate::executor::thread_pool::ThreadPool; +use crate::executor::thread_pool::{self, Builder, ThreadPool}; +use crate::spawn; use std::future::Future; @@ -42,6 +42,58 @@ fn pool_multi_spawn() { } #[test] +fn only_blocking() { + loom::model(|| { + let mut pool = Builder::new().num_threads(1).build(); + let (block_tx, block_rx) = oneshot::channel(); + + pool.spawn(async move { + thread_pool::blocking(move || { + block_tx.send(()); + }) + }); + + block_rx.recv(); + pool.shutdown_now(); + }); +} + +#[test] +fn blocking_and_regular() { + const NUM: usize = 3; + loom::model(|| { + let mut pool = Builder::new().num_threads(1).build(); + let cnt = Arc::new(AtomicUsize::new(0)); + + let (block_tx, block_rx) = oneshot::channel(); + let (done_tx, done_rx) = oneshot::channel(); + let done_tx = Arc::new(Mutex::new(Some(done_tx))); + + pool.spawn(async move { + thread_pool::blocking(move || { + block_tx.send(()); + }) + }); + + for _ in 0..NUM { + let cnt = cnt.clone(); + let done_tx = done_tx.clone(); + + pool.spawn(async move { + if NUM == cnt.fetch_add(1, Relaxed) + 1 { + done_tx.lock().unwrap().take().unwrap().send(()); + } + }); + } + + done_rx.recv(); + block_rx.recv(); + + pool.shutdown_now(); + }); +} + +#[test] fn pool_multi_notify() { loom::model(|| { let pool = ThreadPool::new(); diff --git a/tokio/src/executor/thread_pool/tests/loom_queue.rs b/tokio/src/executor/thread_pool/tests/loom_queue.rs index 8b0214a3..cc9ae449 100644 --- a/tokio/src/executor/thread_pool/tests/loom_queue.rs +++ b/tokio/src/executor/thread_pool/tests/loom_queue.rs @@ -25,7 +25,7 @@ fn multi_worker() { // Try to work while let Some(task) = q.pop_local_first() { - assert!(task.run(From::from(&NOOP_SCHEDULE)).is_none()); + assert!(task.run(&mut || Some(From::from(&NOOP_SCHEDULE))).is_none()); let r = rem.get(); assert!(r > 0); rem.set(r - 1); @@ -33,7 +33,7 @@ fn multi_worker() { // Try to steal if let Some(task) = q.steal(0) { - assert!(task.run(From::from(&NOOP_SCHEDULE)).is_none()); + assert!(task.run(&mut || Some(From::from(&NOOP_SCHEDULE))).is_none()); let r = rem.get(); assert!(r > 0); rem.set(r - 1); diff --git a/tokio/src/executor/thread_pool/tests/queue.rs b/tokio/src/executor/thread_pool/tests/queue.rs index be89d94d..94a3c7a1 100644 --- a/tokio/src/executor/thread_pool/tests/queue.rs +++ b/tokio/src/executor/thread_pool/tests/queue.rs @@ -252,7 +252,7 @@ fn num(task: Task<Noop>) -> u32 { use std::task::Context; use std::task::Poll::*; - assert!(task.run(From::from(&NOOP_SCHEDULE)).is_none()); + assert!(task.run(&mut || Some(From::from(&NOOP_SCHEDULE))).is_none()); // Find the task that completed TASKS.with(|c| { diff --git a/tokio/src/executor/thread_pool/tests/worker.rs b/tokio/src/executor/thread_pool/tests/worker.rs index dc132ab7..f5f9bace 100644 --- a/tokio/src/executor/thread_pool/tests/worker.rs +++ b/tokio/src/executor/thread_pool/tests/worker.rs @@ -3,6 +3,8 @@ use crate::executor::thread_pool; use tokio_test::assert_ok; +use std::sync::Arc; + macro_rules! pool { (2) => {{ let (pool, mut w, mock_park) = pool!(!2); @@ -11,8 +13,14 @@ macro_rules! pool { (! $n:expr) => {{ let mut mock_park = crate::executor::tests::mock_park::MockPark::new(); let blocking = std::sync::Arc::new(crate::executor::blocking::Pool::default()); - let (pool, workers) = - thread_pool::create_pool($n, |index| mock_park.mk_park(index), blocking); + let (pool, workers) = thread_pool::create_pool( + $n, + |index| Box::new(mock_park.mk_park(index)), + Arc::new(Box::new(|_| { + unreachable!("attempted to move worker during non-blocking test") + })), + blocking, + ); (pool, workers, mock_park) }}; } @@ -39,8 +47,8 @@ fn execute_single_task() { #[test] fn task_migrates() { - use std::sync::mpsc; use crate::sync::oneshot; + use std::sync::mpsc; let (p, mut w0, mut w1, ..) = pool!(2); let (tx1, rx1) = oneshot::channel(); diff --git a/tokio/src/executor/thread_pool/worker.rs b/tokio/src/executor/thread_pool/worker.rs index 7fbe6589..b78c214a 100644 --- a/tokio/src/executor/thread_pool/worker.rs +++ b/tokio/src/executor/thread_pool/worker.rs @@ -3,8 +3,57 @@ use crate::executor::park::{Park, Unpark}; use crate::executor::task::Task; use crate::executor::thread_pool::{current, Owned, Shared}; +use std::cell::Cell; +use std::ops::{Deref, DerefMut}; use std::time::Duration; +// The Arc<Box<_>> is needed because loom doesn't support Arc<T> where T: !Sized +// loom doesn't support that because it requires CoerceUnsized, which is unstable +type LaunchWorker<P> = Arc<Box<dyn Fn(Worker<P>) -> Box<dyn FnOnce() + Send> + Send + Sync>>; + +thread_local! { + /// Thread-local tracking the current executor + static ON_BLOCK: Cell<Option<*mut dyn FnMut()>> = Cell::new(None) +} + +/// Run the provided blocking function without blocking the executor. +/// +/// In general, issuing a blocking call or performing a lot of compute in a future without +/// yielding is not okay, as it may prevent the executor from driving other futures forward. +/// If you run a closure through this method, the current executor thread will relegate all its +/// executor duties to another (possibly new) thread, and only then poll the task. Note that this +/// requires additional synchronization. +/// +/// # Examples +/// +/// ``` +/// # async fn docs() { +/// tokio::executor::thread_pool::blocking(move || { +/// // do some compute-heavy work or call synchronous code +/// }); +/// # } +/// ``` +#[cfg(feature = "blocking")] +pub fn blocking<F, R>(f: F) -> R +where + F: FnOnce() -> R, +{ + // Make the current worker give away its Worker to another thread so that we can safely block + // this one without preventing progress on other futures the worker owns. + ON_BLOCK.with(|ob| { + let allow_blocking = ob + .get() + .expect("can only call blocking when on Tokio runtime"); + + // This is safe, because ON_BLOCK was set from an &mut dyn FnMut in the worker that wraps + // the worker's operation, and is unset just prior to when the FnMut is dropped. + let allow_blocking = unsafe { &mut *allow_blocking }; + + allow_blocking(); + f() + }) +} + // TODO: remove this re-export pub(super) use crate::executor::thread_pool::set::Set; @@ -13,17 +62,24 @@ pub(crate) struct Worker<P: Park + 'static> { entry: Entry<P::Unpark>, /// Park the thread - park: P, + park: Box<P>, + + /// Fn for launching another Worker should we need it + launch_worker: LaunchWorker<P>, + + /// To indicate that the Worker has been given away and should no longer be used + gone: Cell<bool>, } pub(crate) fn create_set<F, P>( pool_size: usize, mk_park: F, + launch_worker: LaunchWorker<P>, blocking: Arc<crate::executor::blocking::Pool>, ) -> (Arc<Set<P::Unpark>>, Vec<Worker<P>>) where P: Send + Park, - F: FnMut(usize) -> P, + F: FnMut(usize) -> Box<P>, { // Create the parks... let parks: Vec<_> = (0..pool_size).map(mk_park).collect(); @@ -40,7 +96,7 @@ where .enumerate() .map(|(index, park)| { // unsafe is safe because we call Worker::new only once with each index in the pool - unsafe { Worker::new(pool.clone(), index, park) } + unsafe { Worker::new(pool.clone(), index, park, Arc::clone(&launch_worker)) } }) .collect(); @@ -58,10 +114,17 @@ where P: Send + Park, { // unsafe because new may only be called once for each index in pool's set - pub(super) unsafe fn new(pool: Arc<Set<P::Unpark>>, index: usize, park: P) -> Self { + pub(super) unsafe fn new( + pool: Arc<Set<P::Unpark>>, + index: usize, + park: Box<P>, + launch_worker: LaunchWorker<P>, + ) -> Self { Worker { entry: Entry::new(pool, index), park, + launch_worker, + gone: Cell::new(false), } } @@ -72,18 +135,149 @@ where let mut executor = &**pool; let entry = &mut self.entry; - let park = &mut self.park; + let launch_worker = &self.launch_worker; let blocking = &executor.blocking; + let gone = &self.gone; + + let mut park = DropNotGone:: |