diff options
author | Stjepan Glavina <stjepang@gmail.com> | 2018-12-28 20:34:54 +0100 |
---|---|---|
committer | Toby Lawrence <tobz@users.noreply.github.com> | 2018-12-28 14:34:54 -0500 |
commit | fdf4aba6219fd5a30097a3a38231aa51691c4ed7 (patch) | |
tree | 8f9f3cda970b128b24f5d0b60238cc52b14b9f62 /tokio-threadpool | |
parent | 201b6ce53a0f2fe7cbba50802253fda33e6e3bd5 (diff) |
threadpool: introduce a global task queue (#798)
Diffstat (limited to 'tokio-threadpool')
-rw-r--r-- | tokio-threadpool/Cargo.toml | 1 | ||||
-rw-r--r-- | tokio-threadpool/src/builder.rs | 6 | ||||
-rw-r--r-- | tokio-threadpool/src/lib.rs | 1 | ||||
-rw-r--r-- | tokio-threadpool/src/pool/mod.rs | 98 | ||||
-rw-r--r-- | tokio-threadpool/src/sender.rs | 5 | ||||
-rw-r--r-- | tokio-threadpool/src/shutdown.rs | 8 | ||||
-rw-r--r-- | tokio-threadpool/src/task/mod.rs | 8 | ||||
-rw-r--r-- | tokio-threadpool/src/task/queue.rs | 113 | ||||
-rw-r--r-- | tokio-threadpool/src/worker/entry.rs | 43 | ||||
-rw-r--r-- | tokio-threadpool/src/worker/mod.rs | 95 |
10 files changed, 90 insertions, 288 deletions
diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml index 21875ea1..f007b8ac 100644 --- a/tokio-threadpool/Cargo.toml +++ b/tokio-threadpool/Cargo.toml @@ -20,6 +20,7 @@ categories = ["concurrency", "asynchronous"] [dependencies] tokio-executor = { version = "0.1.2", path = "../tokio-executor" } futures = "0.1.19" +crossbeam-channel = "0.3.3" crossbeam-deque = "0.6.1" crossbeam-utils = "0.6.0" num_cpus = "1.2" diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs index 05af1820..9c49135e 100644 --- a/tokio-threadpool/src/builder.rs +++ b/tokio-threadpool/src/builder.rs @@ -3,6 +3,7 @@ use config::{Config, MAX_WORKERS}; use park::{BoxPark, BoxedPark, DefaultPark}; use shutdown::ShutdownTrigger; use pool::{Pool, MAX_BACKUP}; +use task::Queue; use thread_pool::ThreadPool; use worker::{self, Worker, WorkerId}; @@ -413,11 +414,13 @@ impl Builder { workers.into() }; + let queue = Arc::new(Queue::new()); + // Create a trigger that will clean up resources on shutdown. // // The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain // strong references. - let trigger = Arc::new(ShutdownTrigger::new(workers.clone())); + let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone())); // Create the pool let pool = Arc::new(Pool::new( @@ -425,6 +428,7 @@ impl Builder { Arc::downgrade(&trigger), self.max_blocking, self.config.clone(), + queue, )); ThreadPool::new2(pool, trigger) diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs index 9fe53116..8f6f82d1 100644 --- a/tokio-threadpool/src/lib.rs +++ b/tokio-threadpool/src/lib.rs @@ -79,6 +79,7 @@ extern crate tokio_executor; +extern crate crossbeam_channel; extern crate crossbeam_deque as deque; extern crate crossbeam_utils; #[macro_use] diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs index 52dbf805..eb48e349 100644 --- a/tokio-threadpool/src/pool/mod.rs +++ b/tokio-threadpool/src/pool/mod.rs @@ -15,7 +15,7 @@ use self::backup_stack::BackupStack; use config::Config; use shutdown::ShutdownTrigger; -use task::{Task, Blocking}; +use task::{Blocking, Queue, Task}; use worker::{self, Worker, WorkerId}; use futures::Poll; @@ -59,6 +59,12 @@ pub(crate) struct Pool { // The number of workers will *usually* be small. pub workers: Arc<[worker::Entry]>, + // The global MPMC queue of tasks. + // + // Spawned tasks are pushed into this queue. Although worker threads have their own dedicated + // task queues, they periodically steal tasks from this global queue, too. + pub queue: Arc<Queue>, + // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped. // // When spawning a new `Worker`, this weak reference is upgraded and handed out to the new @@ -90,6 +96,7 @@ impl Pool { trigger: Weak<ShutdownTrigger>, max_blocking: usize, config: Config, + queue: Arc<Queue>, ) -> Pool { let pool_size = workers.len(); let total_size = max_blocking + pool_size; @@ -117,6 +124,7 @@ impl Pool { sleep_stack: CachePadded::new(worker::Stack::new()), num_workers: AtomicUsize::new(0), workers, + queue, trigger, backup, backup_stack, @@ -264,50 +272,10 @@ impl Pool { pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) { debug_assert_eq!(*self, **pool); - use worker::Lifecycle::Notified; - - // First try to get a handle to a sleeping worker. This ensures that - // sleeping tasks get woken up - if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Notified, false) { - trace!("submit to existing worker; idx={}; state={:?}", idx, worker_state); - self.submit_to_external(idx, task, worker_state, pool); - return; - } - - // All workers are active, so pick a random worker and submit the - // task to it. - self.submit_to_random(task, pool); - } - - /// Submit a task to a random worker - /// - /// Called from outside of the scheduler, this function is how new tasks - /// enter the system. - pub fn submit_to_random(&self, task: Arc<Task>, pool: &Arc<Pool>) { - debug_assert_eq!(*self, **pool); - - let len = self.workers.len(); - let idx = self.rand_usize() % len; - - trace!(" -> submitting to random; idx={}", idx); - - let state = self.workers[idx].load_state(); - self.submit_to_external(idx, task, state, pool); - } - - fn submit_to_external(&self, - idx: usize, - task: Arc<Task>, - state: worker::State, - pool: &Arc<Pool>) - { - debug_assert_eq!(*self, **pool); - - let entry = &self.workers[idx]; + trace!(" -> submit external"); - if !entry.submit_external(task, state) { - self.spawn_thread(WorkerId::new(idx), pool); - } + self.queue.push(task); + self.signal_work(pool); } pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> { @@ -438,43 +406,21 @@ impl Pool { pub fn signal_work(&self, pool: &Arc<Pool>) { debug_assert_eq!(*self, **pool); - use worker::Lifecycle::*; + use worker::Lifecycle::Signaled; - if let Some((idx, mut worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) { + if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) { let entry = &self.workers[idx]; - debug_assert!(worker_state.lifecycle() != Signaled, "actual={:?}", worker_state.lifecycle()); - - // Transition the worker state to signaled - loop { - let mut next = worker_state; - - next.set_lifecycle(Signaled); + debug_assert!( + worker_state.lifecycle() != Signaled, + "actual={:?}", worker_state.lifecycle(), + ); - let actual = entry.state.compare_and_swap( - worker_state.into(), next.into(), AcqRel).into(); + trace!("signal_work -- notify; idx={}", idx); - if actual == worker_state { - break; - } - - worker_state = actual; - } - - // The state has been transitioned to signal, now we need to wake up - // the worker if necessary. - match worker_state.lifecycle() { - Sleeping => { - trace!("signal_work -- wakeup; idx={}", idx); - self.workers[idx].wakeup(); - } - Shutdown => { - trace!("signal_work -- spawn; idx={}", idx); - self.spawn_thread(WorkerId(idx), pool); - } - Running | Notified | Signaled => { - // The workers are already active. No need to wake them up. - } + if !entry.notify(worker_state) { + trace!("signal_work -- spawn; idx={}", idx); + self.spawn_thread(WorkerId(idx), pool); } } } diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs index 0ff99d73..de5f0e07 100644 --- a/tokio-threadpool/src/sender.rs +++ b/tokio-threadpool/src/sender.rs @@ -161,7 +161,10 @@ impl<'a> tokio_executor::Executor for &'a Sender { // Create a new task for the future let task = Arc::new(Task::new(future)); - self.pool.submit_to_random(task, &self.pool); + // Call `submit_external()` in order to place the task into the global + // queue. This way all workers have equal chance of running this task, + // which means IO handles will be assigned to reactors more evenly. + self.pool.submit_external(task, &self.pool); Ok(()) } diff --git a/tokio-threadpool/src/shutdown.rs b/tokio-threadpool/src/shutdown.rs index 8167d014..1cc19b54 100644 --- a/tokio-threadpool/src/shutdown.rs +++ b/tokio-threadpool/src/shutdown.rs @@ -1,3 +1,4 @@ +use task::Queue; use worker; use futures::{Future, Poll, Async}; @@ -61,25 +62,30 @@ impl Future for Shutdown { pub(crate) struct ShutdownTrigger { inner: Arc<Mutex<Inner>>, workers: Arc<[worker::Entry]>, + queue: Arc<Queue>, } unsafe impl Send for ShutdownTrigger {} unsafe impl Sync for ShutdownTrigger {} impl ShutdownTrigger { - pub(crate) fn new(workers: Arc<[worker::Entry]>) -> ShutdownTrigger { + pub(crate) fn new(workers: Arc<[worker::Entry]>, queue: Arc<Queue>) -> ShutdownTrigger { ShutdownTrigger { inner: Arc::new(Mutex::new(Inner { task: AtomicTask::new(), completed: false, })), workers, + queue, } } } impl Drop for ShutdownTrigger { fn drop(&mut self) { + // Drain the global task queue. + while self.queue.pop().is_some() {} + // Notify the task interested in shutdown. let mut inner = self.inner.lock().unwrap(); inner.completed = true; diff --git a/tokio-threadpool/src/task/mod.rs b/tokio-threadpool/src/task/mod.rs index be95b2dc..ef873af5 100644 --- a/tokio-threadpool/src/task/mod.rs +++ b/tokio-threadpool/src/task/mod.rs @@ -4,7 +4,7 @@ mod queue; mod state; pub(crate) use self::blocking::{Blocking, CanBlock}; -pub(crate) use self::queue::{Queue, Poll}; +pub(crate) use self::queue::Queue; use self::blocking_state::BlockingState; use self::state::State; @@ -31,9 +31,6 @@ pub(crate) struct Task { /// Task blocking related state blocking: AtomicUsize, - /// Next pointer in the queue that submits tasks to a worker. - next: AtomicPtr<Task>, - /// Next pointer in the queue of tasks pending blocking capacity. next_blocking: AtomicPtr<Task>, @@ -63,7 +60,6 @@ impl Task { Task { state: AtomicUsize::new(State::new().into()), blocking: AtomicUsize::new(BlockingState::new().into()), - next: AtomicPtr::new(ptr::null_mut()), next_blocking: AtomicPtr::new(ptr::null_mut()), future: UnsafeCell::new(Some(task_fut)), } @@ -78,7 +74,6 @@ impl Task { Task { state: AtomicUsize::new(State::stub().into()), blocking: AtomicUsize::new(BlockingState::new().into()), - next: AtomicPtr::new(ptr::null_mut()), next_blocking: AtomicPtr::new(ptr::null_mut()), future: UnsafeCell::new(Some(task_fut)), } @@ -237,7 +232,6 @@ impl Task { impl fmt::Debug for Task { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Task") - .field("next", &self.next) .field("state", &self.state) .field("future", &"Spawn<BoxFuture>") .finish() diff --git a/tokio-threadpool/src/task/queue.rs b/tokio-threadpool/src/task/queue.rs index 546b8c50..166438e7 100644 --- a/tokio-threadpool/src/task/queue.rs +++ b/tokio-threadpool/src/task/queue.rs @@ -1,33 +1,13 @@ use task::Task; -use std::cell::UnsafeCell; -use std::ptr; use std::sync::Arc; -use std::sync::atomic::AtomicPtr; -use std::sync::atomic::Ordering::{Acquire, Release, AcqRel, Relaxed}; -use crossbeam_utils::CachePadded; +use crossbeam_channel::{unbounded, Receiver, Sender}; #[derive(Debug)] pub(crate) struct Queue { - /// Queue head. - /// - /// This is a strong reference to `Task` (i.e, `Arc<Task>`) - head: CachePadded<AtomicPtr<Task>>, - - /// Tail pointer. This is `Arc<Task>` unless it points to `stub`. - tail: UnsafeCell<*mut Task>, - - /// Stub pointer, used as part of the intrusive mpsc channel algorithm - /// described by 1024cores. - stub: Box<Task>, -} - -#[derive(Debug)] -pub(crate) enum Poll { - Empty, - Inconsistent, - Data(Arc<Task>), + // TODO(stjepang): Use a custom, faster MPMC queue implementation that supports `steal_many()`. + chan: (Sender<Arc<Task>>, Receiver<Arc<Task>>), } // ===== impl Queue ===== @@ -35,93 +15,20 @@ pub(crate) enum Poll { impl Queue { /// Create a new, empty, `Queue`. pub fn new() -> Queue { - let stub = Box::new(Task::stub()); - let ptr = &*stub as *const _ as *mut _; - Queue { - head: CachePadded::new(AtomicPtr::new(ptr)), - tail: UnsafeCell::new(ptr), - stub: stub, + chan: unbounded(), } } /// Push a task onto the queue. - /// - /// This function is `Sync`. + #[inline] pub fn push(&self, task: Arc<Task>) { - unsafe { - self.push2(Arc::into_raw(task)); - } - } - - unsafe fn push2(&self, task: *const Task) { - let task = task as *mut Task; - - // Set the next pointer. This does not require an atomic operation as - // this node is not accessible. The write will be flushed with the next - // operation - (*task).next.store(ptr::null_mut(), Relaxed); - - // Update the head to point to the new node. We need to see the previous - // node in order to update the next pointer as well as release `task` - // to any other threads calling `push`. - let prev = self.head.swap(task, AcqRel); - - // Release `task` to the consume end. - (*prev).next.store(task, Release); + self.chan.0.send(task).unwrap(); } - /// Poll a task from the queue. - /// - /// This function is **not** `Sync` and requires coordination by the caller. - pub unsafe fn poll(&self) -> Poll { - let mut tail = *self.tail.get(); - let mut next = (*tail).next.load(Acquire); - - let stub = &*self.stub as *const _ as *mut _; - - if tail == stub { - if next.is_null() { - return Poll::Empty; - } - - *self.tail.get() = next; - tail = next; - next = (*next).next.load(Acquire); - } - - if !next.is_null() { - *self.tail.get() = next; - - // No ref_count inc is necessary here as this poll is paired - // with a `push` which "forgets" the handle. - return Poll::Data(Arc::from_raw(tail)); - } - - if self.head.load(Acquire) != tail { - return Poll::Inconsistent; - } - - self.push2(stub); - - next = (*tail).next.load(Acquire); - - if !next.is_null() { - *self.tail.get() = next; - - return Poll::Data(Arc::from_raw(tail)); - } - - Poll::Inconsistent - } -} - -impl Drop for Queue { - fn drop(&mut self) { - loop { - if let Poll::Empty = unsafe { self.poll() } { - break - } - } + /// Pop a task from the queue. + #[inline] + pub fn pop(&self) -> Option<Arc<Task>> { + self.chan.1.try_recv().ok() } } diff --git a/tokio-threadpool/src/worker/entry.rs b/tokio-threadpool/src/worker/entry.rs index 459e7cc1..d1013381 100644 --- a/tokio-threadpool/src/worker/entry.rs +++ b/tokio-threadpool/src/worker/entry.rs @@ -1,12 +1,12 @@ use park::{BoxPark, BoxUnpark}; -use task::{Task, Queue}; +use task::Task; use worker::state::{State, PUSHED_MASK}; use std::cell::UnsafeCell; use std::fmt; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed}; +use std::sync::atomic::Ordering::{AcqRel, Relaxed}; use crossbeam_utils::CachePadded; use deque; @@ -36,9 +36,6 @@ pub(crate) struct WorkerEntry { // Thread unparker pub unpark: BoxUnpark, - - // MPSC queue of jobs submitted to the worker from an external source. - pub inbound: Queue, } impl WorkerEntry { @@ -50,21 +47,11 @@ impl WorkerEntry { next_sleeper: UnsafeCell::new(0), worker: w, stealer: s, - inbound: Queue::new(), park: UnsafeCell::new(park), unpark, } } - /// Atomically load the worker's state - /// - /// # Ordering - /// - /// An `Acquire` ordering is established on the entry's state variable. - pub fn load_state(&self) -> State { - self.state.load(Acquire).into() - } - /// Atomically unset the pushed flag. /// /// # Return @@ -85,20 +72,15 @@ impl WorkerEntry { self.push_internal(task); } - /// Submits a task to the worker. This assumes that the caller is external - /// to the worker. Internal submissions go through another path. - /// - /// Returns `false` if the worker needs to be spawned. + /// Notifies the worker and returns `false` if it needs to be spawned. /// /// # Ordering /// /// The `state` must have been obtained with an `Acquire` ordering. - pub fn submit_external(&self, task: Arc<Task>, mut state: State) -> bool { + #[inline] + pub fn notify(&self, mut state: State) -> bool { use worker::Lifecycle::*; - // Push the task onto the external queue - self.push_external(task); - loop { let mut next = state; next.notify(); @@ -188,6 +170,7 @@ impl WorkerEntry { /// /// This **must** only be called by the thread that owns the worker entry. /// This function is not `Sync`. + #[inline] pub fn pop_task(&self) -> deque::Pop<Arc<Task>> { self.worker.pop() } @@ -208,23 +191,18 @@ impl WorkerEntry { /// /// This is called when the pool is shutting down. pub fn drain_tasks(&self) { - use deque::Pop; + use deque::Pop::*; loop { match self.worker.pop() { - Pop::Data(_) => {} - Pop::Empty => break, - Pop::Retry => {} + Data(_) => {} + Empty => break, + Retry => {} } } } #[inline] - fn push_external(&self, task: Arc<Task>) { - self.inbound.push(task); - } - - #[inline] pub fn push_internal(&self, task: Arc<Task>) { self.worker.push(task); } @@ -254,7 +232,6 @@ impl fmt::Debug for WorkerEntry { .field("stealer", &self.stealer) .field("park", &"UnsafeCell<BoxPark>") .field("unpark", &"BoxUnpark") - .field("inbound", &self.inbound) .finish() } } diff --git a/tokio-threadpool/src/worker/mod.rs b/tokio-threadpool/src/worker/mod.rs index 98005ca7..a82fe9f5 100644 --- a/tokio-threadpool/src/worker/mod.rs +++ b/tokio-threadpool/src/worker/mod.rs @@ -242,10 +242,6 @@ impl Worker { while self.check_run_state(first) { first = false; - // Poll inbound until empty, transferring all tasks to the internal - // queue. - let consistent = self.drain_inbound(); - // Run the next available task if self.try_run_task(¬ify) { if self.is_blocking.get() { @@ -253,6 +249,8 @@ impl Worker { return; } + // Poll the reactor and the global queue every now and then to + // ensure no task gets left behind. if tick % LIGHT_SLEEP_INTERVAL == 0 { self.sleep_light(); } @@ -264,11 +262,6 @@ impl Worker { continue; } - if !consistent { - spin_cnt = 0; - continue; - } - spin_cnt += 1; // Yield the thread several times before it actually goes to sleep. @@ -423,7 +416,7 @@ impl Worker { if idx < len { match self.pool.workers[idx].steal_tasks(self.entry()) { Steal::Data(task) => { - trace!("stole task"); + trace!("stole task from another worker"); self.run_task(task, notify); @@ -562,48 +555,6 @@ impl Worker { task.run(notify) } - /// Drains all tasks on the extern queue and pushes them onto the internal - /// queue. - /// - /// Returns `true` if the operation was able to complete in a consistent - /// state. - #[inline] - fn drain_inbound(&self) -> bool { - use task::Poll::*; - - let mut found_work = false; - - loop { - let task = unsafe { self.entry().inbound.poll() }; - - match task { - Empty => { - if found_work { - // TODO: Why is this called on every iteration? Would it - // not be better to only signal when work was found - // after waking up? - trace!("found work while draining; signal_work"); - self.pool.signal_work(&self.pool); - } - - return true; - } - Inconsistent => { - if found_work { - trace!("found work while draining; signal_work"); - self.pool.signal_work(&self.pool); - } - - return false; - } - Data(task) => { - found_work = true; - self.entry().push_internal(task); - } - } - } - } - /// Put the worker to sleep /// /// Returns `true` if woken up due to new work arriving. @@ -679,18 +630,16 @@ impl Worker { trace!(" -> starting to sleep; idx={}", self.id.0); + // Do a quick check to see if there are any notifications in the + // reactor or new tasks in the global queue. Since this call will + // clear the wakeup token, we need to check the state again and + // only after that go to sleep. + self.sleep_light(); + // The state has been transitioned to sleeping, we can now wait by // calling the parker. This is done in a loop as condvars can wakeup // spuriously. loop { - unsafe { - (*self.entry().park.get()) - .park() - .unwrap(); - } - - trace!(" -> wakeup; idx={}", self.id.0); - // Reload the state state = self.entry().state.load(Acquire).into(); @@ -722,18 +671,38 @@ impl Worker { unreachable!(); } } + + unsafe { + (*self.entry().park.get()) + .park() + .unwrap(); + } + + trace!(" -> wakeup; idx={}", self.id.0); } } /// This doesn't actually put the thread to sleep. It calls /// `park.park_timeout` with a duration of 0. This allows the park /// implementation to perform any work that might be done on an interval. + /// + /// Returns `true` if this worker has tasks in its queue. fn sleep_light(&self) { + const STEAL_COUNT: usize = 32; + unsafe { (*self.entry().park.get()) .park_timeout(Duration::from_millis(0)) .unwrap(); } + + for _ in 0..STEAL_COUNT { + if let Some(task) = self.pool.queue.pop() { + self.pool.submit(task, &self.pool); + } else { + break; + } + } } fn entry(&self) -> &Entry { @@ -747,14 +716,8 @@ impl Drop for Worker { trace!("shutting down thread; idx={}", self.id.0); if self.should_finalize.get() { - // Get all inbound work and push it onto the work queue. The work - // queue is drained in the next step. - self.drain_inbound(); - // Drain the work queue self.entry().drain_tasks(); - - // TODO: Drain the work queue... } } } |