summaryrefslogtreecommitdiffstats
path: root/ml
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2023-03-21 18:31:56 +0200
committerGitHub <noreply@github.com>2023-03-21 18:31:56 +0200
commit5321ca8d1ef8d974a6a2b2128ca8804de6acb693 (patch)
tree05846664d6fdb9ba33845e9081a90dc5d7e60e26 /ml
parentcbdf236f0e62655a468fe07e2f3c00037bff327c (diff)
Revert "Use static thread-pool for training. (#14702)" (#14782)
This reverts commit 5046e034212c008557dd014196b6f6204eda24b2. Will re-apply once we investigate an issue that occurs during the shutdown of the agent.
Diffstat (limited to 'ml')
-rw-r--r--ml/Config.cc12
-rw-r--r--ml/ad_charts.cc167
-rw-r--r--ml/ad_charts.h2
-rw-r--r--ml/ml-dummy.c6
-rw-r--r--ml/ml-private.h36
-rw-r--r--ml/ml.cc546
-rw-r--r--ml/ml.h8
7 files changed, 358 insertions, 419 deletions
diff --git a/ml/Config.cc b/ml/Config.cc
index 415d11b838..8b04590d77 100644
--- a/ml/Config.cc
+++ b/ml/Config.cc
@@ -34,7 +34,7 @@ void ml_config_load(ml_config_t *cfg) {
unsigned smooth_n = config_get_number(config_section_ml, "num samples to smooth", 3);
unsigned lag_n = config_get_number(config_section_ml, "num samples to lag", 5);
- double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / 5.0 /* default lag_n */);
+ double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / lag_n);
unsigned max_kmeans_iters = config_get_number(config_section_ml, "maximum number of k-means iterations", 1000);
double dimension_anomaly_rate_threshold = config_get_float(config_section_ml, "dimension anomaly score threshold", 0.99);
@@ -43,10 +43,6 @@ void ml_config_load(ml_config_t *cfg) {
std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average");
time_t anomaly_detection_query_duration = config_get_number(config_section_ml, "anomaly detection grouping duration", 5 * 60);
- size_t num_training_threads = config_get_number(config_section_ml, "num training threads", 4);
-
- bool enable_statistics_charts = config_get_boolean(config_section_ml, "enable statistics charts", false);
-
/*
* Clamp
*/
@@ -68,8 +64,6 @@ void ml_config_load(ml_config_t *cfg) {
host_anomaly_rate_threshold = clamp(host_anomaly_rate_threshold, 0.1, 10.0);
anomaly_detection_query_duration = clamp<time_t>(anomaly_detection_query_duration, 60, 15 * 60);
- num_training_threads = clamp<size_t>(num_training_threads, 1, 128);
-
/*
* Validate
*/
@@ -115,8 +109,4 @@ void ml_config_load(ml_config_t *cfg) {
cfg->sp_charts_to_skip = simple_pattern_create(cfg->charts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT, true);
cfg->stream_anomaly_detection_charts = config_get_boolean(config_section_ml, "stream anomaly detection charts", true);
-
- cfg->num_training_threads = num_training_threads;
-
- cfg->enable_statistics_charts = enable_statistics_charts;
}
diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc
index 086cd5aa02..a32ff6c650 100644
--- a/ml/ad_charts.cc
+++ b/ml/ad_charts.cc
@@ -6,7 +6,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
/*
* Machine learning status
*/
- if (Cfg.enable_statistics_charts) {
+ {
if (!host->machine_learning_status_rs) {
char id_buf[1024];
char name_buf[1024];
@@ -48,7 +48,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
/*
* Metric type
*/
- if (Cfg.enable_statistics_charts) {
+ {
if (!host->metric_type_rs) {
char id_buf[1024];
char name_buf[1024];
@@ -90,7 +90,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
/*
* Training status
*/
- if (Cfg.enable_statistics_charts) {
+ {
if (!host->training_status_rs) {
char id_buf[1024];
char name_buf[1024];
@@ -179,6 +179,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
rrdset_done(host->dimensions_rs);
}
+
}
void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) {
@@ -300,20 +301,20 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number
}
}
-void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts) {
+void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts) {
/*
* queue stats
*/
{
- if (!training_thread->queue_stats_rs) {
+ if (!host->queue_stats_rs) {
char id_buf[1024];
char name_buf[1024];
- snprintfz(id_buf, 1024, "training_queue_%zu_stats", training_thread->id);
- snprintfz(name_buf, 1024, "training_queue_%zu_stats", training_thread->id);
+ snprintfz(id_buf, 1024, "queue_stats_on_%s", localhost->machine_guid);
+ snprintfz(name_buf, 1024, "queue_stats_on_%s", rrdhost_hostname(localhost));
- training_thread->queue_stats_rs = rrdset_create(
- localhost,
+ host->queue_stats_rs = rrdset_create(
+ host->rh,
"netdata", // type
id_buf, // id
name_buf, // name
@@ -327,35 +328,35 @@ void ml_update_training_statistics_chart(ml_training_thread_t *training_thread,
localhost->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
);
- rrdset_flag_set(training_thread->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+ rrdset_flag_set(host->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
- training_thread->queue_stats_queue_size_rd =
- rrddim_add(training_thread->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- training_thread->queue_stats_popped_items_rd =
- rrddim_add(training_thread->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ host->queue_stats_queue_size_rd =
+ rrddim_add(host->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ host->queue_stats_popped_items_rd =
+ rrddim_add(host->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
- rrddim_set_by_pointer(training_thread->queue_stats_rs,
- training_thread->queue_stats_queue_size_rd, ts.queue_size);
- rrddim_set_by_pointer(training_thread->queue_stats_rs,
- training_thread->queue_stats_popped_items_rd, ts.num_popped_items);
+ rrddim_set_by_pointer(host->queue_stats_rs,
+ host->queue_stats_queue_size_rd, ts.queue_size);
+ rrddim_set_by_pointer(host->queue_stats_rs,
+ host->queue_stats_popped_items_rd, ts.num_popped_items);
- rrdset_done(training_thread->queue_stats_rs);
+ rrdset_done(host->queue_stats_rs);
}
/*
* training stats
*/
{
- if (!training_thread->training_time_stats_rs) {
+ if (!host->training_time_stats_rs) {
char id_buf[1024];
char name_buf[1024];
- snprintfz(id_buf, 1024, "training_queue_%zu_time_stats", training_thread->id);
- snprintfz(name_buf, 1024, "training_queue_%zu_time_stats", training_thread->id);
+ snprintfz(id_buf, 1024, "training_time_stats_on_%s", localhost->machine_guid);
+ snprintfz(name_buf, 1024, "training_time_stats_on_%s", rrdhost_hostname(localhost));
- training_thread->training_time_stats_rs = rrdset_create(
- localhost,
+ host->training_time_stats_rs = rrdset_create(
+ host->rh,
"netdata", // type
id_buf, // id
name_buf, // name
@@ -369,39 +370,39 @@ void ml_update_training_statistics_chart(ml_training_thread_t *training_thread,
localhost->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
);
- rrdset_flag_set(training_thread->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
-
- training_thread->training_time_stats_allotted_rd =
- rrddim_add(training_thread->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
- training_thread->training_time_stats_consumed_rd =
- rrddim_add(training_thread->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
- training_thread->training_time_stats_remaining_rd =
- rrddim_add(training_thread->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+ rrdset_flag_set(host->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+
+ host->training_time_stats_allotted_rd =
+ rrddim_add(host->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+ host->training_time_stats_consumed_rd =
+ rrddim_add(host->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+ host->training_time_stats_remaining_rd =
+ rrddim_add(host->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
}
- rrddim_set_by_pointer(training_thread->training_time_stats_rs,
- training_thread->training_time_stats_allotted_rd, ts.allotted_ut);
- rrddim_set_by_pointer(training_thread->training_time_stats_rs,
- training_thread->training_time_stats_consumed_rd, ts.consumed_ut);
- rrddim_set_by_pointer(training_thread->training_time_stats_rs,
- training_thread->training_time_stats_remaining_rd, ts.remaining_ut);
+ rrddim_set_by_pointer(host->training_time_stats_rs,
+ host->training_time_stats_allotted_rd, ts.allotted_ut);
+ rrddim_set_by_pointer(host->training_time_stats_rs,
+ host->training_time_stats_consumed_rd, ts.consumed_ut);
+ rrddim_set_by_pointer(host->training_time_stats_rs,
+ host->training_time_stats_remaining_rd, ts.remaining_ut);
- rrdset_done(training_thread->training_time_stats_rs);
+ rrdset_done(host->training_time_stats_rs);
}
/*
* training result stats
*/
{
- if (!training_thread->training_results_rs) {
+ if (!host->training_results_rs) {
char id_buf[1024];
char name_buf[1024];
- snprintfz(id_buf, 1024, "training_queue_%zu_results", training_thread->id);
- snprintfz(name_buf, 1024, "training_queue_%zu_results", training_thread->id);
+ snprintfz(id_buf, 1024, "training_results_on_%s", localhost->machine_guid);
+ snprintfz(name_buf, 1024, "training_results_on_%s", rrdhost_hostname(localhost));
- training_thread->training_results_rs = rrdset_create(
- localhost,
+ host->training_results_rs = rrdset_create(
+ host->rh,
"netdata", // type
id_buf, // id
name_buf, // name
@@ -415,61 +416,31 @@ void ml_update_training_statistics_chart(ml_training_thread_t *training_thread,
localhost->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
);
- rrdset_flag_set(training_thread->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION);
-
- training_thread->training_results_ok_rd =
- rrddim_add(training_thread->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- training_thread->training_results_invalid_query_time_range_rd =
- rrddim_add(training_thread->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- training_thread->training_results_not_enough_collected_values_rd =
- rrddim_add(training_thread->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- training_thread->training_results_null_acquired_dimension_rd =
- rrddim_add(training_thread->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- training_thread->training_results_chart_under_replication_rd =
- rrddim_add(training_thread->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rrdset_flag_set(host->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+
+ host->training_results_ok_rd =
+ rrddim_add(host->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ host->training_results_invalid_query_time_range_rd =
+ rrddim_add(host->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ host->training_results_not_enough_collected_values_rd =
+ rrddim_add(host->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ host->training_results_null_acquired_dimension_rd =
+ rrddim_add(host->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ host->training_results_chart_under_replication_rd =
+ rrddim_add(host->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
- rrddim_set_by_pointer(training_thread->training_results_rs,
- training_thread->training_results_ok_rd, ts.training_result_ok);
- rrddim_set_by_pointer(training_thread->training_results_rs,
- training_thread->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range);
- rrddim_set_by_pointer(training_thread->training_results_rs,
- training_thread->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values);
- rrddim_set_by_pointer(training_thread->training_results_rs,
- training_thread->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension);
- rrddim_set_by_pointer(training_thread->training_results_rs,
- training_thread->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication);
-
- rrdset_done(training_thread->training_results_rs);
- }
-}
-
-void ml_update_global_statistics_charts(uint64_t models_consulted) {
- if (Cfg.enable_statistics_charts) {
- static RRDSET *st = NULL;
- static RRDDIM *rd = NULL;
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata" // type
- , "ml_models_consulted" // id
- , NULL // name
- , NETDATA_ML_CHART_FAMILY // family
- , NULL // context
- , "KMeans models used for prediction" // title
- , "models" // units
- , NETDATA_ML_PLUGIN // plugin
- , NETDATA_ML_MODULE_DETECTION // module
- , NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS // priority
- , localhost->rrd_update_every // update_every
- , RRDSET_TYPE_AREA // chart_type
- );
-
- rd = rrddim_add(st, "num_models_consulted", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
- }
-
- rrddim_set_by_pointer(st, rd, (collected_number) models_consulted);
-
- rrdset_done(st);
+ rrddim_set_by_pointer(host->training_results_rs,
+ host->training_results_ok_rd, ts.training_result_ok);
+ rrddim_set_by_pointer(host->training_results_rs,
+ host->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range);
+ rrddim_set_by_pointer(host->training_results_rs,
+ host->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values);
+ rrddim_set_by_pointer(host->training_results_rs,
+ host->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension);
+ rrddim_set_by_pointer(host->training_results_rs,
+ host->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication);
+
+ rrdset_done(host->training_results_rs);
}
}
diff --git a/ml/ad_charts.h b/ml/ad_charts.h
index 349b369a24..a973b44a51 100644
--- a/ml/ad_charts.h
+++ b/ml/ad_charts.h
@@ -9,6 +9,6 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number anomaly_rate);
-void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts);
+void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts);
#endif /* ML_ADCHARTS_H */
diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c
index 8db252a841..53444e246f 100644
--- a/ml/ml-dummy.c
+++ b/ml/ml-dummy.c
@@ -19,8 +19,6 @@ bool ml_streaming_enabled() {
void ml_init(void) {}
-void ml_fini(void) {}
-
void ml_host_new(RRDHOST *rh) {
UNUSED(rh);
}
@@ -88,8 +86,4 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
return false;
}
-void ml_update_global_statistics_charts(uint64_t models_consulted) {
- UNUSED(models_consulted);
-}
-
#endif
diff --git a/ml/ml-private.h b/ml/ml-private.h
index 8535c9262d..173b82e265 100644
--- a/ml/ml-private.h
+++ b/ml/ml-private.h
@@ -33,7 +33,6 @@ typedef struct {
/*
* KMeans
*/
-
typedef struct {
size_t num_clusters;
size_t max_iterations;
@@ -124,7 +123,6 @@ enum ml_training_result {
typedef struct {
// Chart/dimension we want to train
- STRING *host_id;
STRING *chart_id;
STRING *dimension_id;
@@ -170,7 +168,6 @@ typedef struct {
/*
* Queue
*/
-
typedef struct {
std::queue<ml_training_request_t> internal;
netdata_mutex_t mutex;
@@ -178,6 +175,7 @@ typedef struct {
std::atomic<bool> exit;
} ml_queue_t;
+
typedef struct {
RRDDIM *rd;
@@ -209,13 +207,20 @@ typedef struct {
RRDHOST *rh;
ml_machine_learning_stats_t mls;
+ ml_training_stats_t ts;
calculated_number_t host_anomaly_rate;
- netdata_mutex_t mutex;
+ std::atomic<bool> threads_running;
+ std::atomic<bool> threads_cancelled;
+ std::atomic<bool> threads_joined;
ml_queue_t *training_queue;
+ netdata_mutex_t mutex;
+
+ netdata_thread_t training_thread;
+
/*
* bookkeeping for anomaly detection charts
*/
@@ -244,19 +249,6 @@ typedef struct {
RRDSET *detector_events_rs;
RRDDIM *detector_events_above_threshold_rd;
RRDDIM *detector_events_new_anomaly_event_rd;
-} ml_host_t;
-
-typedef struct {
- size_t id;
- netdata_thread_t nd_thread;
- netdata_mutex_t nd_mutex;
-
- ml_queue_t *training_queue;
- ml_training_stats_t training_stats;
-
- calculated_number_t *training_cns;
- calculated_number_t *scratch_training_cns;
- std::vector<DSample> training_samples;
RRDSET *queue_stats_rs;
RRDDIM *queue_stats_queue_size_rd;
@@ -273,7 +265,7 @@ typedef struct {
RRDDIM *training_results_not_enough_collected_values_rd;
RRDDIM *training_results_null_acquired_dimension_rd;
RRDDIM *training_results_chart_under_replication_rd;
-} ml_training_thread_t;
+} ml_host_t;
typedef struct {
bool enable_anomaly_detection;
@@ -310,14 +302,6 @@ typedef struct {
std::vector<uint32_t> random_nums;
netdata_thread_t detection_thread;
- std::atomic<bool> detection_stop;
-
- size_t num_training_threads;
-
- std::vector<ml_training_thread_t> training_threads;
- std::atomic<bool> training_stop;
-
- bool enable_statistics_charts;
} ml_config_t;
void ml_config_load(ml_config_t *cfg);
diff --git a/ml/ml.cc b/ml/ml.cc
index c7d4671c04..cf9ea379a6 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -8,13 +8,14 @@
#include "ad_charts.h"
-#define WORKER_TRAIN_QUEUE_POP 0
-#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
-#define WORKER_TRAIN_QUERY 2
-#define WORKER_TRAIN_KMEANS 3
-#define WORKER_TRAIN_UPDATE_MODELS 4
-#define WORKER_TRAIN_RELEASE_DIMENSION 5
-#define WORKER_TRAIN_UPDATE_HOST 6
+typedef struct {
+ calculated_number_t *training_cns;
+ calculated_number_t *scratch_training_cns;
+
+ std::vector<DSample> training_samples;
+} ml_tls_data_t;
+
+static thread_local ml_tls_data_t tls_data;
/*
* Functions to convert enums to strings
@@ -263,14 +264,7 @@ ml_queue_pop(ml_queue_t *q)
{
netdata_mutex_lock(&q->mutex);
- ml_training_request_t req = {
- NULL, // host_id
- NULL, // chart id
- NULL, // dimension id
- 0, // current time
- 0, // first entry
- 0 // last entry
- };
+ ml_training_request_t req = { NULL, NULL, 0, 0, 0 };
while (q->internal.empty()) {
pthread_cond_wait(&q->cond_var, &q->mutex);
@@ -313,7 +307,7 @@ ml_queue_signal(ml_queue_t *q)
*/
static std::pair<calculated_number_t *, ml_training_response_t>
-ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request)
{
ml_training_response_t training_response = {};
@@ -357,7 +351,7 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens
STORAGE_PRIORITY_BEST_EFFORT);
size_t idx = 0;
- memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
+ memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();
while (!ops->is_finished(&handle)) {
@@ -374,11 +368,11 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens
training_response.db_after_t = timestamp;
training_response.db_before_t = timestamp;
- training_thread->training_cns[idx] = value;
- last_value = training_thread->training_cns[idx];
+ tls_data.training_cns[idx] = value;
+ last_value = tls_data.training_cns[idx];
training_response.collected_values++;
} else
- training_thread->training_cns[idx] = last_value;
+ tls_data.training_cns[idx] = last_value;
idx++;
}
@@ -393,21 +387,20 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens
}
// Find first non-NaN value.
- for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { }
+ for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { }
// Overwrite NaN values.
if (idx != 0)
- memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
+ memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
training_response.result = TRAINING_RESULT_OK;
- return { training_thread->training_cns, training_response };
+ return { tls_data.training_cns, training_response };
}
static enum ml_training_result
-ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request)
{
- worker_is_busy(WORKER_TRAIN_QUERY);
- auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request);
+ auto P = ml_dimension_calculated_numbers(dim, training_request);
ml_training_response_t training_response = P.second;
if (training_response.result != TRAINING_RESULT_OK) {
@@ -436,16 +429,15 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
}
// compute kmeans
- worker_is_busy(WORKER_TRAIN_KMEANS);
{
- memcpy(training_thread->scratch_training_cns, training_thread->training_cns,
+ memcpy(tls_data.scratch_training_cns, tls_data.training_cns,
training_response.total_values * sizeof(calculated_number_t));
ml_features_t features = {
Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
- training_thread->scratch_training_cns, training_response.total_values,
- training_thread->training_cns, training_response.total_values,
- training_thread->training_samples
+ tls_data.scratch_training_cns, training_response.total_values,
+ tls_data.training_cns, training_response.total_values,
+ tls_data.training_samples
};
ml_features_preprocess(&features);
@@ -454,7 +446,6 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
}
// update kmeans models
- worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
{
netdata_mutex_lock(&dim->mutex);
@@ -506,16 +497,11 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
}
if (schedule_for_training) {
+ ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
ml_training_request_t req = {
- string_dup(dim->rd->rrdset->rrdhost->hostname),
- string_dup(dim->rd->rrdset->id),
- string_dup(dim->rd->id),
- curr_time,
- rrddim_first_entry_s(dim->rd),
- rrddim_last_entry_s(dim->rd),
+ string_dup(dim->rd->rrdset->id), string_dup(dim->rd->id),
+ curr_time, rrddim_first_entry_s(dim->rd), rrddim_last_entry_s(dim->rd),
};
-
- ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
ml_queue_push(host->training_queue, req);
}
}
@@ -691,6 +677,7 @@ ml_host_detect_once(ml_host_t *host)
host->mls = {};
ml_machine_learning_stats_t mls_copy = {};
+ ml_training_stats_t ts_copy = {};
{
netdata_mutex_lock(&host->mutex);
@@ -734,14 +721,54 @@ ml_host_detect_once(ml_host_t *host)
mls_copy = host->mls;
+ /*
+ * training stats
+ */
+ ts_copy = host->ts;
+
+ host->ts.queue_size = 0;
+ host->ts.num_popped_items = 0;
+
+ host->ts.allotted_ut = 0;
+ host->ts.consumed_ut = 0;
+ host->ts.remaining_ut = 0;
+
+ host->ts.training_result_ok = 0;
+ host->ts.training_result_invalid_query_time_range = 0;
+ host->ts.training_result_not_enough_collected_values = 0;
+ host->ts.training_result_null_acquired_dimension = 0;
+ host->ts.training_result_chart_under_replication = 0;
+
netdata_mutex_unlock(&host->mutex);
}
+ // Calc the avg values
+ if (ts_copy.num_popped_items) {
+ ts_copy.queue_size /= ts_copy.num_popped_items;
+ ts_copy.allotted_ut /= ts_copy.num_popped_items;
+ ts_copy.consumed_ut /= ts_copy.num_popped_items;
+ ts_copy.remaining_ut /= ts_copy.num_popped_items;
+
+ ts_copy.training_result_ok /= ts_copy.num_popped_items;
+ ts_copy.training_result_invalid_query_time_range /= ts_copy.num_popped_items;
+ ts_copy.training_result_not_enough_collected_values /= ts_copy.num_popped_items;
+ ts_copy.training_result_null_acquired_dimension /= ts_copy.num_popped_items;
+ ts_copy.training_result_chart_under_replication /= ts_copy.num_popped_items;
+ } else {
+ ts_copy.queue_size = 0;
+ ts_copy.allotted_ut = 0;
+ ts_copy.consumed_ut = 0;
+ ts_copy.remaining_ut = 0;
+ }
+
worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
ml_update_dimensions_chart(host, mls_copy);
worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0);
+
+ worker_is_busy(WORKER_JOB_DETECTION_STATS);
+ ml_update_training_statistics_chart(host, ts_copy);
}
typedef struct {
@@ -750,21 +777,18 @@ typedef struct {
} ml_acquired_dimension_t;
static ml_acquired_dimension_t
-ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_id)
+ml_acquired_dimension_get(RRDHOST *rh, STRING *chart_id, STRING *dimension_id)
{
RRDDIM_ACQUIRED *acq_rd = NULL;
ml_dimension_t *dim = NULL;
- RRDHOST *rh = rrdhost_find_by_hostname(string2str(host_id));
- if (rh) {
- RRDSET *rs = rrdset_find(rh, string2str(chart_id));
- if (rs) {
- acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
- if (acq_rd) {
- RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
- if (rd)
- dim = (ml_dimension_t *) rd->ml_dimension;
- }
+ RRDSET *rs = rrdset_find(rh, string2str(chart_id));
+ if (rs) {
+ acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
+ if (acq_rd) {
+ RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
+ if (rd)
+ dim = (ml_dimension_t *) rd->ml_dimension;
}
}
@@ -785,12 +809,110 @@ ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim)
}
static enum ml_training_result
-ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr)
+ml_acquired_dimension_train(ml_acquired_dimension_t acq_dim, const ml_training_request_t &TR)
{
if (!acq_dim.dim)
return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION;
- return ml_dimension_train_model(training_thread, acq_dim.dim, tr);
+ return ml_dimension_train_model(acq_dim.dim, TR);
+}
+
+#define WORKER_JOB_TRAINING_FIND 0
+#define WORKER_JOB_TRAINING_TRAIN 1
+#define WORKER_JOB_TRAINING_STATS 2
+
+static void
+ml_host_train(ml_host_t *host)
+{
+ worker_register("MLTRAIN");
+ worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
+ worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
+ worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
+
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t ) ml_host_cancel_training_thread, host->rh, true);
+
+ while (service_running(SERVICE_ML_TRAINING)) {
+ ml_training_request_t training_req = ml_queue_pop(host->training_queue);
+ size_t queue_size = ml_queue_size(host->training_queue) + 1;
+
+ if (host->threads_cancelled) {
+ info("Stopping training thread for host %s because it was cancelled", rrdhost_hostname(host->rh));
+ break;
+ }
+
+ usec_t allotted_ut = (Cfg.train_every * host->rh->rrd_update_every * USEC_PER_SEC) / queue_size;
+ if (allotted_ut > USEC_PER_SEC)
+ allotted_ut = USEC_PER_SEC;
+
+ usec_t start_ut = now_monotonic_usec();
+ enum ml_training_result training_res;
+ {
+ worker_is_busy(WORKER_JOB_TRAINING_FIND);
+ ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(host->rh, training_req.chart_id, training_req.dimension_id);
+
+ worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
+ training_res = ml_acquired_dimension_train(acq_dim, training_req);
+
+ string_freez(training_req.chart_id);
+ string_freez(training_req.dimension_id);
+
+ ml_acquired_dimension_release(acq_dim);
+ }
+ usec_t consumed_ut = now_monotonic_usec() - start_ut;
+
+ worker_is_busy(WORKER_JOB_TRAINING_STATS);
+
+ usec_t remaining_ut = 0;
+ if (consumed_ut < allotted_ut)
+ remaining_ut = allotted_ut - consumed_ut;
+
+ {
+ netdata_mutex_lock(&host->mutex);
+
+ host->ts.queue_size += queue_size;
+ host->ts.num_popped_items += 1;
+
+ host->ts.allotted_ut += allotted_ut;
+ host->ts.consumed_ut += consumed_ut;
+ host->ts.remaining_ut += remaining_ut;
+
+ switch (training_res) {
+ case TRAINING_RESULT_OK:
+ host->ts.training_result_ok += 1;
+ break;
+ case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
+ host->ts.training_result_invalid_query_time_range += 1;
+ break;
+ case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
+ host->ts.training_result_not_enough_collected_values += 1;
+ break;
+ case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
+ host->ts.training_result_null_acquired_dimension += 1;
+ break;
+ case TRAINING_RESULT_CHART_UNDER_REPLICATION:
+ host->ts.training_result_chart_under_replication += 1;
+ break;
+ }
+
+ netdata_mutex_unlock(&host->mutex);
+ }
+
+ worker_is_idle();
+ std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
+ worker_is_busy(0);
+ }
+}
+
+static void *
+train_main(void *arg)
+{
+ size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1);
+ tls_data.training_cns = new calculated_number_t[max_elements_needed_for_training]();
+ tls_data.scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();
+
+ ml_host_t *host = (ml_host_t *) arg;
+ ml_host_train(host);
+ return NULL;
}
static void *
@@ -804,10 +926,12 @@ ml_detect_main(void *arg)
worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats");
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, NULL, NULL, true);
+
heartbeat_t hb;
heartbeat_init(&hb);
- while (!Cfg.detection_stop) {
+ while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
worker_is_idle();
heartbeat_next(&hb, USEC_PER_SEC);
@@ -821,39 +945,6 @@ ml_detect_main(void *arg)
ml_host_detect_once((ml_host_t *) rh->ml_host);
}
dfe_done(rhp);
-
- if (Cfg.enable_statistics_charts) {
- // collect and update training thread stats
- for (size_t idx = 0; idx != Cfg.num_training_threads; idx