summaryrefslogtreecommitdiffstats
path: root/ml/Host.cc
diff options
context:
space:
mode:
Diffstat (limited to 'ml/Host.cc')
-rw-r--r--ml/Host.cc32
1 files changed, 20 insertions, 12 deletions
diff --git a/ml/Host.cc b/ml/Host.cc
index 5980f610d8..a94bc2beae 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -213,7 +213,7 @@ void Host::train() {
worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
while (service_running(SERVICE_ML_TRAINING)) {
auto P = TrainingQueue.pop();
@@ -293,7 +293,7 @@ void Host::detect() {
worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats");
worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources");
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
heartbeat_t HB;
heartbeat_init(&HB);
@@ -332,29 +332,37 @@ void Host::startAnomalyDetectionThreads() {
}
ThreadsRunning = true;
+ ThreadsCancelled = false;
+ ThreadsJoined = false;
char Tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this));
+ netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "DETECT[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this));
+ netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
}
-void Host::stopAnomalyDetectionThreads() {
+void Host::stopAnomalyDetectionThreads(bool join) {
if (!ThreadsRunning) {
error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH));
return;
}
- ThreadsRunning = false;
+ if(!ThreadsCancelled) {
+ ThreadsCancelled = true;
- // Signal the training queue to stop popping-items
- TrainingQueue.signal();
- netdata_thread_cancel(TrainingThread);
- // netdata_thread_join(TrainingThread, nullptr);
+ // Signal the training queue to stop popping-items
+ TrainingQueue.signal();
+ netdata_thread_cancel(TrainingThread);
+ netdata_thread_cancel(DetectionThread);
+ }
- netdata_thread_cancel(DetectionThread);
- // netdata_thread_join(DetectionThread, nullptr);
+ if(join && !ThreadsJoined) {
+ ThreadsJoined = true;
+ ThreadsRunning = false;
+ netdata_thread_join(TrainingThread, nullptr);
+ netdata_thread_join(DetectionThread, nullptr);
+ }
}