summary refs log tree commit diff stats
path: root/ml
diff options
context:
space:
mode:
author vkalintiris <vasilis@netdata.cloud> 2023-02-03 21:40:02 +0200
committer GitHub <noreply@github.com> 2023-02-03 21:40:02 +0200
commit 3c73da09c3944947541ea359fe545d9f3f58a727 (patch)
tree 286fc099e21235d3c2cee51402089989fbf5c5f7 /ml
parent 828af8182f551cf7239c8e37ad6ff6b7a54d4575 (diff)
Stop training thread from processing training requests once cancelled. (#14423)
Diffstat (limited to 'ml')
-rw-r--r-- ml/Host.cc 17
-rw-r--r-- ml/Queue.h 11
2 files changed, 21 insertions, 7 deletions
diff --git a/ml/Host.cc b/ml/Host.cc
index 662b470776..a5f276a807 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -223,6 +223,11 @@ void Host::train() {
TrainingRequest TrainingReq = P.first;
size_t Size = P.second;
+ if (ThreadsCancelled) {
+ info("Stopping training thread because it was cancelled.");
+ break;
+ }
+
usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size;
if (AllottedUT > USEC_PER_SEC)
AllottedUT = USEC_PER_SEC;
@@ -340,11 +345,13 @@ void Host::startAnomalyDetectionThreads() {
char Tag[NETDATA_THREAD_TAG_MAX + 1];
+// #define ML_DISABLE_JOINING
+
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this));
+ netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLDT[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this));
+ netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
}
void Host::stopAnomalyDetectionThreads(bool join) {
@@ -362,7 +369,7 @@ void Host::stopAnomalyDetectionThreads(bool join) {
netdata_thread_cancel(DetectionThread);
}
- if(join && !ThreadsJoined) {
+ if (join && !ThreadsJoined) {
ThreadsJoined = true;
ThreadsRunning = false;
@@ -374,7 +381,7 @@ void Host::stopAnomalyDetectionThreads(bool join) {
// to enable again:
// NETDATA_THREAD_OPTION_DEFAULT needs to become NETDATA_THREAD_OPTION_JOINABLE
- //netdata_thread_join(TrainingThread, nullptr);
- //netdata_thread_join(DetectionThread, nullptr);
+ netdata_thread_join(TrainingThread, nullptr);
+ netdata_thread_join(DetectionThread, nullptr);
}
}
diff --git a/ml/Queue.h b/ml/Queue.h
index b78979565d..37a74bd077 100644
--- a/ml/Queue.h
+++ b/ml/Queue.h
@@ -32,8 +32,15 @@ public:
while (Q.empty()) {
pthread_cond_wait(&CV, M.inner());
- if (Exit)
- pthread_exit(nullptr);
+ if (Exit) {
+ // This should happen only when we are destroying a host.
+ // Callers should use a flag dedicated to checking if we
+ // are about to delete the host or exit the agent. The original
+ // implementation would call pthread_exit which would cause
+ // the queue's mutex to be destroyed twice (and fail on the
+ // 2nd time)
+ return { T(), 0 };
+ }
}
T V = Q.front();