diff options
author | vkalintiris <vasilis@netdata.cloud> | 2023-02-03 21:40:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-03 21:40:02 +0200 |
commit | 3c73da09c3944947541ea359fe545d9f3f58a727 (patch) | |
tree | 286fc099e21235d3c2cee51402089989fbf5c5f7 /ml | |
parent | 828af8182f551cf7239c8e37ad6ff6b7a54d4575 (diff) |
Stop training thread from processing training requests once cancelled. (#14423)
Diffstat (limited to 'ml')
-rw-r--r-- | ml/Host.cc | 17 | ||||
-rw-r--r-- | ml/Queue.h | 11 |
2 files changed, 21 insertions, 7 deletions
diff --git a/ml/Host.cc b/ml/Host.cc index 662b470776..a5f276a807 100644 --- a/ml/Host.cc +++ b/ml/Host.cc @@ -223,6 +223,11 @@ void Host::train() { TrainingRequest TrainingReq = P.first; size_t Size = P.second; + if (ThreadsCancelled) { + info("Stopping training thread because it was cancelled."); + break; + } + usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size; if (AllottedUT > USEC_PER_SEC) AllottedUT = USEC_PER_SEC; @@ -340,11 +345,13 @@ void Host::startAnomalyDetectionThreads() { char Tag[NETDATA_THREAD_TAG_MAX + 1]; +// #define ML_DISABLE_JOINING + snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(RH)); - netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this)); + netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this)); snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLDT[%s]", rrdhost_hostname(RH)); - netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this)); + netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this)); } void Host::stopAnomalyDetectionThreads(bool join) { @@ -362,7 +369,7 @@ void Host::stopAnomalyDetectionThreads(bool join) { netdata_thread_cancel(DetectionThread); } - if(join && !ThreadsJoined) { + if (join && !ThreadsJoined) { ThreadsJoined = true; ThreadsRunning = false; @@ -374,7 +381,7 @@ void Host::stopAnomalyDetectionThreads(bool join) { // to enable again: // NETDATA_THREAD_OPTION_DEFAULT needs to become NETDATA_THREAD_OPTION_JOINABLE - //netdata_thread_join(TrainingThread, nullptr); - //netdata_thread_join(DetectionThread, nullptr); + netdata_thread_join(TrainingThread, nullptr); + netdata_thread_join(DetectionThread, nullptr); } } diff --git a/ml/Queue.h b/ml/Queue.h index b78979565d..37a74bd077 100644 --- a/ml/Queue.h +++ b/ml/Queue.h @@ -32,8 +32,15 @@ public: while (Q.empty()) { pthread_cond_wait(&CV, M.inner()); - if (Exit) - pthread_exit(nullptr); + if (Exit) { + // This should happen only when we are destroying a host. + // Callers should use a flag dedicated to checking if we + // are about to delete the host or exit the agent. The original + // implementation would call pthread_exit which would cause + // the queue's mutex to be destroyed twice (and fail on the + // 2nd time) + return { T(), 0 }; + } } T V = Q.front(); |