diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-11 17:45:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-11 17:45:25 +0200 |
commit | b00b62f0e784b5eb616868c60985292cb3a7239d (patch) | |
tree | 68f0475ef1f35ef06d596631ac67aa31dfb919d6 | |
parent | db0eb4556d435c264210a7e9dbfe69cb016ef456 (diff) |
cancel ml threads on shutdown and join them on host free (#14240)
* Cancel ML threads on shutdown and join them on host free.
* Mark the threads as not running before joining them.
-rw-r--r-- | ml/Host.cc | 32 | ||||
-rw-r--r-- | ml/Host.h | 9 | ||||
-rw-r--r-- | ml/ml.cc | 9 | ||||
-rw-r--r-- | ml/ml.h | 1 |
4 files changed, 36 insertions, 15 deletions
diff --git a/ml/Host.cc b/ml/Host.cc index 5980f610d8..a94bc2beae 100644 --- a/ml/Host.cc +++ b/ml/Host.cc @@ -213,7 +213,7 @@ void Host::train() { worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train"); worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats"); - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true); + service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); while (service_running(SERVICE_ML_TRAINING)) { auto P = TrainingQueue.pop(); @@ -293,7 +293,7 @@ void Host::detect() { worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats"); worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources"); - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true); + service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); heartbeat_t HB; heartbeat_init(&HB); @@ -332,29 +332,37 @@ void Host::startAnomalyDetectionThreads() { } ThreadsRunning = true; + ThreadsCancelled = false; + ThreadsJoined = false; char Tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%s]", rrdhost_hostname(RH)); - netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this)); + netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this)); snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "DETECT[%s]", rrdhost_hostname(RH)); - netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this)); + netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this)); } -void Host::stopAnomalyDetectionThreads() { +void Host::stopAnomalyDetectionThreads(bool join) { if (!ThreadsRunning) { error("Anomaly detections threads for host 
%s have already been stopped.", rrdhost_hostname(RH)); return; } - ThreadsRunning = false; + if(!ThreadsCancelled) { + ThreadsCancelled = true; - // Signal the training queue to stop popping-items - TrainingQueue.signal(); - netdata_thread_cancel(TrainingThread); - // netdata_thread_join(TrainingThread, nullptr); + // Signal the training queue to stop popping-items + TrainingQueue.signal(); + netdata_thread_cancel(TrainingThread); + netdata_thread_cancel(DetectionThread); + } - netdata_thread_cancel(DetectionThread); - // netdata_thread_join(DetectionThread, nullptr); + if(join && !ThreadsJoined) { + ThreadsJoined = true; + ThreadsRunning = false; + netdata_thread_join(TrainingThread, nullptr); + netdata_thread_join(DetectionThread, nullptr); + } } @@ -26,7 +26,10 @@ public: MLS(), TS(), HostAnomalyRate(0.0), - ThreadsRunning(false) {} + ThreadsRunning(false), + ThreadsCancelled(false), + ThreadsJoined(false) + {} void addChart(Chart *C); void removeChart(Chart *C); @@ -36,7 +39,7 @@ public: void getDetectionInfoAsJson(nlohmann::json &Json) const; void startAnomalyDetectionThreads(); - void stopAnomalyDetectionThreads(); + void stopAnomalyDetectionThreads(bool join); void scheduleForTraining(TrainingRequest TR); void train(); @@ -50,6 +53,8 @@ private: TrainingStats TS; CalculatedNumber HostAnomalyRate{0.0}; std::atomic<bool> ThreadsRunning; + std::atomic<bool> ThreadsCancelled; + std::atomic<bool> ThreadsJoined; Queue<TrainingRequest> TrainingQueue; @@ -156,7 +156,14 @@ void ml_start_anomaly_detection_threads(RRDHOST *RH) { void ml_stop_anomaly_detection_threads(RRDHOST *RH) { if (RH && RH->ml_host) { Host *H = reinterpret_cast<Host *>(RH->ml_host); - H->stopAnomalyDetectionThreads(); + H->stopAnomalyDetectionThreads(true); + } +} + +void ml_cancel_anomaly_detection_threads(RRDHOST *RH) { + if (RH && RH->ml_host) { + Host *H = reinterpret_cast<Host *>(RH->ml_host); + H->stopAnomalyDetectionThreads(false); } } @@ -31,6 +31,7 @@ void ml_dimension_delete(RRDDIM *RD); 
void ml_start_anomaly_detection_threads(RRDHOST *RH); void ml_stop_anomaly_detection_threads(RRDHOST *RH); +void ml_cancel_anomaly_detection_threads(RRDHOST *RH); char *ml_get_host_info(RRDHOST *RH); char *ml_get_host_runtime_info(RRDHOST *RH); |