summaryrefslogtreecommitdiffstats
path: root/src/analyzer/trackanalysisscheduler.cpp
blob: 04730c3a3d0fa21ec88e3f1540c44112c90a23e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#include "analyzer/trackanalysisscheduler.h"

#include "library/library.h"
#include "library/trackcollectionmanager.h"
#include "moc_trackanalysisscheduler.cpp"
#include "util/logger.h"

namespace {

mixxx::Logger kLogger("TrackAnalysisScheduler");

// Priority that is passed to start() for every worker thread
constexpr QThread::Priority kWorkerThreadPriority = QThread::LowPriority;

// Minimum time span between two consecutive progress updates,
// i.e. it limits the maximum frequency of progress signals
constexpr std::chrono::milliseconds kProgressInhibitDuration{100};

// Deleter function that is handed over together with newly created
// scheduler instances. It never deletes the object directly.
void deleteTrackAnalysisScheduler(TrackAnalysisScheduler* plainPtr) {
    if (!plainPtr) {
        // Nothing to do for null pointers
        return;
    }
    // Request the scheduler to stop first...
    plainPtr->stop();
    // ...then give up ownership and leave the actual deletion to Qt
    plainPtr->deleteLater();
}

} // anonymous namespace

// Constructs a Pointer that holds nullptr and is paired with a
// deleter that does nothing.
TrackAnalysisScheduler::NullPointer::NullPointer()
        : Pointer(
                  nullptr,
                  [](TrackAnalysisScheduler*) {
                  }) {
}

//static
// Factory function: Allocates a new scheduler and wraps it into a
// Pointer that uses deleteTrackAnalysisScheduler() as its deleter.
// All arguments are forwarded unchanged to the constructor.
TrackAnalysisScheduler::Pointer TrackAnalysisScheduler::createInstance(
        Library* library,
        int numWorkerThreads,
        const UserSettingsPointer& pConfig,
        AnalyzerModeFlags modeFlags) {
    TrackAnalysisScheduler* const pScheduler =
            new TrackAnalysisScheduler(
                    library,
                    numWorkerThreads,
                    pConfig,
                    modeFlags);
    return Pointer(pScheduler, deleteTrackAnalysisScheduler);
}

// Creates the scheduler together with all of its worker threads.
//
// For each requested worker an AnalyzerThread is created and its
// progress() signal is connected to onWorkerThreadProgress(). In a
// second pass every worker thread is suspended and then started with
// kWorkerThreadPriority.
//
// library:          stored in m_library; its database connection pool
//                   is passed to every worker thread
// numWorkerThreads: number of worker threads to create, expected to
//                   be greater than 0. Invalid values are reported and
//                   result in a scheduler without any worker threads.
// pConfig:          user settings that are passed to every worker thread
// modeFlags:        analyzer mode flags that are passed to every
//                   worker thread
TrackAnalysisScheduler::TrackAnalysisScheduler(
        Library* library,
        int numWorkerThreads,
        const UserSettingsPointer& pConfig,
        AnalyzerModeFlags modeFlags)
        : m_library(library),
          m_currentTrackProgress(kAnalyzerProgressUnknown),
          m_currentTrackNumber(0),
          m_dequeuedTracksCount(0),
          // The first signal should always be emitted
          m_lastProgressEmittedAt(Clock::now() - kProgressInhibitDuration) {
    VERIFY_OR_DEBUG_ASSERT(numWorkerThreads > 0) {
        kLogger.warning()
                << "Invalid number of worker threads:"
                << numWorkerThreads;
    } else {
        kLogger.debug()
                << "Starting"
                << numWorkerThreads
                << "worker threads. Priority: "
                << (modeFlags & AnalyzerModeFlags::LowPriority ? "low" : "normal");
    }
    // 1st pass: Create worker threads
    // A non-positive count must never reach reserve(): A negative value
    // converted to an unsigned size would turn into a huge capacity
    // request (std::vector::reserve() throws std::length_error in this
    // case, assuming m_workers is a std::vector).
    if (numWorkerThreads > 0) {
        m_workers.reserve(numWorkerThreads);
    }
    for (int threadId = 0; threadId < numWorkerThreads; ++threadId) {
        m_workers.emplace_back(AnalyzerThread::createInstance(
                threadId,
                library->dbConnectionPool(),
                pConfig,
                modeFlags));
        connect(m_workers.back().thread(), &AnalyzerThread::progress,
            this, &TrackAnalysisScheduler::onWorkerThreadProgress);
    }
    // 2nd pass: Start worker threads in a suspended state
    for (const auto& worker: m_workers) {
        worker.thread()->suspend();
        worker.thread()->start(kWorkerThreadPriority);
    }
}

