summaryrefslogtreecommitdiffstats
path: root/ml
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-11 17:45:25 +0200
committerGitHub <noreply@github.com>2023-01-11 17:45:25 +0200
commitb00b62f0e784b5eb616868c60985292cb3a7239d (patch)
tree68f0475ef1f35ef06d596631ac67aa31dfb919d6 /ml
parentdb0eb4556d435c264210a7e9dbfe69cb016ef456 (diff)
cancel ml threads on shutdown and join them on host free (#14240)
* cancel ml threads on shutdown and join them on host free * mark them not running before joining them
Diffstat (limited to 'ml')
-rw-r--r--ml/Host.cc32
-rw-r--r--ml/Host.h9
-rw-r--r--ml/ml.cc9
-rw-r--r--ml/ml.h1
4 files changed, 36 insertions, 15 deletions
diff --git a/ml/Host.cc b/ml/Host.cc
index 5980f610d8..a94bc2beae 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -213,7 +213,7 @@ void Host::train() {
worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
while (service_running(SERVICE_ML_TRAINING)) {
auto P = TrainingQueue.pop();
@@ -293,7 +293,7 @@ void Host::detect() {
worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats");
worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources");
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
heartbeat_t HB;
heartbeat_init(&HB);
@@ -332,29 +332,37 @@ void Host::startAnomalyDetectionThreads() {
}
ThreadsRunning = true;
+ ThreadsCancelled = false;
+ ThreadsJoined = false;
char Tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this));
+ netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "DETECT[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this));
+ netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
}
-void Host::stopAnomalyDetectionThreads() {
+void Host::stopAnomalyDetectionThreads(bool join) {
if (!ThreadsRunning) {
error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH));
return;
}
- ThreadsRunning = false;
+ if(!ThreadsCancelled) {
+ ThreadsCancelled = true;
- // Signal the training queue to stop popping-items
- TrainingQueue.signal();
- netdata_thread_cancel(TrainingThread);
- // netdata_thread_join(TrainingThread, nullptr);
+ // Signal the training queue to stop popping-items
+ TrainingQueue.signal();
+ netdata_thread_cancel(TrainingThread);
+ netdata_thread_cancel(DetectionThread);
+ }
- netdata_thread_cancel(DetectionThread);
- // netdata_thread_join(DetectionThread, nullptr);
+ if(join && !ThreadsJoined) {
+ ThreadsJoined = true;
+ ThreadsRunning = false;
+ netdata_thread_join(TrainingThread, nullptr);
+ netdata_thread_join(DetectionThread, nullptr);
+ }
}
diff --git a/ml/Host.h b/ml/Host.h
index e68ea4f050..289cb5ab7e 100644
--- a/ml/Host.h
+++ b/ml/Host.h
@@ -26,7 +26,10 @@ public:
MLS(),
TS(),
HostAnomalyRate(0.0),
- ThreadsRunning(false) {}
+ ThreadsRunning(false),
+ ThreadsCancelled(false),
+ ThreadsJoined(false)
+ {}
void addChart(Chart *C);
void removeChart(Chart *C);
@@ -36,7 +39,7 @@ public:
void getDetectionInfoAsJson(nlohmann::json &Json) const;
void startAnomalyDetectionThreads();
- void stopAnomalyDetectionThreads();
+ void stopAnomalyDetectionThreads(bool join);
void scheduleForTraining(TrainingRequest TR);
void train();
@@ -50,6 +53,8 @@ private:
TrainingStats TS;
CalculatedNumber HostAnomalyRate{0.0};
std::atomic<bool> ThreadsRunning;
+ std::atomic<bool> ThreadsCancelled;
+ std::atomic<bool> ThreadsJoined;
Queue<TrainingRequest> TrainingQueue;
diff --git a/ml/ml.cc b/ml/ml.cc
index 2a0df8867c..461c83baae 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -156,7 +156,14 @@ void ml_start_anomaly_detection_threads(RRDHOST *RH) {
void ml_stop_anomaly_detection_threads(RRDHOST *RH) {
if (RH && RH->ml_host) {
Host *H = reinterpret_cast<Host *>(RH->ml_host);
- H->stopAnomalyDetectionThreads();
+ H->stopAnomalyDetectionThreads(true);
+ }
+}
+
+void ml_cancel_anomaly_detection_threads(RRDHOST *RH) {
+ if (RH && RH->ml_host) {
+ Host *H = reinterpret_cast<Host *>(RH->ml_host);
+ H->stopAnomalyDetectionThreads(false);
}
}
diff --git a/ml/ml.h b/ml/ml.h
index 790f881841..8bed627f5a 100644
--- a/ml/ml.h
+++ b/ml/ml.h
@@ -31,6 +31,7 @@ void ml_dimension_delete(RRDDIM *RD);
void ml_start_anomaly_detection_threads(RRDHOST *RH);
void ml_stop_anomaly_detection_threads(RRDHOST *RH);
+void ml_cancel_anomaly_detection_threads(RRDHOST *RH);
char *ml_get_host_info(RRDHOST *RH);
char *ml_get_host_runtime_info(RRDHOST *RH);