From 5321ca8d1ef8d974a6a2b2128ca8804de6acb693 Mon Sep 17 00:00:00 2001 From: vkalintiris Date: Tue, 21 Mar 2023 18:31:56 +0200 Subject: Revert "Use static thread-pool for training. (#14702)" (#14782) This reverts commit 5046e034212c008557dd014196b6f6204eda24b2. Will re-apply once we investigate an issue that occurs during the shutdown of the agent. --- ml/Config.cc | 12 +- ml/ad_charts.cc | 167 +++++++---------- ml/ad_charts.h | 2 +- ml/ml-dummy.c | 6 - ml/ml-private.h | 36 ++-- ml/ml.cc | 546 ++++++++++++++++++++++++++++---------------------------- ml/ml.h | 8 +- 7 files changed, 358 insertions(+), 419 deletions(-) (limited to 'ml') diff --git a/ml/Config.cc b/ml/Config.cc index 415d11b838..8b04590d77 100644 --- a/ml/Config.cc +++ b/ml/Config.cc @@ -34,7 +34,7 @@ void ml_config_load(ml_config_t *cfg) { unsigned smooth_n = config_get_number(config_section_ml, "num samples to smooth", 3); unsigned lag_n = config_get_number(config_section_ml, "num samples to lag", 5); - double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / 5.0 /* default lag_n */); + double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / lag_n); unsigned max_kmeans_iters = config_get_number(config_section_ml, "maximum number of k-means iterations", 1000); double dimension_anomaly_rate_threshold = config_get_float(config_section_ml, "dimension anomaly score threshold", 0.99); @@ -43,10 +43,6 @@ void ml_config_load(ml_config_t *cfg) { std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average"); time_t anomaly_detection_query_duration = config_get_number(config_section_ml, "anomaly detection grouping duration", 5 * 60); - size_t num_training_threads = config_get_number(config_section_ml, "num training threads", 4); - - bool enable_statistics_charts = config_get_boolean(config_section_ml, "enable statistics charts", false); - /* * Clamp */ @@ -68,8 +64,6 @@ void ml_config_load(ml_config_t *cfg) { host_anomaly_rate_threshold = clamp(host_anomaly_rate_threshold, 0.1, 10.0); anomaly_detection_query_duration = clamp(anomaly_detection_query_duration, 60, 15 * 60); - num_training_threads = clamp(num_training_threads, 1, 128); - /* * Validate */ @@ -115,8 +109,4 @@ void ml_config_load(ml_config_t *cfg) { cfg->sp_charts_to_skip = simple_pattern_create(cfg->charts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT, true); cfg->stream_anomaly_detection_charts = config_get_boolean(config_section_ml, "stream anomaly detection charts", true); - - cfg->num_training_threads = num_training_threads; - - cfg->enable_statistics_charts = enable_statistics_charts; } diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc index 086cd5aa02..a32ff6c650 100644 --- a/ml/ad_charts.cc +++ b/ml/ad_charts.cc @@ -6,7 +6,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats /* * Machine learning status */ - if (Cfg.enable_statistics_charts) { + { if (!host->machine_learning_status_rs) { char id_buf[1024]; char name_buf[1024]; @@ -48,7 +48,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats /* * Metric type */ - if (Cfg.enable_statistics_charts) { + { if (!host->metric_type_rs) { char id_buf[1024]; char name_buf[1024]; @@ -90,7 +90,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats /* * Training status */ - if (Cfg.enable_statistics_charts) { + { if (!host->training_status_rs) { char id_buf[1024]; char name_buf[1024]; @@ -179,6 +179,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats rrdset_done(host->dimensions_rs); } + } void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) { @@ -300,20 +301,20 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number } } -void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts) { +void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts) { /* * queue stats */ { - if (!training_thread->queue_stats_rs) { + if (!host->queue_stats_rs) { char id_buf[1024]; char name_buf[1024]; - snprintfz(id_buf, 1024, "training_queue_%zu_stats", training_thread->id); - snprintfz(name_buf, 1024, "training_queue_%zu_stats", training_thread->id); + snprintfz(id_buf, 1024, "queue_stats_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "queue_stats_on_%s", rrdhost_hostname(localhost)); - training_thread->queue_stats_rs = rrdset_create( - localhost, + host->queue_stats_rs = rrdset_create( + host->rh, "netdata", // type id_buf, // id name_buf, // name @@ -327,35 +328,35 @@ void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, localhost->rrd_update_every, // update_every RRDSET_TYPE_LINE// chart_type ); - rrdset_flag_set(training_thread->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); + rrdset_flag_set(host->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); - training_thread->queue_stats_queue_size_rd = - rrddim_add(training_thread->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - training_thread->queue_stats_popped_items_rd = - rrddim_add(training_thread->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->queue_stats_queue_size_rd = + rrddim_add(host->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->queue_stats_popped_items_rd = + rrddim_add(host->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); } - rrddim_set_by_pointer(training_thread->queue_stats_rs, - training_thread->queue_stats_queue_size_rd, ts.queue_size); - rrddim_set_by_pointer(training_thread->queue_stats_rs, - training_thread->queue_stats_popped_items_rd, ts.num_popped_items); + rrddim_set_by_pointer(host->queue_stats_rs, + host->queue_stats_queue_size_rd, ts.queue_size); + rrddim_set_by_pointer(host->queue_stats_rs, + host->queue_stats_popped_items_rd, ts.num_popped_items); - rrdset_done(training_thread->queue_stats_rs); + rrdset_done(host->queue_stats_rs); } /* * training stats */ { - if (!training_thread->training_time_stats_rs) { + if (!host->training_time_stats_rs) { char id_buf[1024]; char name_buf[1024]; - snprintfz(id_buf, 1024, "training_queue_%zu_time_stats", training_thread->id); - snprintfz(name_buf, 1024, "training_queue_%zu_time_stats", training_thread->id); + snprintfz(id_buf, 1024, "training_time_stats_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "training_time_stats_on_%s", rrdhost_hostname(localhost)); - training_thread->training_time_stats_rs = rrdset_create( - localhost, + host->training_time_stats_rs = rrdset_create( + host->rh, "netdata", // type id_buf, // id name_buf, // name @@ -369,39 +370,39 @@ void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, localhost->rrd_update_every, // update_every RRDSET_TYPE_LINE// chart_type ); - rrdset_flag_set(training_thread->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); - - training_thread->training_time_stats_allotted_rd = - rrddim_add(training_thread->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - training_thread->training_time_stats_consumed_rd = - rrddim_add(training_thread->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - training_thread->training_time_stats_remaining_rd = - rrddim_add(training_thread->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + rrdset_flag_set(host->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + host->training_time_stats_allotted_rd = + rrddim_add(host->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + host->training_time_stats_consumed_rd = + rrddim_add(host->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + host->training_time_stats_remaining_rd = + rrddim_add(host->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); } - rrddim_set_by_pointer(training_thread->training_time_stats_rs, - training_thread->training_time_stats_allotted_rd, ts.allotted_ut); - rrddim_set_by_pointer(training_thread->training_time_stats_rs, - training_thread->training_time_stats_consumed_rd, ts.consumed_ut); - rrddim_set_by_pointer(training_thread->training_time_stats_rs, - training_thread->training_time_stats_remaining_rd, ts.remaining_ut); + rrddim_set_by_pointer(host->training_time_stats_rs, + host->training_time_stats_allotted_rd, ts.allotted_ut); + rrddim_set_by_pointer(host->training_time_stats_rs, + host->training_time_stats_consumed_rd, ts.consumed_ut); + rrddim_set_by_pointer(host->training_time_stats_rs, + host->training_time_stats_remaining_rd, ts.remaining_ut); - rrdset_done(training_thread->training_time_stats_rs); + rrdset_done(host->training_time_stats_rs); } /* * training result stats */ { - if (!training_thread->training_results_rs) { + if (!host->training_results_rs) { char id_buf[1024]; char name_buf[1024]; - snprintfz(id_buf, 1024, "training_queue_%zu_results", training_thread->id); - snprintfz(name_buf, 1024, "training_queue_%zu_results", training_thread->id); + snprintfz(id_buf, 1024, "training_results_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "training_results_on_%s", rrdhost_hostname(localhost)); - training_thread->training_results_rs = rrdset_create( - localhost, + host->training_results_rs = rrdset_create( + host->rh, "netdata", // type id_buf, // id name_buf, // name @@ -415,61 +416,31 @@ void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, localhost->rrd_update_every, // update_every RRDSET_TYPE_LINE// chart_type ); - rrdset_flag_set(training_thread->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION); - - training_thread->training_results_ok_rd = - rrddim_add(training_thread->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - training_thread->training_results_invalid_query_time_range_rd = - rrddim_add(training_thread->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - training_thread->training_results_not_enough_collected_values_rd = - rrddim_add(training_thread->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - training_thread->training_results_null_acquired_dimension_rd = - rrddim_add(training_thread->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - training_thread->training_results_chart_under_replication_rd = - rrddim_add(training_thread->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rrdset_flag_set(host->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + host->training_results_ok_rd = + rrddim_add(host->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->training_results_invalid_query_time_range_rd = + rrddim_add(host->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->training_results_not_enough_collected_values_rd = + rrddim_add(host->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->training_results_null_acquired_dimension_rd = + rrddim_add(host->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->training_results_chart_under_replication_rd = + rrddim_add(host->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); } - rrddim_set_by_pointer(training_thread->training_results_rs, - training_thread->training_results_ok_rd, ts.training_result_ok); - rrddim_set_by_pointer(training_thread->training_results_rs, - training_thread->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range); - rrddim_set_by_pointer(training_thread->training_results_rs, - training_thread->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values); - rrddim_set_by_pointer(training_thread->training_results_rs, - training_thread->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension); - rrddim_set_by_pointer(training_thread->training_results_rs, - training_thread->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication); - - rrdset_done(training_thread->training_results_rs); - } -} - -void ml_update_global_statistics_charts(uint64_t models_consulted) { - if (Cfg.enable_statistics_charts) { - static RRDSET *st = NULL; - static RRDDIM *rd = NULL; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata" // type - , "ml_models_consulted" // id - , NULL // name - , NETDATA_ML_CHART_FAMILY // family - , NULL // context - , "KMeans models used for prediction" // title - , "models" // units - , NETDATA_ML_PLUGIN // plugin - , NETDATA_ML_MODULE_DETECTION // module - , NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS // priority - , localhost->rrd_update_every // update_every - , RRDSET_TYPE_AREA // chart_type - ); - - rd = rrddim_add(st, "num_models_consulted", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); - } - - rrddim_set_by_pointer(st, rd, (collected_number) models_consulted); - - rrdset_done(st); + rrddim_set_by_pointer(host->training_results_rs, + host->training_results_ok_rd, ts.training_result_ok); + rrddim_set_by_pointer(host->training_results_rs, + host->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range); + rrddim_set_by_pointer(host->training_results_rs, + host->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values); + rrddim_set_by_pointer(host->training_results_rs, + host->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension); + rrddim_set_by_pointer(host->training_results_rs, + host->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication); + + rrdset_done(host->training_results_rs); } } diff --git a/ml/ad_charts.h b/ml/ad_charts.h index 349b369a24..a973b44a51 100644 --- a/ml/ad_charts.h +++ b/ml/ad_charts.h @@ -9,6 +9,6 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number anomaly_rate); -void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts); +void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts); #endif /* ML_ADCHARTS_H */ diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c index 8db252a841..53444e246f 100644 --- a/ml/ml-dummy.c +++ b/ml/ml-dummy.c @@ -19,8 +19,6 @@ bool ml_streaming_enabled() { void ml_init(void) {} -void ml_fini(void) {} - void ml_host_new(RRDHOST *rh) { UNUSED(rh); } @@ -88,8 +86,4 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool return false; } -void ml_update_global_statistics_charts(uint64_t models_consulted) { - UNUSED(models_consulted); -} - #endif diff --git a/ml/ml-private.h b/ml/ml-private.h index 8535c9262d..173b82e265 100644 --- a/ml/ml-private.h +++ b/ml/ml-private.h @@ -33,7 +33,6 @@ typedef struct { /* * KMeans */ - typedef struct { size_t num_clusters; size_t max_iterations; @@ -124,7 +123,6 @@ enum ml_training_result { typedef struct { // Chart/dimension we want to train - STRING *host_id; STRING *chart_id; STRING *dimension_id; @@ -170,7 +168,6 @@ typedef struct { /* * Queue */ - typedef struct { std::queue internal; netdata_mutex_t mutex; @@ -178,6 +175,7 @@ typedef struct { std::atomic exit; } ml_queue_t; + typedef struct { RRDDIM *rd; @@ -209,13 +207,20 @@ typedef struct { RRDHOST *rh; ml_machine_learning_stats_t mls; + ml_training_stats_t ts; calculated_number_t host_anomaly_rate; - netdata_mutex_t mutex; + std::atomic threads_running; + std::atomic threads_cancelled; + std::atomic threads_joined; ml_queue_t *training_queue; + netdata_mutex_t mutex; + + netdata_thread_t training_thread; + /* * bookkeeping for anomaly detection charts */ @@ -244,19 +249,6 @@ typedef struct { RRDSET *detector_events_rs; RRDDIM *detector_events_above_threshold_rd; RRDDIM *detector_events_new_anomaly_event_rd; -} ml_host_t; - -typedef struct { - size_t id; - netdata_thread_t nd_thread; - netdata_mutex_t nd_mutex; - - ml_queue_t *training_queue; - ml_training_stats_t training_stats; - - calculated_number_t *training_cns; - calculated_number_t *scratch_training_cns; - std::vector training_samples; RRDSET *queue_stats_rs; RRDDIM *queue_stats_queue_size_rd; @@ -273,7 +265,7 @@ typedef struct { RRDDIM *training_results_not_enough_collected_values_rd; RRDDIM *training_results_null_acquired_dimension_rd; RRDDIM *training_results_chart_under_replication_rd; -} ml_training_thread_t; +} ml_host_t; typedef struct { bool enable_anomaly_detection; @@ -310,14 +302,6 @@ typedef struct { std::vector random_nums; netdata_thread_t detection_thread; - std::atomic detection_stop; - - size_t num_training_threads; - - std::vector training_threads; - std::atomic training_stop; - - bool enable_statistics_charts; } ml_config_t; void ml_config_load(ml_config_t *cfg); diff --git a/ml/ml.cc b/ml/ml.cc index c7d4671c04..cf9ea379a6 100644 --- a/ml/ml.cc +++ b/ml/ml.cc @@ -8,13 +8,14 @@ #include "ad_charts.h" -#define WORKER_TRAIN_QUEUE_POP 0 -#define WORKER_TRAIN_ACQUIRE_DIMENSION 1 -#define WORKER_TRAIN_QUERY 2 -#define WORKER_TRAIN_KMEANS 3 -#define WORKER_TRAIN_UPDATE_MODELS 4 -#define WORKER_TRAIN_RELEASE_DIMENSION 5 -#define WORKER_TRAIN_UPDATE_HOST 6 +typedef struct { + calculated_number_t *training_cns; + calculated_number_t *scratch_training_cns; + + std::vector training_samples; +} ml_tls_data_t; + +static thread_local ml_tls_data_t tls_data; /* * Functions to convert enums to strings @@ -263,14 +264,7 @@ ml_queue_pop(ml_queue_t *q) { netdata_mutex_lock(&q->mutex); - ml_training_request_t req = { - NULL, // host_id - NULL, // chart id - NULL, // dimension id - 0, // current time - 0, // first entry - 0 // last entry - }; + ml_training_request_t req = { NULL, NULL, 0, 0, 0 }; while (q->internal.empty()) { pthread_cond_wait(&q->cond_var, &q->mutex); @@ -313,7 +307,7 @@ ml_queue_signal(ml_queue_t *q) */ static std::pair -ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request) +ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request) { ml_training_response_t training_response = {}; @@ -357,7 +351,7 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens STORAGE_PRIORITY_BEST_EFFORT); size_t idx = 0; - memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1)); + memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1)); calculated_number_t last_value = std::numeric_limits::quiet_NaN(); while (!ops->is_finished(&handle)) { @@ -374,11 +368,11 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens training_response.db_after_t = timestamp; training_response.db_before_t = timestamp; - training_thread->training_cns[idx] = value; - last_value = training_thread->training_cns[idx]; + tls_data.training_cns[idx] = value; + last_value = tls_data.training_cns[idx]; training_response.collected_values++; } else - training_thread->training_cns[idx] = last_value; + tls_data.training_cns[idx] = last_value; idx++; } @@ -393,21 +387,20 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens } // Find first non-NaN value. - for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { } + for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { } // Overwrite NaN values. if (idx != 0) - memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values); + memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values); training_response.result = TRAINING_RESULT_OK; - return { training_thread->training_cns, training_response }; + return { tls_data.training_cns, training_response }; } static enum ml_training_result -ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request) +ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request) { - worker_is_busy(WORKER_TRAIN_QUERY); - auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request); + auto P = ml_dimension_calculated_numbers(dim, training_request); ml_training_response_t training_response = P.second; if (training_response.result != TRAINING_RESULT_OK) { @@ -436,16 +429,15 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t * } // compute kmeans - worker_is_busy(WORKER_TRAIN_KMEANS); { - memcpy(training_thread->scratch_training_cns, training_thread->training_cns, + memcpy(tls_data.scratch_training_cns, tls_data.training_cns, training_response.total_values * sizeof(calculated_number_t)); ml_features_t features = { Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n, - training_thread->scratch_training_cns, training_response.total_values, - training_thread->training_cns, training_response.total_values, - training_thread->training_samples + tls_data.scratch_training_cns, training_response.total_values, + tls_data.training_cns, training_response.total_values, + tls_data.training_samples }; ml_features_preprocess(&features); @@ -454,7 +446,6 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t * } // update kmeans models - worker_is_busy(WORKER_TRAIN_UPDATE_MODELS); { netdata_mutex_lock(&dim->mutex); @@ -506,16 +497,11 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time) } if (schedule_for_training) { + ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host; ml_training_request_t req = { - string_dup(dim->rd->rrdset->rrdhost->hostname), - string_dup(dim->rd->rrdset->id), - string_dup(dim->rd->id), - curr_time, - rrddim_first_entry_s(dim->rd), - rrddim_last_entry_s(dim->rd), + string_dup(dim->rd->rrdset->id), string_dup(dim->rd->id), + curr_time, rrddim_first_entry_s(dim->rd), rrddim_last_entry_s(dim->rd), }; - - ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host; ml_queue_push(host->training_queue, req); } } @@ -691,6 +677,7 @@ ml_host_detect_once(ml_host_t *host) host->mls = {}; ml_machine_learning_stats_t mls_copy = {}; + ml_training_stats_t ts_copy = {}; { netdata_mutex_lock(&host->mutex); @@ -734,14 +721,54 @@ ml_host_detect_once(ml_host_t *host) mls_copy = host->mls; + /* + * training stats + */ + ts_copy = host->ts; + + host->ts.queue_size = 0; + host->ts.num_popped_items = 0; + + host->ts.allotted_ut = 0; + host->ts.consumed_ut = 0; + host->ts.remaining_ut = 0; + + host->ts.training_result_ok = 0; + host->ts.training_result_invalid_query_time_range = 0; + host->ts.training_result_not_enough_collected_values = 0; + host->ts.training_result_null_acquired_dimension = 0; + host->ts.training_result_chart_under_replication = 0; + netdata_mutex_unlock(&host->mutex); } + // Calc the avg values + if (ts_copy.num_popped_items) { + ts_copy.queue_size /= ts_copy.num_popped_items; + ts_copy.allotted_ut /= ts_copy.num_popped_items; + ts_copy.consumed_ut /= ts_copy.num_popped_items; + ts_copy.remaining_ut /= ts_copy.num_popped_items; + + ts_copy.training_result_ok /= ts_copy.num_popped_items; + ts_copy.training_result_invalid_query_time_range /= ts_copy.num_popped_items; + ts_copy.training_result_not_enough_collected_values /= ts_copy.num_popped_items; + ts_copy.training_result_null_acquired_dimension /= ts_copy.num_popped_items; + ts_copy.training_result_chart_under_replication /= ts_copy.num_popped_items; + } else { + ts_copy.queue_size = 0; + ts_copy.allotted_ut = 0; + ts_copy.consumed_ut = 0; + ts_copy.remaining_ut = 0; + } + worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART); ml_update_dimensions_chart(host, mls_copy); worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART); ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0); + + worker_is_busy(WORKER_JOB_DETECTION_STATS); + ml_update_training_statistics_chart(host, ts_copy); } typedef struct { @@ -750,21 +777,18 @@ typedef struct { } ml_acquired_dimension_t; static ml_acquired_dimension_t -ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_id) +ml_acquired_dimension_get(RRDHOST *rh, STRING *chart_id, STRING *dimension_id) { RRDDIM_ACQUIRED *acq_rd = NULL; ml_dimension_t *dim = NULL; - RRDHOST *rh = rrdhost_find_by_hostname(string2str(host_id)); - if (rh) { - RRDSET *rs = rrdset_find(rh, string2str(chart_id)); - if (rs) { - acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id)); - if (acq_rd) { - RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd); - if (rd) - dim = (ml_dimension_t *) rd->ml_dimension; - } + RRDSET *rs = rrdset_find(rh, string2str(chart_id)); + if (rs) { + acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id)); + if (acq_rd) { + RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd); + if (rd) + dim = (ml_dimension_t *) rd->ml_dimension; } } @@ -785,12 +809,110 @@ ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim) } static enum ml_training_result -ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr) +ml_acquired_dimension_train(ml_acquired_dimension_t acq_dim, const ml_training_request_t &TR) { if (!acq_dim.dim) return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION; - return ml_dimension_train_model(training_thread, acq_dim.dim, tr); + return ml_dimension_train_model(acq_dim.dim, TR); +} + +#define WORKER_JOB_TRAINING_FIND 0 +#define WORKER_JOB_TRAINING_TRAIN 1 +#define WORKER_JOB_TRAINING_STATS 2 + +static void +ml_host_train(ml_host_t *host) +{ + worker_register("MLTRAIN"); + worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find"); + worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train"); + worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats"); + + service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t ) ml_host_cancel_training_thread, host->rh, true); + + while (service_running(SERVICE_ML_TRAINING)) { + ml_training_request_t training_req = ml_queue_pop(host->training_queue); + size_t queue_size = ml_queue_size(host->training_queue) + 1; + + if (host->threads_cancelled) { + info("Stopping training thread for host %s because it was cancelled", rrdhost_hostname(host->rh)); + break; + } + + usec_t allotted_ut = (Cfg.train_every * host->rh->rrd_update_every * USEC_PER_SEC) / queue_size; + if (allotted_ut > USEC_PER_SEC) + allotted_ut = USEC_PER_SEC; + + usec_t start_ut = now_monotonic_usec(); + enum ml_training_result training_res; + { + worker_is_busy(WORKER_JOB_TRAINING_FIND); + ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(host->rh, training_req.chart_id, training_req.dimension_id); + + worker_is_busy(WORKER_JOB_TRAINING_TRAIN); + training_res = ml_acquired_dimension_train(acq_dim, training_req); + + string_freez(training_req.chart_id); + string_freez(training_req.dimension_id); + + ml_acquired_dimension_release(acq_dim); + } + usec_t consumed_ut = now_monotonic_usec() - start_ut; + + worker_is_busy(WORKER_JOB_TRAINING_STATS); + + usec_t remaining_ut = 0; + if (consumed_ut < allotted_ut) + remaining_ut = allotted_ut - consumed_ut; + + { + netdata_mutex_lock(&host->mutex); + + host->ts.queue_size += queue_size; + host->ts.num_popped_items += 1; + + host->ts.allotted_ut += allotted_ut; + host->ts.consumed_ut += consumed_ut; + host->ts.remaining_ut += remaining_ut; + + switch (training_res) { + case TRAINING_RESULT_OK: + host->ts.training_result_ok += 1; + break; + case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE: + host->ts.training_result_invalid_query_time_range += 1; + break; + case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES: + host->ts.training_result_not_enough_collected_values += 1; + break; + case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION: + host->ts.training_result_null_acquired_dimension += 1; + break; + case TRAINING_RESULT_CHART_UNDER_REPLICATION: + host->ts.training_result_chart_under_replication += 1; + break; + } + + netdata_mutex_unlock(&host->mutex); + } + + worker_is_idle(); + std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut}); + worker_is_busy(0); + } +} + +static void * +train_main(void *arg) +{ + size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1); + tls_data.training_cns = new calculated_number_t[max_elements_needed_for_training](); + tls_data.scratch_training_cns = new calculated_number_t[max_elements_needed_for_training](); + + ml_host_t *host = (ml_host_t *) arg; + ml_host_train(host); + return NULL; } static void * @@ -804,10 +926,12 @@ ml_detect_main(void *arg) worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart"); worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats"); + service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, NULL, NULL, true); + heartbeat_t hb; heartbeat_init(&hb); - while (!Cfg.detection_stop) { + while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) { worker_is_idle(); heartbeat_next(&hb, USEC_PER_SEC); @@ -821,39 +945,6 @@ ml_detect_main(void *arg) ml_host_detect_once((ml_host_t *) rh->ml_host); } dfe_done(rhp); - - if (Cfg.enable_statistics_charts) { - // collect and update training thread stats - for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { - ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; - - netdata_mutex_lock(&training_thread->nd_mutex); - ml_training_stats_t training_stats = training_thread->training_stats; - training_thread->training_stats = {}; - netdata_mutex_unlock(&training_thread->nd_mutex); - - // calc the avg values - if (training_stats.num_popped_items) { - training_stats.queue_size /= training_stats.num_popped_items; - training_stats.allotted_ut /= training_stats.num_popped_items; - training_stats.consumed_ut /= training_stats.num_popped_items; - training_stats.remaining_ut /= training_stats.num_popped_items; - } else { - training_stats.queue_size = 0; - training_stats.allotted_ut = 0; - training_stats.consumed_ut = 0; - training_stats.remaining_ut = 0; - - training_stats.training_result_ok = 0; - training_stats.training_result_invalid_query_time_range = 0; - training_stats.training_result_not_enough_collected_values = 0; - training_stats.training_result_null_acquired_dimension = 0; - training_stats.training_result_chart_under_replication = 0; - } - - ml_update_training_statistics_chart(training_thread, training_stats); - } - } } return NULL; @@ -887,6 +978,31 @@ bool ml_streaming_enabled() return Cfg.stream_anomaly_detection_charts; } +void ml_init() +{ + // Read config values + ml_config_load(&Cfg); + + if (!Cfg.enable_anomaly_detection) + return; + + // Generate random numbers to efficiently sample the features we need + // for KMeans clustering. + std::random_device RD; + std::mt19937 Gen(RD()); + + Cfg.random_nums.reserve(Cfg.max_train_samples); + for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++) + Cfg.random_nums.push_back(Gen()); + + + // start detection & training threads + char tag[NETDATA_THREAD_TAG_MAX + 1]; + + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT"); + netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL); +} + void ml_host_new(RRDHOST *rh) { if (!ml_enabled(rh)) @@ -896,12 +1012,14 @@ void ml_host_new(RRDHOST *rh) host->rh = rh; host->mls = ml_machine_learning_stats_t(); - //host->ts = ml_training_stats_t(); - - static std::atomic times_called(0); - host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue; + host->ts = ml_training_stats_t(); host->host_anomaly_rate = 0.0; + host->threads_running = false; + host->threads_cancelled = false; + host->threads_joined = false; + + host->training_queue = ml_queue_init(); netdata_mutex_init(&host->mutex); @@ -915,6 +1033,7 @@ void ml_host_delete(RRDHOST *rh) return; netdata_mutex_destroy(&host->mutex); + ml_queue_destroy(host->training_queue); delete host; rh->ml_host = NULL; @@ -981,6 +1100,69 @@ void ml_host_get_models(RRDHOST *rh, BUFFER *wb) error("Fetching KMeans models is not supported yet"); } +void ml_host_start_training_thread(RRDHOST *rh) +{ + if (!rh || !rh->ml_host) + return; + + ml_host_t *host = (ml_host_t *) rh->ml_host; + + if (host->threads_running) { + error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(host->rh)); + return; + } + + host->threads_running = true; + host->threads_cancelled = false; + host->threads_joined = false; + + char tag[NETDATA_THREAD_TAG_MAX + 1]; + + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(host->rh)); + netdata_thread_create(&host->training_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast(host)); +} + +void ml_host_cancel_training_thread(RRDHOST *rh) +{ + if (!rh || !rh->ml_host) + return; + + ml_host_t *host = (ml_host_t *) rh->ml_host; + + if (!host->threads_running) { + error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(host->rh)); + return; + } + + if (!host->threads_cancelled) { + host->threads_cancelled = true; + + // Signal the training queue to stop popping-items + ml_queue_signal(host->training_queue); + netdata_thread_cancel(host->training_thread); + } +} + +void ml_host_stop_training_thread(RRDHOST *rh) +{ + if (!rh || !rh->ml_host) + return; + + ml_host_cancel_training_thread(rh); + + ml_host_t *host = (ml_host_t *) rh->ml_host; + + if (!host->threads_joined) { + host->threads_joined = true; + host->threads_running = false; + + delete[] tls_data.training_cns; + delete[] tls_data.scratch_training_cns; + + netdata_thread_join(host->training_thread, NULL); + } +} + void ml_chart_new(RRDSET *rs) { ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host; @@ -1085,185 +1267,3 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool return is_anomalous; } - -static void *ml_train_main(void *arg) { - ml_training_thread_t *training_thread = (ml_training_thread_t *) arg; - - char worker_name[1024]; - snprintfz(worker_name, 1024, "training_thread_%zu", training_thread->id); - worker_register("MLTRAIN"); - - worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue"); - worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire"); - worker_register_job_name(WORKER_TRAIN_QUERY, "query"); - worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans"); - worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models"); - worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release"); - worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host"); - - while (!Cfg.training_stop) { - worker_is_busy(WORKER_TRAIN_QUEUE_POP); - - ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue); - - // we know this thread has been cancelled, when the queue starts - // returning "null" requests without blocking on queue's pop(). - if (training_req.host_id == NULL) - break; - - size_t queue_size = ml_queue_size(training_thread->training_queue) + 1; - - usec_t allotted_ut = (Cfg.train_every * USEC_PER_SEC) / queue_size; - if (allotted_ut > USEC_PER_SEC) - allotted_ut = USEC_PER_SEC; - - usec_t start_ut = now_monotonic_usec(); - - enum ml_training_result training_res; - { - worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION); - ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get( - training_req.host_id, - training_req.chart_id, - training_req.dimension_id); - - training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req); - - string_freez(training_req.host_id); - string_freez(training_req.chart_id); - string_freez(training_req.dimension_id); - - worker_is_busy(WORKER_TRAIN_RELEASE_DIMENSION); - ml_acquired_dimension_release(acq_dim); - } - - usec_t consumed_ut = now_monotonic_usec() - start_ut; - - usec_t remaining_ut = 0; - if (consumed_ut < allotted_ut) - remaining_ut = allotted_ut - consumed_ut; - - if (Cfg.enable_statistics_charts) { - worker_is_busy(WORKER_TRAIN_UPDATE_HOST); - - netdata_mutex_lock(&training_thread->nd_mutex); - - training_thread->training_stats.queue_size += queue_size; - training_thread->training_stats.num_popped_items += 1; - - training_thread->training_stats.allotted_ut += allotted_ut; - training_thread->training_stats.consumed_ut += consumed_ut; - training_thread->training_stats.remaining_ut += remaining_ut; - - switch (training_res) { - case TRAINING_RESULT_OK: - training_thread->training_stats.training_result_ok += 1; - break; - case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE: - training_thread->training_stats.training_result_invalid_query_time_range += 1; - break; - case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES: - training_thread->training_stats.training_result_not_enough_collected_values += 1; - break; - case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION: - training_thread->training_stats.training_result_null_acquired_dimension += 1; - break; - case TRAINING_RESULT_CHART_UNDER_REPLICATION: - training_thread->training_stats.training_result_chart_under_replication += 1; - break; - } - - netdata_mutex_unlock(&training_thread->nd_mutex); - } - - worker_is_idle(); - std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut}); - } - - return NULL; -} - -void ml_init() -{ - // Read config values - ml_config_load(&Cfg); - - if (!Cfg.enable_anomaly_detection) - return; - - // Generate random numbers to efficiently sample the features we need - // for KMeans clustering. - std::random_device RD; - std::mt19937 Gen(RD()); - - Cfg.random_nums.reserve(Cfg.max_train_samples); - for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++) - Cfg.random_nums.push_back(Gen()); - - - // start detection & training threads - Cfg.detection_stop = false; - Cfg.training_stop = false; - - char tag[NETDATA_THREAD_TAG_MAX + 1]; - - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT"); - netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL); - - Cfg.training_threads.resize(Cfg.num_training_threads); - for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { - ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; - - - size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1); - training_thread->training_cns = new calculated_number_t[max_elements_needed_for_training](); - training_thread->scratch_training_cns = new calculated_number_t[max_elements_needed_for_training](); - - training_thread->id = idx; - training_thread->training_queue = ml_queue_init(); - netdata_mutex_init(&training_thread->nd_mutex); - - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id); - netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread); - } -} - -void ml_fini() -{ - Cfg.detection_stop = true; - Cfg.training_stop = true; - - netdata_thread_cancel(Cfg.detection_thread); - netdata_thread_join(Cfg.detection_thread, NULL); - - // signal the training queue of each thread - for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { - ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; - - ml_queue_signal(training_thread->training_queue); - } - - // cancel training threads - for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { - ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; - - netdata_thread_cancel(training_thread->nd_thread); - } - - // join training threads - for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { - ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; - - netdata_thread_join(training_thread->nd_thread, NULL); - } - - // clear training thread data - for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { - ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; - - delete[] training_thread->training_cns; - delete[] training_thread->scratch_training_cns; - ml_queue_destroy(training_thread->training_queue); - netdata_mutex_destroy(&training_thread->nd_mutex); - } -} diff --git a/ml/ml.h b/ml/ml.h index 7f35635ace..60c520d2e7 100644 --- a/ml/ml.h +++ b/ml/ml.h @@ -13,9 +13,7 @@ extern "C" { bool ml_capable(); bool ml_enabled(RRDHOST *rh); bool ml_streaming_enabled(); - void ml_init(void); -void ml_fini(void); void ml_host_new(RRDHOST *rh); void ml_host_delete(RRDHOST *rh); @@ -24,6 +22,10 @@ void ml_host_get_info(RRDHOST *RH, BUFFER *wb); void ml_host_get_detection_info(RRDHOST *RH, BUFFER *wb); void ml_host_get_models(RRDHOST *RH, BUFFER *wb); +void ml_host_start_training_thread(RRDHOST *rh); +void ml_host_cancel_training_thread(RRDHOST *rh); +void ml_host_stop_training_thread(RRDHOST *rh); + void ml_chart_new(RRDSET *rs); void ml_chart_delete(RRDSET *rs); bool ml_chart_update_begin(RRDSET *rs); @@ -33,8 +35,6 @@ void ml_dimension_new(RRDDIM *rd); void ml_dimension_delete(RRDDIM *rd); bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists); -void ml_update_global_statistics_charts(uint64_t models_consulted); - #ifdef __cplusplus }; #endif -- cgit v1.2.3