summaryrefslogtreecommitdiffstats
path: root/src/util
diff options
context:
space:
mode:
authorUwe Klotz <uklotz@mixxx.org>2018-04-18 16:39:07 +0200
committerUwe Klotz <uklotz@mixxx.org>2018-10-30 23:26:56 +0100
commitc236956884e3c7b9a47536f98eb7eb5a126a26e4 (patch)
tree85ea9323501bb2f5176c35679e2307e50eeed49c /src/util
parentdb7fe92a9a5c63e0d9b6f0fe6f1ad379f2b0b46a (diff)
Fix and enable multi-threaded analysis
Diffstat (limited to 'src/util')
-rw-r--r--src/util/workerthread.cpp164
-rw-r--r--src/util/workerthread.h131
-rw-r--r--src/util/workerthreadscheduler.cpp52
-rw-r--r--src/util/workerthreadscheduler.h32
4 files changed, 379 insertions, 0 deletions
diff --git a/src/util/workerthread.cpp b/src/util/workerthread.cpp
new file mode 100644
index 0000000000..a2ca086cff
--- /dev/null
+++ b/src/util/workerthread.cpp
@@ -0,0 +1,164 @@
+#include "util/workerthread.h"
+
+
+namespace {
+
+// Enable trace logging only temporary for debugging purposes
+// during development!
+constexpr bool kEnableTraceLogging = false;
+
+inline
+void logTrace(const mixxx::Logger& log, const char* msg) {
+ if (kEnableTraceLogging) {
+ log.trace() << (msg);
+ }
+}
+
+std::atomic<int> s_threadCounter(0);
+
+} // anonymous namespace
+
+WorkerThread::WorkerThread(
+ const QString& name)
+ : m_name(name),
+ m_logger(m_name.isEmpty() ? "WorkerThread" : m_name.toLatin1().constData()),
+ m_suspend(false),
+ m_stop(false) {
+}
+
+WorkerThread::~WorkerThread() {
+ m_logger.debug() << "Destroying";
+ VERIFY_OR_DEBUG_ASSERT(isFinished()) {
+ stop();
+ m_logger.warning() << "Waiting until finished";
+ // The following operation will block the calling thread!
+ wait();
+ DEBUG_ASSERT(isFinished());
+ }
+}
+
+void WorkerThread::deleteAfterFinished() {
+ if (!isFinished()) {
+ connect(this, SIGNAL(finished()), this, SLOT(deleteLater()));
+ }
+ if (isFinished()) {
+ // Already finished or just finished in the meantime. Calling
+ // deleteLater() twice is safe, though.
+ deleteLater();
+ }
+}
+
+void WorkerThread::run() {
+ if (isStopping()) {
+ return;
+ }
+
+ const int threadNumber = s_threadCounter.fetch_add(1) + 1;
+ const QString threadName =
+ m_name.isEmpty() ? QString::number(threadNumber) : QString("%1 #%2").arg(m_name, QString::number(threadNumber));
+ QThread::currentThread()->setObjectName(threadName);
+
+ m_logger.debug() << "Running";
+
+ doRun();
+
+ m_logger.debug() << "Exiting";
+
+ m_stop.store(true);
+}
+
+void WorkerThread::suspend() {
+ logTrace(m_logger, "Suspending");
+ m_suspend.store(true);
+}
+
+void WorkerThread::resume() {
+ bool suspended = true;
+ // Reset value: true -> false
+ if (m_suspend.compare_exchange_strong(suspended, false)) {
+ logTrace(m_logger, "Resuming");
+ // The thread might just be preparing to suspend after
+ // reading detecting that m_suspend was true. To avoid
+ // a race condition we need to acquire the mutex that
+ // is associated with the wait condition, before
+ // signalling the condition. Otherwise the signal
+ // of the wait condition might arrive before the
+ // thread actually got suspended.
+ std::unique_lock<std::mutex> locked(m_sleepMutex);
+ wake();
+ } else {
+ // Just in case, wake up the thread even if it wasn't
+ // explicitly suspended without locking the mutex. The
+ // thread will suspend itself if it is idle.
+ wake();
+ }
+}
+
+void WorkerThread::wake() {
+ m_logger.debug() << "Waking up";
+ m_sleepWaitCond.notify_one();
+}
+
+void WorkerThread::stop() {
+ m_logger.debug() << "Stopping";
+ m_stop.store(true);
+ // Wake up the thread to make sure that the stop flag is
+ // detected and the thread commits suicide by exiting the
+ // run loop in exec(). Resuming will reset the suspend flag
+ // to wake up not only an idle but also a suspended thread!
+ resume();
+}
+
+void WorkerThread::sleepWhileSuspended() {
+ DEBUG_ASSERT(QThread::currentThread() == this);
+ // The suspend flag is always reset after the stop flag has been set,
+ // so we don't need to check it separately here.
+ if (!m_suspend.load()) {
+ // Early exit without locking the mutex
+ return;
+ }
+ std::unique_lock<std::mutex> locked(m_sleepMutex);
+ sleepWhileSuspended(&locked);
+}
+
+void WorkerThread::sleepWhileSuspended(std::unique_lock<std::mutex>* locked) {
+ DEBUG_ASSERT(locked);
+ while (m_suspend.load()) {
+ logTrace(m_logger, "Sleeping while suspended");
+ m_sleepWaitCond.wait(*locked) ;
+ logTrace(m_logger, "Continuing after sleeping while suspended");
+ }
+}
+
+bool WorkerThread::waitUntilWorkItemsFetched() {
+ if (isStopping()) {
+ // Early exit without locking the mutex
+ return false;
+ }
+ // Keep the mutex locked while idle or suspended
+ std::unique_lock<std::mutex> locked(m_sleepMutex);
+ while (!isStopping()) {
+ FetchWorkResult fetchWorkResult = tryFetchWorkItems();
+ switch (fetchWorkResult) {
+ case FetchWorkResult::Ready:
+ logTrace(m_logger, "Work items fetched and ready");
+ return true;
+ case FetchWorkResult::Idle:
+ logTrace(m_logger, "Sleeping while idle");
+ m_sleepWaitCond.wait(locked) ;
+ logTrace(m_logger, "Continuing after slept while idle");
+ break;
+ case FetchWorkResult::Suspend:
+ logTrace(m_logger, "Suspending while idle");
+ suspend();
+ sleepWhileSuspended(&locked);
+ logTrace(m_logger, "Continuing after suspended while idle");
+ break;
+ case FetchWorkResult::Stop:
+ logTrace(m_logger, "Stopping after trying to fetch work items");
+ stop();
+ break;
+ }
+ }
+ return false;
+}
diff --git a/src/util/workerthread.h b/src/util/workerthread.h
new file mode 100644
index 0000000000..04eebcbfb1
--- /dev/null
+++ b/src/util/workerthread.h
@@ -0,0 +1,131 @@
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+
+#include <QThread>
+
+#include "util/logger.h"
+
+
+// A worker thread without an event loop.
+//
+// This object lives in the creating thread of the host, i.e. does not
+// run its own event loop. It does not does not use slots for communication
+// with its host which would otherwise still be executed in the host's
+// thread.
+//
+// Signals emitted from the internal worker thread by derived classes
+// will queued connections. Communication in the opposite direction is
+// accomplished by using lock-free types to avoid locking the host
+// thread through priority inversion. Lock-free types might also used
+// for any shared state that is read from the host thread after being
+// notified about changes.
+//
+// Derived classes or their owners are responsible to start the thread
+// with the desired priority.
+class WorkerThread : public QThread {
+ Q_OBJECT
+
+ public:
+ explicit WorkerThread(
+ const QString& name = QString());
+ // The destructor must be triggered by calling deleteLater() to
+ // ensure that the thread has already finished and is not running
+ // while destroyed! Connect finished() to deleteAfter() and then
+ // call stop() on the running worker thread explicitly to trigger
+ // the destruction. Use deleteAfterFinished() for this purpose.
+ ~WorkerThread() override;
+
+ void deleteAfterFinished();
+
+ const QString& name() const {
+ return m_name;
+ }
+
+ // Commands the thread to suspend itself asap.
+ void suspend();
+
+ // Resumes a suspended thread by waking it up.
+ void resume();
+
+ // Wakes up a sleeping thread. If the thread has been suspended
+ // it will fall asleep again. A suspended thread needs to be
+ // resumed.
+ void wake();
+
+ // Commands the thread to stop asap. This action is irreversible,
+ // i.e. the thread cannot be restarted once it has been stopped.
+ void stop();
+
+ // Non-blocking atomic read of the stop flag which indicates that
+ // the thread is stopping, i.e. it will soon exit or already has
+ // exited the run loop.
+ bool isStopping() const {
+ return m_stop.load();
+ }
+
+ protected:
+ void run() final;
+
+ // The internal run loop. Not to be confused with the Qt event
+ // loop since the worker thread doesn't have one!
+ // An implementation may exit this loop after all work is done,
+ // which in turn exits and terminates the thread. The loop should
+ // also be left asap when isStopping() returns true. This condition
+ // should be checked repeatedly during execution of the loop and
+ // especially before starting any expensive subtasks.
+ virtual void doRun() = 0;
+
+ enum class FetchWorkResult {
+ Ready,
+ Idle,
+ Suspend,
+ Stop,
+ };
+
+ // Non-blocking function that determines whether the worker thread
+ // is idle (i.e. no new tasks have been scheduled) and should be
+ // either suspended until resumed or put to sleep until woken up.
+ //
+ // Implementing classes are able to control what to do if no more
+ // work is currently available. Returning FetchWorkResult::Idle
+ // preserves the current suspend state and just puts the thread
+ // to sleep until wake() is called. Returning FetchWorkResult::Suspend
+ // will suspend the thread until resume() is called. Returning
+ // FetchWorkResult::Stop will stop the worker thread.
+ //
+ // Implementing classes are responsible for storing the fetched
+ // work items internally for later processing during
+ // doRun().
+ //
+ // The stop flag does not have to be checked when entering this function,
+ // because it has already been checked just before the invocation. Though
+ // the fetch operation may check again before starting any expensive
+ // internal subtask.
+ virtual FetchWorkResult tryFetchWorkItems() = 0;
+
+ // Blocks while idle and not stopped. Returns true when new work items
+ // for processing have been fetched and false if the thread has been
+ // stopped while waiting.
+ bool waitUntilWorkItemsFetched();
+
+ // Blocks the worker thread while the suspend flag is set.
+ // This function must not be called from tryFetchWorkItems()
+ // to avoid a deadlock on the non-recursive mutex!
+ void sleepWhileSuspended();
+
+ private:
+ void sleepWhileSuspended(std::unique_lock<std::mutex>* locked);
+
+ const QString m_name;
+
+ const mixxx::Logger m_logger;
+
+ std::atomic<bool> m_suspend;
+ std::atomic<bool> m_stop;
+
+ std::mutex m_sleepMutex;
+ std::condition_variable m_sleepWaitCond;
+};
diff --git a/src/util/workerthreadscheduler.cpp b/src/util/workerthreadscheduler.cpp
new file mode 100644
index 0000000000..9030ef7f64
--- /dev/null
+++ b/src/util/workerthreadscheduler.cpp
@@ -0,0 +1,52 @@
+#include "util/workerthreadscheduler.h"
+
+#include "util/workerthread.h"
+
+
+WorkerThreadScheduler::WorkerThreadScheduler(
+ int maxWorkers,
+ const QString& name)
+ : WorkerThread(name.isEmpty() ? QString("WorkerThreadScheduler") : name),
+ m_scheduledWorkers(maxWorkers),
+ m_fetchedWorker(nullptr) {
+}
+
+bool WorkerThreadScheduler::scheduleWorker(WorkerThread* worker) {
+ DEBUG_ASSERT(worker);
+ const auto written = m_scheduledWorkers.write(&worker, 1) == 1;
+ DEBUG_ASSERT((written == 0) || (written == 1));
+ return written == 1;
+}
+
+bool WorkerThreadScheduler::resumeWorkers() {
+ // Resume the scheduler thread if workers have been scheduled
+ // in the meantime
+ if (m_scheduledWorkers.readAvailable() > 0) {
+ resume();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+WorkerThread::FetchWorkResult WorkerThreadScheduler::tryFetchWorkItems() {
+ DEBUG_ASSERT(!m_fetchedWorker);
+ WorkerThread* worker;
+ if (m_scheduledWorkers.read(&worker, 1) == 1) {
+ DEBUG_ASSERT(worker);
+ m_fetchedWorker = worker;
+ return FetchWorkResult::Ready;
+ } else {
+ // Suspend the thread after all scheduled workers have
+ // have been resumed.
+ return FetchWorkResult::Suspend;
+ }
+}
+
+void WorkerThreadScheduler::doRun() {
+ while (waitUntilWorkItemsFetched()) {
+ m_fetchedWorker->resume();
+ m_fetchedWorker = nullptr;
+ }
+ DEBUG_ASSERT(isStopping());
+}
diff --git a/src/util/workerthreadscheduler.h b/src/util/workerthreadscheduler.h
new file mode 100644
index 0000000000..25625e52e3
--- /dev/null
+++ b/src/util/workerthreadscheduler.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include "util/workerthread.h"
+#include "util/fifo.h"
+
+
+class WorkerThread;
+
+// Non-blocking scheduler for worker threads which itself runs
+// as a worker thread. The maximum number of worker threads is
+// limited.
+class WorkerThreadScheduler : public WorkerThread {
+ public:
+ explicit WorkerThreadScheduler(
+ int maxWorkers,
+ const QString& name = QString());
+ ~WorkerThreadScheduler() override = default;
+
+ bool scheduleWorker(WorkerThread* worker);
+
+ bool resumeWorkers();
+
+ protected:
+ void doRun() override;
+
+ FetchWorkResult tryFetchWorkItems() override;
+
+ private:
+ FIFO<WorkerThread*> m_scheduledWorkers;
+
+ WorkerThread* m_fetchedWorker;
+};