From b00b62f0e784b5eb616868c60985292cb3a7239d Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Wed, 11 Jan 2023 17:45:25 +0200 Subject: cancel ml threads on shutdown and join them on host free (#14240) * cancel ml threads on shutdown and join them on host free * mark them not running before joining them --- ml/Host.cc | 32 ++++++++++++++++++++------------ ml/Host.h | 9 +++++++-- ml/ml.cc | 9 ++++++++- ml/ml.h | 1 + 4 files changed, 36 insertions(+), 15 deletions(-) diff --git a/ml/Host.cc b/ml/Host.cc index 5980f610d8..a94bc2beae 100644 --- a/ml/Host.cc +++ b/ml/Host.cc @@ -213,7 +213,7 @@ void Host::train() { worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train"); worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats"); - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true); + service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); while (service_running(SERVICE_ML_TRAINING)) { auto P = TrainingQueue.pop(); @@ -293,7 +293,7 @@ void Host::detect() { worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats"); worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources"); - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true); + service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); heartbeat_t HB; heartbeat_init(&HB); @@ -332,29 +332,37 @@ void Host::startAnomalyDetectionThreads() { } ThreadsRunning = true; + ThreadsCancelled = false; + ThreadsJoined = false; char Tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%s]", rrdhost_hostname(RH)); - netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast(this)); + netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast(this)); snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "DETECT[%s]", rrdhost_hostname(RH)); - netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast(this)); + netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast(this)); } -void Host::stopAnomalyDetectionThreads() { +void Host::stopAnomalyDetectionThreads(bool join) { if (!ThreadsRunning) { error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH)); return; } - ThreadsRunning = false; + if(!ThreadsCancelled) { + ThreadsCancelled = true; - // Signal the training queue to stop popping-items - TrainingQueue.signal(); - netdata_thread_cancel(TrainingThread); - // netdata_thread_join(TrainingThread, nullptr); + // Signal the training queue to stop popping-items + TrainingQueue.signal(); + netdata_thread_cancel(TrainingThread); + netdata_thread_cancel(DetectionThread); + } - netdata_thread_cancel(DetectionThread); - // netdata_thread_join(DetectionThread, nullptr); + if(join && !ThreadsJoined) { + ThreadsJoined = true; + ThreadsRunning = false; + netdata_thread_join(TrainingThread, nullptr); + netdata_thread_join(DetectionThread, nullptr); + } } diff --git a/ml/Host.h b/ml/Host.h index e68ea4f050..289cb5ab7e 100644 --- a/ml/Host.h +++ b/ml/Host.h @@ -26,7 +26,10 @@ public: MLS(), TS(), HostAnomalyRate(0.0), - ThreadsRunning(false) {} + ThreadsRunning(false), + ThreadsCancelled(false), + ThreadsJoined(false) + {} void addChart(Chart *C); void removeChart(Chart *C); @@ -36,7 +39,7 @@ public: void getDetectionInfoAsJson(nlohmann::json &Json) const; void startAnomalyDetectionThreads(); - void stopAnomalyDetectionThreads(); + void stopAnomalyDetectionThreads(bool join); void scheduleForTraining(TrainingRequest TR); void train(); @@ -50,6 +53,8 @@ private: TrainingStats TS; CalculatedNumber HostAnomalyRate{0.0}; std::atomic ThreadsRunning; + std::atomic ThreadsCancelled; + std::atomic ThreadsJoined; Queue TrainingQueue; diff --git a/ml/ml.cc b/ml/ml.cc index 2a0df8867c..461c83baae 100644 --- a/ml/ml.cc +++ b/ml/ml.cc @@ -156,7 +156,14 @@ void ml_start_anomaly_detection_threads(RRDHOST *RH) { void ml_stop_anomaly_detection_threads(RRDHOST *RH) { if (RH && RH->ml_host) { Host *H = reinterpret_cast(RH->ml_host); - H->stopAnomalyDetectionThreads(); + H->stopAnomalyDetectionThreads(true); + } +} + +void ml_cancel_anomaly_detection_threads(RRDHOST *RH) { + if (RH && RH->ml_host) { + Host *H = reinterpret_cast(RH->ml_host); + H->stopAnomalyDetectionThreads(false); } } diff --git a/ml/ml.h b/ml/ml.h index 790f881841..8bed627f5a 100644 --- a/ml/ml.h +++ b/ml/ml.h @@ -31,6 +31,7 @@ void ml_dimension_delete(RRDDIM *RD); void ml_start_anomaly_detection_threads(RRDHOST *RH); void ml_stop_anomaly_detection_threads(RRDHOST *RH); +void ml_cancel_anomaly_detection_threads(RRDHOST *RH); char *ml_get_host_info(RRDHOST *RH); char *ml_get_host_runtime_info(RRDHOST *RH); -- cgit v1.2.3