// The destructor only leaves a trace in the debug log.
TrackAnalysisScheduler::~TrackAnalysisScheduler() {
    kLogger.debug()
            << "Destroying";
}

void TrackAnalysisScheduler::emitProgressOrFinished() {
    // The finished() signal is emitted regardless of when the last
    // signal has been emitted
    if (allTracksFinished()) {
        m_currentTrackProgress = kAnalyzerProgressUnknown;
        m_currentTrackNumber = 0;
        m_dequeuedTracksCount = 0;
        emit finished();
        return;
    }

    const auto now = Clock::now();
    if (now < (m_lastProgressEmittedAt + kProgressInhibitDuration)) {
        // Inhibit signal
        return;
    }
    m_lastProgressEmittedAt = now;

    DEBUG_ASSERT(m_pendingTrackIds.size() <=
            static_cast<size_t>(m_dequeuedTracksCount));
    const int finishedTracksCount =
            m_dequeuedTracksCount - static_cast<int>(m_pendingTrackIds.size());

    AnalyzerProgress workerProgressSum = 0;
    int workerProgressCount = 0;
    for (const auto& worker: m_workers) {
        const AnalyzerProgress workerProgress = worker.analyzerProgress();
        if (workerProgress >= kAnalyzerProgressNone) {
            workerProgressSum += workerProgress;
            ++workerProgressCount;
        }
    }
    // The following algorithm/heuristic is used for calculating the
    // amortized analysis progress (current track number + current
    // track progress) across all worker threads. It results in a
    // simple and almost linear progress display when multiple threads
    // are running in parallel. It also covers the expected behavior
    // for the single-threaded case. The receiver of progress updates
    // should not need to know how many threads are actually processing
    // tracks concurrently behind the scenes.
    if (workerProgressCount > 0) {
        DEBUG_ASSERT(kAnalyzerProgressNone == 0);
        DEBUG_ASSERT(kAnalyzerProgressDone == 1);
        const int inProgressCount =
                math_max(1, int(std::ceil(workerProgressSum)));
        const AnalyzerProgress currentTrackProgress =
                workerProgressSum - std::floor(workerProgressSum);
        // The calculation of inProgressCount is only an approximation.
        // In some situations the calculated virtual current track number
        // = finishedTracksCount + inProgressCount exceeds the upper
        // bound m_dequeuedTracksCount. Using the minimum of both values
        // is an appropriate choice for reporting continuous progress.
        const int currentTrackNumber =
                math_min(finishedTracksCount + inProgressCount, m_dequeuedTracksCount);
        // The combination of the values current count (primary) and current
        // progress (secondary) should never decrease to avoid confusion
        if (m_currentTrackNumber < currentTrackNumber) {
            m_currentTrackNumber = currentTrackNumber;
            // Unconditional progress update
            m_currentTrackProgress = currentTrackProgress;
        } else if (m_currentTrackNumber == currentTrackNumber) {
            // Conditional progress update if current count didn't change
            if (m_currentTrackProgress >= kAnalyzerProgressNone) {
                // Current progress should not decrease while the count is constant
                m_currentTrackProgress = math_max(m_currentTrackProgress, currentTrackProgress);
            } else {
                // Initialize current progress
                m_currentTrackProgress = currentTrackProgress;
            }
        }
    } else {
        if (m_currentTrackNumber < finishedTracksCount) {
            m_currentTrackNumber = finishedTracksCount;
        }
    }
    const int totalTracksCount =
            m_dequeuedTracksCount + static_cast<int>(m_queuedTrackIds.size());
    DEBUG_ASSERT(m_currentTrackNumber <= m_dequeuedTracksCount);
    DEBUG_ASSERT(m_dequeuedTracksCount <= totalTracksCount);
    emit progress(
            m_currentTrackProgress,
            m_currentTrackNumber,
            totalTracksCount);
}

void TrackAnalysisScheduler::onWorkerThreadProgress(
        int threadId,
        AnalyzerThreadState threadState,
        TrackId trackId,
        AnalyzerProgress analyzerProgress) {
    if (kLogger.traceEnabled()) {
        kLogger.trace() << "onWorkerThreadProgress"
                << threadId
                << int(threadState)
                << trackId
                << analyzerProgress;
    }
    auto& worker = m_workers.at(threadId);
    switch (threadState) {
    case AnalyzerThreadState::Void:
        DEBUG_ASSERT(!trackId.isValid());
        DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
        break;
    case AnalyzerThreadState::Idle:
        DEBUG_ASSERT(!trackId.isValid());
        DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
        worker.onAnalyzerProgress(analyzerProgress);
        submitNextTrack(&worker);
        break;
    case AnalyzerThreadState::Busy:
        DEBUG_ASSERT(trackId.isValid());
        // Ignore delayed signals for tracks that are no longer pending
        if (m_pendingTrackIds.find(trackId) != m_pendingTrackIds.end()) {
            DEBUG_ASSERT(analyzerProgress != kAnalyzerProgressUnknown);
            DEBUG_ASSERT(analyzerProgress < kAnalyzerProgressDone);
            worker.onAnalyzerProgress(analyzerProgress);
            emit trackProgress(trackId, analyzerProgress);
        }
        break;
    case AnalyzerThreadState::Done:
        DEBUG_ASSERT(trackId.isValid());
        // Ignore delayed signals for tracks that are no longer pending
        if (m_pendingTrackIds.find(trackId) != m_pendingTrackIds.end()) {
            DEBUG_ASSERT((analyzerProgress == kAnalyzerProgressDone) // success
                    || (analyzerProgress == kAnalyzerProgressUnknown)); // failure
            m_pendingTrackIds.erase(trackId);
            worker.onAnalyzerProgress(analyzerProgress);
            emit trackProgress(trackId, analyzerProgress);
        }
        break;
    case AnalyzerThreadState::Exit:
        DEBUG_ASSERT(!trackId.isValid());
        DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
        worker.onThreadExit();
        DEBUG_ASSERT(!worker);
        break;
    default:
        DEBUG_ASSERT(!"Unhandled signal from worker thread");
    }
    emitProgressOrFinished();
}

// Appends a single track to the queue of tracks to be analyzed.
// Returns false and leaves the queue untouched if the given
// track id is invalid, otherwise true.
bool TrackAnalysisScheduler::scheduleTrackById(TrackId trackId) {
    VERIFY_OR_DEBUG_ASSERT(trackId.isValid()) {
        qWarning() << "Cannot schedule track with invalid id" << trackId;
        return false;
    }
    m_queuedTrackIds.push_back(trackId);
    // Suspended threads are deliberately not woken up here. This
    // function might be called many times in a row and waking up
    // threads in between could cause race conditions. Callers are
    // responsible for invoking resume() after they have finished
    // scheduling tracks.
    return true;
}

// Schedules all given tracks by invoking scheduleTrackById() for
// each of them in list order.
//
// trackIds: ids of the tracks to be scheduled
//
// Returns the number of tracks that have actually been scheduled,
// i.e. the number of ids that scheduleTrackById() accepted.
int TrackAnalysisScheduler::scheduleTracksById(const QList<TrackId>& trackIds) {
    int scheduledCount = 0;
    // Iterate by const reference: scheduleTrackById() takes its argument
    // by value, so an additional per-element copy that is moved from
    // afterwards gains nothing.
    for (const auto& trackId : trackIds) {
        if (scheduleTrackById(trackId)) {
            ++scheduledCount;
        }
    }
    return scheduledCount;
}

void TrackAnalysisScheduler::suspend() {